Source code for statick_tool.tool_plugin

"""Tool plugin."""

import argparse
import logging
import os
import re
import shlex
import subprocess
from typing import Any, Match, Optional, Pattern, Union

from statick_tool.issue import Issue
from statick_tool.package import Package
from statick_tool.plugin_context import PluginContext


class ToolPlugin:
    """Default implementation of tool plugin.

    Subclasses are expected to override at least ``get_name``,
    ``get_file_types``, ``process_files`` and ``parse_output``; the versions
    defined here are empty placeholders.
    """

    # Plugin context, assigned through set_plugin_context(); None until then.
    plugin_context = None

    # Sentinel strings returned by the get_version* helpers.
    TOOL_MISSING_STR = "Not installed"
    TOOL_UNKNOWN_STR = "Unknown"

    def get_name(self) -> str:  # type: ignore[empty-body]
        """Get name of tool.

        Returns:
            Name of tool.
        """
        pass  # pylint: disable=unnecessary-pass

    @classmethod
    def get_tool_dependencies(cls) -> list[str]:
        """Get a list of tools that must run before this one.

        Returns:
            List of tool dependencies for a tool.
        """
        return []

    def gather_args(self, args: argparse.Namespace) -> None:
        """Gather arguments.

        Args:
            args: Flags for plugins will be added to existing arguments.
        """

    def get_file_types(self) -> list[str]:  # type: ignore[empty-body]
        """Return a list of file types the plugin can scan.

        Returns:
            List of file types the plugin can scan.
        """

    def get_binary(  # pylint: disable=unused-argument
        self, level: Optional[str] = None, package: Optional[Package] = None
    ) -> str:
        """Get tool binary name.

        Arguments are required because some tools may need to know the package
        or level to determine the binary name. The binary name can change, most
        often to add a version number as a suffix.

        Args:
            level: Level at which to run tool.
            package: Package on which to run tool.

        Returns:
            Name of the tool binary; by default the same as ``get_name()``.
        """
        return self.get_name()

    def get_version(self) -> str:
        """Figure out and return the version of the tool that's installed.

        Runs ``<binary> --version`` and returns its combined stdout/stderr
        output, decoded as UTF-8 and otherwise unmodified.

        Returns:
            Output of the version command, "Unknown" if there is no binary name
            or the command exits with a non-zero status, or "Not installed" if
            the binary cannot be found.
        """
        tool_bin = self.get_binary()
        if not tool_bin:
            return self.TOOL_UNKNOWN_STR

        try:
            output = subprocess.check_output(
                [tool_bin, "--version"], stderr=subprocess.STDOUT
            )
            return output.decode("utf-8")
        except subprocess.CalledProcessError:  # NOLINT
            return self.TOOL_UNKNOWN_STR
        except FileNotFoundError:  # NOLINT
            return self.TOOL_MISSING_STR

    def get_version_from_pkg(self, subproc_args: list[str], ver_re_str: str) -> str:
        """Figure out and return the version of the tool that's installed.

        Runs a package-listing command and returns the first output line that
        matches the regular expression (anchored at the start of the line).

        Args:
            subproc_args: Arguments to pass to subprocess.
            ver_re_str: Regular expression to use to parse the version from the
                output.

        Returns:
            The first matching output line, "Unknown" if the listing command
            fails or cannot be found, or "Not installed" if the command runs
            but no line matches.
        """
        try:
            output = subprocess.check_output(
                subproc_args,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
            )
        except subprocess.CalledProcessError:  # NOLINT
            return self.TOOL_UNKNOWN_STR
        except FileNotFoundError:  # NOLINT
            # The package manager itself is missing, so nothing can be said
            # about whether the tool is installed.
            return self.TOOL_UNKNOWN_STR

        parse: Pattern[str] = re.compile(ver_re_str)
        for line in output.splitlines():
            match: Optional[Match[str]] = parse.match(line)
            if match:
                return line

        return self.TOOL_MISSING_STR

    @staticmethod
    def _tool_line_regex(tool_bin: str) -> str:
        """Build a regex matching a listing line that mentions the tool binary.

        The binary name is escaped so that regex metacharacters in it (for
        example the ``+`` in ``g++`` or the ``.`` in ``python3.8``) are matched
        literally.

        Args:
            tool_bin: Name of the tool binary.

        Returns:
            Regular expression string.
        """
        return rf"(.+{re.escape(tool_bin)}.*)"

    def get_version_from_apt(self) -> str:
        """Figure out and return the version of the tool that's installed by apt.

        Returns:
            Matching line of ``dpkg -l`` output, "Not installed" if no line
            matches, or "Unknown" if there is no binary name or the listing
            command fails.
        """
        tool_bin = self.get_binary()
        if not tool_bin:
            return self.TOOL_UNKNOWN_STR

        return self.get_version_from_pkg(
            subproc_args=["dpkg", "-l"],
            ver_re_str=self._tool_line_regex(tool_bin),
        )

    def get_version_from_docker(self) -> str:
        """Figure out and return the version of the tool that's installed by Docker.

        Returns:
            Matching line of ``docker image list`` output, "Not installed" if
            no line matches, or "Unknown" if there is no binary name or the
            listing command fails.
        """
        tool_bin = self.get_binary()
        if not tool_bin:
            return self.TOOL_UNKNOWN_STR

        return self.get_version_from_pkg(
            subproc_args=["docker", "image", "list"],
            ver_re_str=self._tool_line_regex(tool_bin),
        )

    def get_version_from_npm(self) -> str:
        """Figure out and return the version of the tool that's installed by npm.

        The local ``npm list`` is consulted first; if the tool is not reported
        there, the global ``npm list -g`` is tried.

        Returns:
            Matching line of the npm listing, "Not installed" if no line
            matches, or "Unknown" if there is no binary name or the listing
            command fails.
        """
        tool_bin = self.get_binary()
        if not tool_bin:
            return self.TOOL_UNKNOWN_STR

        ver_re = rf"{self._tool_line_regex(tool_bin)}@([0-9]*\.?[0-9]+\.?[0-9]+)"
        version = self.get_version_from_pkg(
            subproc_args=["npm", "list"], ver_re_str=ver_re
        )
        if version in [self.TOOL_MISSING_STR, self.TOOL_UNKNOWN_STR]:
            # if not found locally, check globally
            version = self.get_version_from_pkg(
                subproc_args=["npm", "list", "-g"], ver_re_str=ver_re
            )
        return version

    def scan(self, package: Package, level: str) -> Optional[list[Issue]]:
        """Run tool and gather output.

        Collects the files of every supported file type from the package, runs
        ``process_files`` on them and hands the output to ``parse_output``.

        Args:
            package: Package to scan.
            level: Level at which to scan.

        Returns:
            List of issues from tool, an empty list if the package has no files
            of a supported type, or None if ``process_files`` returned None.
        """
        files: list[str] = []
        for file_type in self.get_file_types():
            if file_type in package and package[file_type]:
                files += package[file_type]

        if not files:
            return []

        total_output = (  # pylint: disable=assignment-from-no-return
            self.process_files(package, level, files, self.get_user_flags(level))
        )
        if total_output is None:
            return None

        if self.plugin_context and self.plugin_context.args.output_directory:
            # NOTE(review): the log path is relative to the current working
            # directory; output_directory only acts as an on/off switch here.
            # Presumably the caller has already changed into that directory --
            # confirm.
            with open(self.get_name() + ".log", "w", encoding="utf8") as fid:
                fid.writelines(total_output)

        return self.parse_output(total_output, package)

    def process_files(
        self, package: Package, level: str, files: list[str], user_flags: list[str]
    ) -> Optional[list[str]]:
        """Run tool and gather output.

        Args:
            package: Package to scan.
            level: Level at which to scan.
            files: List of files to scan.
            user_flags: User-defined flags.

        Returns:
            List of output from tool.
        """

    def parse_output(  # type: ignore[empty-body]
        self, total_output: list[str], package: Optional[Package] = None
    ) -> list[Issue]:  # pyright: ignore
        """Parse tool output and report issues.

        Args:
            total_output: Output from tool.
            package: Package with issues.

        Returns:
            List of issues.
        """

    def set_plugin_context(self, plugin_context: Union[None, PluginContext]) -> None:
        """Set the plugin context.

        Args:
            plugin_context: Plugin context.
        """
        self.plugin_context = plugin_context

    def load_mapping(self) -> dict[str, str]:
        """Load a mapping between warnings and identifiers.

        The mapping is read from ``plugin_mapping/<name>.txt``, or from
        ``plugin_mapping/<name>-<suffix>.txt`` when the ``mapping_file_suffix``
        argument is set and that file exists. Each valid line has the form
        ``warning:identifier``; other lines are logged and skipped.

        Returns:
            Mapping between warnings and identifiers; empty if no mapping file
            is found.
        """
        file_name: str = f"plugin_mapping/{self.get_name()}.txt"
        assert self.plugin_context is not None
        full_path: Union[Any, str, None] = self.plugin_context.resources.get_file(
            file_name
        )

        if (
            "mapping_file_suffix" in self.plugin_context.args
            and self.plugin_context.args.mapping_file_suffix is not None
        ):
            # If the user specified a suffix, try to get the suffixed version of
            # the file.
            suffixed_file_name = (
                f"plugin_mapping/{self.get_name()}-"
                f"{self.plugin_context.args.mapping_file_suffix}.txt"
            )
            suffixed_full_path = self.plugin_context.resources.get_file(
                suffixed_file_name
            )
            if suffixed_full_path is not None:
                # If there actually is a file with that suffix, use it.
                # Else use the un-suffixed version.
                full_path = suffixed_full_path

        if full_path is None:
            return {}

        warning_mapping: dict[str, str] = {}
        with open(full_path, "r", encoding="utf8") as mapping_file:
            for line in mapping_file:
                split_line = line.strip().split(":")
                if len(split_line) != 2:
                    logging.warning(
                        "Invalid line %s in mapping file %s", line, file_name
                    )
                    continue
                warning_mapping[split_line[0]] = split_line[1]
        return warning_mapping

    def get_user_flags(self, level: str, name: Optional[str] = None) -> list[str]:
        """Get the user-defined extra flags for a specific tool/level combination.

        The configured flag string is split with POSIX shell rules, so quoted
        arguments stay together.

        Args:
            level: Level at which to scan.
            name: Name of the tool. Defaults to ``get_name()``.

        Returns:
            List of user-defined flags.
        """
        if name is None:
            name = self.get_name()  # pylint: disable=assignment-from-no-return
        assert self.plugin_context is not None
        user_flags = self.plugin_context.config.get_tool_config(name, level, "flags")

        flags: list[str] = []
        if user_flags:
            lex = shlex.shlex(user_flags, posix=True)
            lex.whitespace_split = True
            flags = list(lex)
        return flags

    @staticmethod
    def is_valid_executable(path: str) -> bool:
        """Return whether a provided command exists and is executable.

        If the provided path has an extension on it, don't change it, otherwise
        try adding common extensions.

        Args:
            path: Path to tool binary.

        Returns:
            True if the path is a valid executable, False otherwise
        """
        # On Windows, PATHEXT contains a list of extensions which can be
        # appended to a program name when searching PATH.
        extensions = os.environ.get("PATHEXT", None)
        _, path_ext = os.path.splitext(path)
        if path_ext or not extensions:
            return os.path.isfile(path) and os.access(path, os.X_OK)

        # Try "" (no extension) first, then every PATHEXT entry.
        candidates = [""] + extensions.split(";")
        for ext in candidates:
            extended_path = path + ext
            if os.path.isfile(extended_path) and os.access(extended_path, os.X_OK):
                return True
        return False

    @staticmethod
    def command_exists(command: str) -> bool:
        """Return whether a particular command is available on $PATH.

        Args:
            command: Command to check for.

        Returns:
            True if the command is available on $PATH, False otherwise.
        """
        fpath, _ = os.path.split(command)
        if fpath:
            # Contains a path, not just a command, so don't search PATH
            return ToolPlugin.is_valid_executable(command)

        # Fall back to the platform default search path when PATH is unset
        # instead of raising KeyError.
        search_path = os.environ.get("PATH", os.defpath)
        return any(
            ToolPlugin.is_valid_executable(os.path.join(directory, command))
            for directory in search_path.split(os.pathsep)
        )