Source code for statick_tool.plugins.tool.shellcheck
"""Apply shellcheck tool and gather results.
The output from the tool is collected in JSON format to facilitate parsing.
"""
import argparse
import json
import logging
import subprocess
from typing import Any, Optional
from statick_tool.issue import Issue
from statick_tool.package import Package
from statick_tool.tool_plugin import ToolPlugin
[docs]
class ShellcheckToolPlugin(ToolPlugin):
"""Apply shellcheck tool and gather results."""
[docs]
def get_name(self) -> str:
"""Get name of tool.
Returns:
The name of the tool.
"""
return "shellcheck"
[docs]
def gather_args(self, args: argparse.Namespace) -> None:
"""Gather arguments.
Args:
args: Flags for this plugin will be added to these existing arguments.
"""
args.add_argument(
"--shellcheck-bin",
dest="shellcheck_bin",
type=str,
help="shellcheck binary path",
)
[docs]
def get_binary( # pylint: disable=unused-argument
self, level: Optional[str] = None, package: Optional[Package] = None
) -> str:
"""Get tool binary name.
Args:
level: The level of the scan.
package: The package to scan.
Returns:
The binary name of the tool.
"""
binary = self.get_name()
if self.plugin_context and self.plugin_context.args.shellcheck_bin is not None:
binary = self.plugin_context.args.shellcheck_bin
return binary
[docs]
def scan(self, package: Package, level: str) -> Optional[list[Issue]]:
"""Run tool and gather output.
Args:
package: The package to scan.
level: The level of the scan.
Returns:
A list of issues found by the tool.
"""
if "shell_src" not in package or not package["shell_src"]:
return []
shellcheck_bin = self.get_binary()
# Get output in JSON format.
flags: list[str] = ["-f", "json"]
flags += self.get_user_flags(level)
files: list[str] = []
if "shell_src" in package:
files += package["shell_src"]
try:
subproc_args = [shellcheck_bin] + flags + files
output = subprocess.check_output(
subproc_args, stderr=subprocess.STDOUT, universal_newlines=True
)
# We expect a CalledProcessError if issues are discovered by the tool.
except subprocess.CalledProcessError as ex:
output = ex.output
if ex.returncode != 1:
logging.warning("shellcheck failed! Returncode = %d", ex.returncode)
logging.warning("%s exception: %s", self.get_name(), ex.output)
return None
except OSError as ex:
logging.warning("Couldn't find %s! (%s)", shellcheck_bin, ex)
return None
logging.debug("%s", output)
if self.plugin_context and self.plugin_context.args.output_directory:
with open(self.get_name() + ".log", "w", encoding="utf8") as fid:
fid.write(output)
issues: list[Issue] = self.parse_json_output(json.loads(output))
return issues
[docs]
def parse_json_output(self, output: Any) -> list[Issue]:
"""Parse tool output and report issues.
Args:
output: The JSON output from the tool.
Returns:
A list of issues parsed from the output.
"""
issues: list[Issue] = []
for item in output:
if (
"level" not in item
or "file" not in item
or "line" not in item
or "code" not in item
or "message" not in item
):
logging.debug(" Found invalid shellcheck output: %s", item)
continue
if item["level"] == "style":
severity = 1
elif item["level"] == "info":
severity = 1
elif item["level"] == "warning":
severity = 3
elif item["level"] == "error":
severity = 5
else:
severity = 3
issue = Issue(
item["file"],
int(item["line"]),
self.get_name(),
"SC" + str(item["code"]),
severity,
item["message"],
None,
)
issues.append(issue)
return issues