# Source code for statick_tool.plugins.tool.hadolint

"""Apply hadolint tool and gather results."""

import argparse
import json
import logging
import subprocess
from typing import Optional

from statick_tool.issue import Issue
from statick_tool.package import Package
from statick_tool.tool_plugin import ToolPlugin


class HadolintToolPlugin(ToolPlugin):
    """Apply hadolint tool and gather results.

    hadolint is run either from a local binary or through the
    ``hadolint/hadolint`` docker image. It is always asked for JSON output,
    which is then converted into statick ``Issue`` objects.
    """

    # hadolint "level" string -> statick severity. Levels that are not listed
    # here fall back to _DEFAULT_SEVERITY.
    _SEVERITY_BY_LEVEL: dict[str, int] = {
        "style": 1,
        "info": 1,
        "warning": 3,
        "error": 5,
    }
    _DEFAULT_SEVERITY = 1

    def get_name(self) -> str:
        """Get name of tool.

        Returns:
            Name of the tool.
        """
        return "hadolint"

    def gather_args(self, args: argparse.Namespace) -> None:
        """Gather arguments.

        Args:
            args: Flags for this plugin will be added to these existing arguments.
        """
        args.add_argument(
            "--hadolint-bin",
            dest="hadolint_bin",
            type=str,
            help="hadolint binary path",
        )
        args.add_argument(
            "--hadolint-docker",
            dest="hadolint_docker",
            action="store_true",
            help="Use hadolint docker image instead of binary",
        )

    def get_file_types(self) -> list[str]:
        """Return a list of file types the plugin can scan.

        Returns:
            List of file types.
        """
        return ["dockerfile_src"]

    def get_binary(  # pylint: disable=unused-argument
        self, level: Optional[str] = None, package: Optional[Package] = None
    ) -> str:
        """Get tool binary name.

        Args:
            level: The analysis level. Unused.
            package: The package being analyzed. Unused.

        Returns:
            Name of the tool binary.
        """
        binary = self.get_name()
        # If the user explicitly specifies a binary, let that override the default
        if self.plugin_context and self.plugin_context.args.hadolint_bin is not None:
            binary = self.plugin_context.args.hadolint_bin
        return binary

    def _use_docker(self) -> bool:
        """Tell whether the user asked for the hadolint docker image.

        Returns:
            True when a plugin context exists and ``--hadolint-docker`` is set.
        """
        return bool(self.plugin_context and self.plugin_context.args.hadolint_docker)

    def get_version(self) -> str:
        """Figure out and return the version of the tool that's installed.

        Returns:
            Version of the tool or "Unknown" if not found.
        """
        # NOTE(review): get_version_from_docker() is not defined in the part of
        # the class shown here; presumably it is provided elsewhere (e.g. by the
        # base class) -- confirm.
        if self._use_docker():
            return self.get_version_from_docker()

        version = super().get_version()
        # No usable local binary: fall back to asking the docker image.
        if version in [ToolPlugin.TOOL_MISSING_STR, ToolPlugin.TOOL_UNKNOWN_STR]:
            version = self.get_version_from_docker()
        return version

    @staticmethod
    def _strip_format_flag(user_flags: list[str]) -> list[str]:
        """Return a copy of the user flags without any ``-f <format>`` pair.

        The caller's list is left untouched, and a trailing ``-f`` that has no
        value is removed without raising.

        Args:
            user_flags: List of user flags.

        Returns:
            New list of flags with every ``-f`` and its value removed.
        """
        remaining = list(user_flags)
        while "-f" in remaining:
            idx = remaining.index("-f")
            ignored = remaining[idx + 1] if idx + 1 < len(remaining) else ""
            logging.warning(
                "Statick requires hadolint to output in json format, "
                "ignoring user provided format: %s",
                ignored,
            )
            # Slice deletion is safe even when "-f" is the last element.
            del remaining[idx : idx + 2]
        return remaining

    # pylint: disable=too-many-locals
    def process_files(
        self, package: Package, level: str, files: list[str], user_flags: list[str]
    ) -> Optional[list[str]]:
        """Run tool and gather output.

        Args:
            package: The package being analyzed.
            level: The analysis level.
            files: List of files to process.
            user_flags: List of user flags. Not modified.

        Returns:
            List of output strings or None.
        """
        tool_config = ".hadolint.yaml"
        config_file_path = None
        if self.plugin_context is not None:
            user_config = self.plugin_context.config.get_tool_config(
                self.get_name(), level, "config"
            )
            if user_config is not None:
                tool_config = user_config
            config_file_path = self.plugin_context.resources.get_file(tool_config)

        # The JSON format is mandatory because parse_output() only reads JSON.
        flags: list[str] = ["-f", "json", "--no-fail"]
        flags += self._strip_format_flag(user_flags)

        tool_bin = self.get_binary()

        if self._use_docker():
            # Honor the docker request even when no config file was resolved;
            # scan_docker() skips the config mount for an empty path.
            output = self.scan_docker(tool_bin, flags, files, config_file_path or "")
        else:
            if config_file_path:
                flags += ["-c", config_file_path]
            output = self.scan_local_binary(tool_bin, flags, files)

        if not output:
            return None

        logging.debug("%s", output)
        return [output]

    # pylint: enable=too-many-locals

    def scan_local_binary(
        self, tool_bin: str, flags: list[str], files: list[str]
    ) -> Optional[str]:
        """Use locally installed hadolint binary to scan.

        All files are passed to a single invocation of the binary.

        Args:
            tool_bin: The tool binary.
            flags: List of flags.
            files: List of files to scan.

        Returns:
            Output string or None.
        """
        exe = [tool_bin, *flags, *files]
        try:
            return subprocess.check_output(
                exe, stderr=subprocess.STDOUT, universal_newlines=True
            )
        except subprocess.CalledProcessError as ex:
            logging.warning("%s failed! Returncode = %d", tool_bin, ex.returncode)
            logging.warning("%s exception: %s", self.get_name(), ex.output)
            return None
        except OSError as ex:
            logging.warning("Couldn't find %s! (%s)", tool_bin, ex)
            return None

    def scan_docker(
        self, tool_bin: str, flags: list[str], files: list[str], config_file_path: str
    ) -> Optional[str]:
        """Use hadolint docker image to scan.

        Each file is mounted into its own container as ``/Dockerfile`` and
        scanned separately. The findings of all files are merged into one JSON
        array, with the ``file`` field rewritten to the host path.

        Args:
            tool_bin: The tool binary. Only used in log messages.
            flags: List of flags.
            files: List of files to scan.
            config_file_path: Path to the config file. An empty value means
                no config file is mounted.

        Returns:
            Output string (a JSON array) or None.
        """
        collected = []
        try:
            for src in files:
                exe = ["docker", "run", "--rm", "-i"]
                if config_file_path:
                    exe += ["-v", config_file_path + ":/.config/hadolint.yaml"]
                exe += ["-v", src + ":/Dockerfile", "hadolint/hadolint", "hadolint"]
                exe += flags
                exe.append("Dockerfile")

                output = subprocess.check_output(
                    exe, stderr=subprocess.STDOUT, universal_newlines=True
                )
                if not output:
                    continue

                try:
                    findings = json.loads(output)
                except json.JSONDecodeError as ex:
                    logging.error("Failed to decode json from %s, %s", output, ex)
                    return None

                for finding in findings:
                    # Restore the host path on the parsed object rather than by
                    # text replacement, so paths containing quotes or
                    # backslashes cannot corrupt the JSON.
                    if isinstance(finding, dict) and finding.get("file") == "Dockerfile":
                        finding["file"] = src
                    collected.append(finding)
        except subprocess.CalledProcessError as ex:
            logging.warning("%s failed! Returncode = %d", tool_bin, ex.returncode)
            logging.warning("%s exception: %s", self.get_name(), ex.output)
            return None
        except OSError as ex:
            logging.warning("Couldn't find %s! (%s)", tool_bin, ex)
            return None

        return json.dumps(collected)

    def parse_output(
        self, total_output: list[str], package: Optional[Package] = None
    ) -> list[Issue]:
        """Parse tool output and report issues.

        Every non-empty line of every output string is expected to hold a JSON
        array of findings. Lines that are not valid JSON arrays and findings
        with missing or malformed fields are logged and skipped.

        Args:
            total_output: List of output strings.
            package: The package being analyzed. Unused.

        Returns:
            List of issues.
        """
        issues: list[Issue] = []
        # pylint: disable=too-many-nested-blocks
        for output in total_output:
            for line in output.splitlines():
                if not line:
                    continue
                try:
                    findings = json.loads(line)
                except ValueError as ex:
                    logging.warning("ValueError: %s, line: %s", ex, line)
                    continue
                if not isinstance(findings, list):
                    logging.warning("Unexpected hadolint output, line: %s", line)
                    continue

                for finding in findings:
                    try:
                        severity = self._SEVERITY_BY_LEVEL.get(
                            finding["level"], self._DEFAULT_SEVERITY
                        )
                        issues.append(
                            Issue(
                                finding["file"],
                                int(finding["line"]),
                                self.get_name(),
                                finding["code"],
                                severity,
                                finding["message"],
                                None,
                            )
                        )
                    except (KeyError, TypeError, ValueError) as ex:
                        # One malformed entry must not drop the other findings.
                        logging.warning(
                            "Skipping malformed hadolint issue %s: %r", finding, ex
                        )
        # pylint: enable=too-many-nested-blocks
        return issues