Source code for statick_tool.statick

"""Code analysis front-end."""
import argparse
import copy
import io
import logging
import multiprocessing
import os
import sys
import time
from logging.handlers import MemoryHandler
from typing import Any, Dict, List, Optional, Tuple

from yapsy.PluginManager import PluginManager

from statick_tool import __version__
from statick_tool.config import Config
from statick_tool.discovery_plugin import DiscoveryPlugin
from statick_tool.exceptions import Exceptions
from statick_tool.issue import Issue
from statick_tool.package import Package
from statick_tool.plugin_context import PluginContext
from statick_tool.profile import Profile
from statick_tool.reporting_plugin import ReportingPlugin
from statick_tool.resources import Resources
from statick_tool.timing import Timing
from statick_tool.tool_plugin import ToolPlugin


[docs]class Statick: # pylint: disable=too-many-instance-attributes """Code analysis front-end.""" def __init__(self, user_paths: List[str]) -> None: """Initialize Statick.""" self.default_level = "default" self.resources = Resources(user_paths) self.manager = PluginManager() self.manager.setPluginPlaces(self.resources.get_plugin_paths()) self.manager.setCategoriesFilter( { "Discovery": DiscoveryPlugin, "Tool": ToolPlugin, "Reporting": ReportingPlugin, } ) self.manager.collectPlugins() self.discovery_plugins: Dict[str, Any] = {} for plugin_info in self.manager.getPluginsOfCategory("Discovery"): self.discovery_plugins[ plugin_info.plugin_object.get_name() ] = plugin_info.plugin_object self.tool_plugins: Dict[str, Any] = {} for plugin_info in self.manager.getPluginsOfCategory("Tool"): self.tool_plugins[ plugin_info.plugin_object.get_name() ] = plugin_info.plugin_object self.reporting_plugins: Dict[str, Any] = {} for plugin_info in self.manager.getPluginsOfCategory("Reporting"): self.reporting_plugins[ plugin_info.plugin_object.get_name() ] = plugin_info.plugin_object self.config: Optional[Config] = None self.exceptions: Optional[Exceptions] = None self.timings: List[Timing] = []
[docs] @staticmethod def set_logging_level(args: argparse.Namespace) -> None: """Set the logging level to use for output. Valid levels are: DEBUG, INFO, WARNING, ERROR, CRITICAL. Specifying the level is case-insensitive (both upper-case and lower-case are allowed). """ valid_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] log_level = args.log_level.upper() if log_level not in valid_levels: log_level = "WARNING" args.log_level = log_level logging.basicConfig(level=log_level) logging.root.setLevel(log_level) logging.info("Log level set to %s", args.log_level.upper())
[docs] @classmethod def set_cpu_count(cls, num_cpus: str) -> int: """Set correct number of CPU cores to use.""" max_cpus = multiprocessing.cpu_count() desired = int(num_cpus) if desired > max_cpus or desired == -1: return max_cpus if desired > 1: return desired return 1
[docs] def get_config(self, args: argparse.Namespace) -> None: """Get Statick configuration.""" base_config_filename = "config.yaml" user_config_filename = "" if args.config is not None: user_config_filename = args.config try: self.config = Config( self.resources.get_file(base_config_filename), self.resources.get_file(user_config_filename), self.default_level, ) except OSError as ex: logging.error( "Failed to access configuration file %s or %s: %s", base_config_filename, user_config_filename, ex, ) except ValueError as ex: logging.error( "Configuration file %s or %s has errors: %s", base_config_filename, user_config_filename, ex, )
[docs] def get_exceptions(self, args: argparse.Namespace) -> None: """Get Statick exceptions.""" exceptions_filename = "exceptions.yaml" if args.exceptions is not None: exceptions_filename = args.exceptions try: self.exceptions = Exceptions(self.resources.get_file(exceptions_filename)) except OSError as ex: logging.error( "Failed to access exceptions file %s: %s", exceptions_filename, ex ) except ValueError as ex: logging.error("Exceptions file %s has errors: %s", exceptions_filename, ex)
[docs] def get_ignore_packages(self) -> List[str]: """Get packages to ignore during scan process.""" if self.exceptions is None: return [] return self.exceptions.get_ignore_packages()
[docs] def gather_args(self, args: argparse.ArgumentParser) -> None: """Gather arguments.""" args.add_argument( "--output-directory", dest="output_directory", type=str, help="Directory to write output files to", ) args.add_argument( "--log", dest="log_level", type=str, default="WARNING", help="Verbosity level of output to show (DEBUG, INFO, WARNING, ERROR" ", CRITICAL)", ) args.add_argument( "--check", dest="check", action="store_true", help="Return the status. Return code 0 means there were no issues. \ Return code 1 means there were issues.", ) args.add_argument( "--config", dest="config", type=str, help="Name of config yaml file" ) args.add_argument( "--level", dest="level", type=str, help="Scan level to use from config file. \ Overrides any levels specified by the profile.", ) args.add_argument( "--profile", dest="profile", type=str, help="Name of profile yaml file" ) args.add_argument( "--exceptions", dest="exceptions", type=str, help="Name of exceptions yaml file", ) args.add_argument( "--force-tool-list", dest="force_tool_list", type=str, help="Force only the given list of tools to run", ) args.add_argument( "--version", action="version", version=f"%(prog)s {__version__}", ) args.add_argument( "--mapping-file-suffix", dest="mapping_file_suffix", type=str, help="Suffix to use when searching for CERT mapping files", ) args.add_argument( "--timings", dest="timings", action="store_true", help="Enable printing timing information to stdout", ) # Statick workspace arguments. args.add_argument( "-ws", dest="workspace", action="store_true", help="Treat the path argument as a workspace of multiple packages", ) args.add_argument( "--max-procs", dest="max_procs", type=self.set_cpu_count, default=int(multiprocessing.cpu_count() / 2), help="Maximum number of CPU cores to use. " "Defaults to half the available CPU cores. Setting to -1 will " "cause Statick to use all available CPU cores", ) args.add_argument( "--packages-file", dest="packages_file", type=str, help="File listing packages to scan, only used when running on a workspace", ) args.add_argument( "--list-packages", dest="list_packages", action="store_true", help="List packages and levels, only used when running on a workspace", ) for _, plugin in list(self.discovery_plugins.items()): plugin.gather_args(args) for _, plugin in list(self.tool_plugins.items()): plugin.gather_args(args) for _, plugin in list(self.reporting_plugins.items()): plugin.gather_args(args)
[docs] def get_level(self, path: str, args: argparse.Namespace) -> Optional[str]: """Get level to scan package at.""" path = os.path.abspath(path) if args.level is not None: return str(args.level) profile_filename = "profile.yaml" if args.profile is not None: profile_filename = args.profile profile_resource = self.resources.get_file(profile_filename) if profile_resource is None: logging.error("Could not find profile file %s!", profile_filename) return None try: profile = Profile(profile_resource) except OSError as ex: # This isn't quite redundant with the profile_resource check: it's possible # that something else triggers an OSError, like permissions. logging.error("Failed to access profile file %s: %s", profile_filename, ex) return None except ValueError as ex: logging.error("Profile file %s has errors: %s", profile_filename, ex) return None package = Package(os.path.basename(path), path) level = profile.get_package_level(package) return level
[docs] def add_timing( self, package: str, name: str, plugin_type: str, duration: str ) -> None: """Add an entry to the timings list.""" timing = Timing(package, name, plugin_type, duration) self.timings.append(timing)
[docs] def get_timings(self) -> List[Timing]: """Return list of timings for each component.""" return self.timings
# pylint: disable=too-many-locals, too-many-return-statements, too-many-branches # pylint: disable=too-many-statements
[docs] def run( self, path: str, args: argparse.Namespace, start_time: Optional[float] = None ) -> Tuple[Optional[Dict[str, List[Issue]]], bool]: """Run scan tools against targets on path.""" success = True path = os.path.abspath(path) if not os.path.exists(path): logging.error("No package found at %s!", path) return None, False package = Package(os.path.basename(path), path) level: Optional[str] = self.get_level(path, args) logging.info("level: %s", level) if level is None: logging.error("Level is not valid.") return None, False if not self.config or ( level != self.default_level and not self.config.has_level(level) ): logging.error("Can't find specified level %s in config!", level) return None, False orig_path = os.getcwd() if args.output_directory: if not os.path.isdir(args.output_directory): try: os.mkdir(args.output_directory) except OSError as ex: logging.error( "Unable to create output directory at %s: %s", args.output_directory, ex, ) return None, False output_dir = os.path.join(args.output_directory, package.name + "-" + level) if not os.path.isdir(output_dir): try: os.mkdir(output_dir) except OSError as ex: logging.error( "Unable to create output directory at %s: %s", output_dir, ex ) return None, False logging.info("Writing output to: %s", output_dir) os.chdir(output_dir) logging.info("------") logging.info( "Scanning package %s (%s) at level %s", package.name, package.path, level ) issues: Dict[str, List[Issue]] = {} ignore_packages = self.get_ignore_packages() if package.name in ignore_packages: logging.info( "Package %s is configured to be ignored by Statick.", package.name ) return issues, True plugin_context = PluginContext(args, self.resources, self.config) logging.info("---Discovery---") if not DiscoveryPlugin.file_command_exists(): logging.info( "file command isn't available, discovery plugins will be less effective" ) discovery_plugins = self.config.get_enabled_discovery_plugins(level) if not discovery_plugins: discovery_plugins = list(self.discovery_plugins) # Get timing information for finding files for discovery plugins. dummy_plugin = DiscoveryPlugin() plugin_start = time.time() dummy_plugin.find_files(package) duration = format(time.time() - plugin_start, ".4f") timing = Timing(package.name, "find files", "Discovery", duration) self.timings.append(timing) plugins_ran: List[Any] = [] for plugin_name in discovery_plugins: if plugin_name not in self.discovery_plugins: logging.error("Can't find specified discovery plugin %s!", plugin_name) return None, False plugin = self.discovery_plugins[plugin_name] dependencies = plugin.get_discovery_dependencies() for dependency_name in dependencies: dependency_plugin = self.discovery_plugins[dependency_name] if dependency_plugin.get_name() in plugins_ran: continue dependency_plugin.set_plugin_context(plugin_context) logging.info( "Running %s discovery plugin...", dependency_plugin.get_name() ) plugin_start = time.time() dependency_plugin.scan(package, level, self.exceptions) duration = format(time.time() - plugin_start, ".4f") timing = Timing( package.name, dependency_plugin.get_name(), "Discovery", duration ) self.timings.append(timing) logging.info("%s discovery plugin done.", dependency_plugin.get_name()) plugins_ran.append(dependency_plugin.get_name()) if plugin.get_name() not in plugins_ran: plugin.set_plugin_context(plugin_context) logging.info("Running %s discovery plugin...", plugin.get_name()) plugin_start = time.time() plugin.scan(package, level, self.exceptions) duration = format(time.time() - plugin_start, ".4f") timing = Timing(package.name, plugin.get_name(), "Discovery", duration) self.timings.append(timing) logging.info("%s discovery plugin done.", plugin.get_name()) plugins_ran.append(plugin.get_name()) logging.info("---Discovery---") logging.info("---Tools---") enabled_plugins = self.config.get_enabled_tool_plugins(level) if not enabled_plugins: enabled_plugins = list(self.tool_plugins) plugins_to_run = copy.copy(enabled_plugins) plugins_ran = [] plugin_dependencies: List[str] = [] while plugins_to_run: plugin_name = plugins_to_run[0] if plugin_name not in self.tool_plugins: logging.error("Can't find specified tool plugin %s!", plugin_name) return None, False if args.force_tool_list is not None: force_tool_list = args.force_tool_list.split(",") if ( plugin_name not in force_tool_list and plugin_name not in plugin_dependencies ): logging.info("Skipping plugin not in force list %s!", plugin_name) plugins_to_run.remove(plugin_name) continue plugin = self.tool_plugins[plugin_name] plugin.set_plugin_context(plugin_context) dependencies = plugin.get_tool_dependencies() dependencies_met = True for dependency_name in dependencies: if dependency_name not in plugins_ran: if dependency_name not in enabled_plugins: logging.error( "Plugin %s depends on plugin %s which isn't enabled!", plugin_name, dependency_name, ) return None, False plugin_dependencies.append(dependency_name) if dependency_name in plugins_to_run: plugins_to_run.remove(dependency_name) plugins_to_run.insert(0, dependency_name) dependencies_met = False if not dependencies_met: continue logging.info("Running %s tool plugin...", plugin.get_name()) plugin_start = time.time() tool_issues = plugin.scan(package, level) duration = format(time.time() - plugin_start, ".4f") timing = Timing(package.name, plugin.get_name(), "Tool", duration) self.timings.append(timing) if tool_issues is not None: issues[plugin_name] = tool_issues logging.info("%s tool plugin done.", plugin.get_name()) else: logging.error("%s tool plugin failed", plugin.get_name()) success = False plugins_to_run.remove(plugin_name) plugins_ran.append(plugin_name) logging.info("---Tools---") if self.exceptions is not None: issues = self.exceptions.filter_issues(package, issues) os.chdir(orig_path) logging.info("---Reporting---") reporting_plugins = self.config.get_enabled_reporting_plugins(level) if not reporting_plugins: if "print_to_console" in self.reporting_plugins: reporting_plugins = ["print_to_console"] else: reporting_plugins = list(self.reporting_plugins) for plugin_name in reporting_plugins: if plugin_name not in self.reporting_plugins: logging.error("Can't find specified reporting plugin %s!", plugin_name) return None, False plugin = self.reporting_plugins[plugin_name] plugin.set_plugin_context(plugin_context) logging.info("Running %s reporting plugin...", plugin.get_name()) plugin_start = time.time() plugin.report(package, issues, level) duration = format(time.time() - plugin_start, ".4f") timing = Timing(package.name, plugin.get_name(), "Reporting", duration) self.timings.append(timing) logging.info("%s reporting plugin done.", plugin.get_name()) logging.info("---Reporting---") if start_time is not None: duration = format(time.time() - start_time, ".4f") timing = Timing("Overall", "", "", duration) self.timings.append(timing) logging.info("Done!") return issues, success
[docs] def run_workspace( self, parsed_args: argparse.Namespace, start_time: Optional[float] = None ) -> Tuple[ Optional[Dict[str, List[Issue]]], bool ]: # pylint: disable=too-many-locals, too-many-branches, too-many-statements """Run statick on a workspace.""" if parsed_args.output_directory: out_dir = parsed_args.output_directory if not os.path.isdir(out_dir): try: os.mkdir(out_dir) except OSError as ex: logging.error( "Unable to create output directory at %s: %s", out_dir, ex ) return None, False ignore_packages = self.get_ignore_packages() ignore_files = ["AMENT_IGNORE", "CATKIN_IGNORE", "COLCON_IGNORE"] package_indicators = ["package.xml", "setup.py", "pyproject.toml"] packages = [] for root, dirs, files in os.walk(parsed_args.path): if any(item in files for item in ignore_files): dirs.clear() continue for sub_dir in dirs: full_dir = os.path.join(root, sub_dir) files = os.listdir(full_dir) if any(item in package_indicators for item in files) and not any( item in files for item in ignore_files ): if ignore_packages and sub_dir in ignore_packages: continue packages.append(Package(sub_dir, full_dir)) if parsed_args.packages_file is not None: packages_file_list = [] try: packages_file = os.path.abspath(parsed_args.packages_file) with open(packages_file, "r", encoding="utf8") as fname: packages_file_list = [ package.strip() for package in fname.readlines() if package.strip() and package[0] != "#" ] except OSError: logging.error("Packages file not found") return None, False packages = [ package for package in packages if package.name in packages_file_list ] if parsed_args.list_packages: for package in packages: logging.info( "%s: %s", package.name, self.get_level(package.path, parsed_args) ) return None, True count = 0 total_issues: List[Any] = [] num_packages = len(packages) mp_args = [] if multiprocessing.get_start_method() == "fork": logging.info("-- Scanning %d packages --", num_packages) for package in packages: count += 1 mp_args.append((parsed_args, count, package, num_packages)) with multiprocessing.Pool(parsed_args.max_procs) as pool: total_issues, all_timings = zip( # type: ignore *pool.starmap(self.scan_package, mp_args) ) for timings in all_timings: for timing in timings: self.timings.append(timing) else: logging.warning( "Statick's plugin manager does not currently support multiprocessing" " without UNIX's fork function. Falling back to a single process." ) logging.info("-- Scanning %d packages --", num_packages) for package in packages: count += 1 pkg_issues, pkg_timings = self.scan_package( parsed_args, count, package, num_packages ) total_issues.append(pkg_issues) for timing in pkg_timings: self.timings.append(timing) break logging.info("-- All packages run --") logging.info("-- overall report --") success = True issues: Dict[str, List[Issue]] = {} for issue in total_issues: if issue is not None: for key, value in list(issue.items()): if key in issues: issues[key] += value if value: success = False else: issues[key] = value if value: success = False enabled_reporting_plugins: List[str] = [] # Make a fake 'all' package for reporting dummy_all_package = Package("all_packages", parsed_args.path) level = self.get_level(dummy_all_package.path, parsed_args) if level is not None and self.config is not None: if not self.config or not self.config.has_level(level): logging.error("Can't find specified level %s in config!", level) else: enabled_reporting_plugins = self.config.get_enabled_reporting_plugins( level ) if not enabled_reporting_plugins: if "print_to_console" in self.reporting_plugins: enabled_reporting_plugins = ["print_to_console"] else: enabled_reporting_plugins = list(self.reporting_plugins) plugin_context = PluginContext(parsed_args, self.resources, self.config) # type: ignore plugin_context.args.output_directory = parsed_args.output_directory for plugin_name in enabled_reporting_plugins: if plugin_name not in self.reporting_plugins: logging.error("Can't find specified reporting plugin %s!", plugin_name) continue plugin = self.reporting_plugins[plugin_name] plugin.set_plugin_context(plugin_context) logging.info("Running %s reporting plugin...", plugin.get_name()) plugin.report(dummy_all_package, issues, level) logging.info("%s reporting plugin done.", plugin.get_name()) if start_time is not None: duration = format(time.time() - start_time, ".4f") timing = Timing("Overall", "", "", duration) self.timings.append(timing) return issues, success
[docs] def scan_package( self, parsed_args: argparse.Namespace, count: int, package: Package, num_packages: int, ) -> Tuple[Optional[Dict[str, List[Issue]]], List[Timing]]: """Scan each package in a separate process while buffering output.""" logger = logging.getLogger() old_handler = None if logger.handlers[0]: old_handler = logger.handlers[0] handler = MemoryHandler(10000, flushLevel=logging.ERROR, target=old_handler) logger.removeHandler(old_handler) logger.addHandler(handler) logging.info( "-- Scanning package %s (%d of %d) --", package.name, count, num_packages ) sio = io.StringIO() old_stdout = sys.stdout old_stderr = sys.stderr sys.stdout = sio sys.stderr = sio issues, dummy = self.run(package.path, parsed_args) timings = self.get_timings() sys.stdout = old_stdout sys.stderr = old_stderr logging.info(sio.getvalue()) if issues is not None: logging.info( "-- Done scanning package %s (%d of %d) --", package.name, count, num_packages, ) else: logging.error("Failed to run statick on package %s!", package.name) if old_handler is not None: handler.flush() logger.removeHandler(handler) logger.addHandler(old_handler) return issues, timings
[docs] @staticmethod def print_no_issues() -> None: """Print that no information about issues was found.""" logging.error( "Something went wrong, no information about issues." " Statick exiting with errors." )
[docs] @staticmethod def print_exit_status(status: bool) -> None: """Print Statick exit status.""" if status: logging.info("Statick exiting with success.") else: logging.error("Statick exiting with errors.")