From d86b7dc75cb13e638ae7e39e931a5c1d66c87ecd Mon Sep 17 00:00:00 2001
From: Michael Long <31821088+bluesentinelsec@users.noreply.github.com>
Date: Thu, 28 Mar 2024 10:48:34 -0400
Subject: [PATCH] default output paths include run_id (#18)

Co-authored-by: Michael Long
---
 Makefile                                    |   4 +-
 action.yml                                  |   6 +-
 entrypoint/entrypoint/cli.py                |  14 +-
 scripts/decode_action_output.py             |  19 --
 scripts/entrypoint/__init__.py              |   0
 scripts/entrypoint/action_args.py           |  26 ---
 scripts/entrypoint/downloader.py            |  17 --
 scripts/entrypoint/entrypoint.py            |  40 ----
 scripts/entrypoint/extractor.py             |  22 --
 scripts/entrypoint/finder.py                |  19 --
 scripts/entrypoint/inspector_parser.py      | 244 --------------------
 scripts/entrypoint/log_formatter.py         |  11 -
 scripts/entrypoint/orchestrator.py          | 156 -------------
 scripts/entrypoint/presenter.py             |   2 -
 scripts/entrypoint/test_action_args.py      |  18 --
 scripts/entrypoint/test_downloader.py       |  25 --
 scripts/entrypoint/test_extractor.py        |  32 ---
 scripts/entrypoint/test_finder.py           |  37 ---
 scripts/entrypoint/test_inspector_parser.py |  69 ------
 scripts/entrypoint/test_orchestrator.py     |  44 ----
 scripts/entrypoint/test_presenter.py        |   0
 scripts/entrypoint/test_thresholds.py       |  16 --
 scripts/entrypoint/thresholds.py            | 194 ----------------
 scripts/present_findings.py.txt             | 155 -------------
 24 files changed, 11 insertions(+), 1159 deletions(-)
 delete mode 100755 scripts/decode_action_output.py
 delete mode 100644 scripts/entrypoint/__init__.py
 delete mode 100644 scripts/entrypoint/action_args.py
 delete mode 100644 scripts/entrypoint/downloader.py
 delete mode 100755 scripts/entrypoint/entrypoint.py
 delete mode 100644 scripts/entrypoint/extractor.py
 delete mode 100644 scripts/entrypoint/finder.py
 delete mode 100644 scripts/entrypoint/inspector_parser.py
 delete mode 100644 scripts/entrypoint/log_formatter.py
 delete mode 100644 scripts/entrypoint/orchestrator.py
 delete mode 100644 scripts/entrypoint/presenter.py
 delete mode 100644 scripts/entrypoint/test_action_args.py
 delete mode 100644 scripts/entrypoint/test_downloader.py
 delete mode 100644 scripts/entrypoint/test_extractor.py
 delete mode 100644 scripts/entrypoint/test_finder.py
 delete mode 100644 scripts/entrypoint/test_inspector_parser.py
 delete mode 100644 scripts/entrypoint/test_orchestrator.py
 delete mode 100644 scripts/entrypoint/test_presenter.py
 delete mode 100644 scripts/entrypoint/test_thresholds.py
 delete mode 100644 scripts/entrypoint/thresholds.py
 delete mode 100755 scripts/present_findings.py.txt

diff --git a/Makefile b/Makefile
index dd41d8d..fcc3b06 100644
--- a/Makefile
+++ b/Makefile
@@ -1,6 +1,6 @@
 run:
-	docker build . -t plugin:latest
-	docker run -it plugin:latest --artifact-type container
+	docker build . -t inspector-action:latest
+	docker run -it inspector-action:latest
 
 test:
 	cd entrypoint; python3 -m unittest discover -v -s ./
diff --git a/action.yml b/action.yml
index b6081c6..94d4ae2 100644
--- a/action.yml
+++ b/action.yml
@@ -15,17 +15,17 @@ inputs:
   output_sbom_path:
     description: "The destination file path for the generated SBOM."
     required: False
-    default: '/tmp/sbom.json'
+    default: './sbom_${{ github.run_id }}.json'
   output_inspector_scan_path:
     description: "The destination file path for Inspector's vulnerability scan (JSON format)."
     required: False
-    default: '/tmp/inspector_scan.json'
+    default: 'inspector_scan_${{ github.run_id }}.json'
   output_inspector_scan_path_csv:
     description: "The destination file path for Inspector's vulnerability scan (CSV format)."
     required: False
-    default: '/tmp/inspector_scan.csv'
+    default: 'inspector_scan_${{ github.run_id }}.csv'
   sbomgen_version:
diff --git a/entrypoint/entrypoint/cli.py b/entrypoint/entrypoint/cli.py
index 4b8a0d2..28b46ea 100644
--- a/entrypoint/entrypoint/cli.py
+++ b/entrypoint/entrypoint/cli.py
@@ -9,20 +9,18 @@ def init(sys_argv=None) -> argparse.Namespace:
     """
     program_description = "This program orchestrates the business logic for the Amazon Inspector GitHub Actions plugin."
     parser = argparse.ArgumentParser(description=program_description)
-    parser.add_argument('--artifact-type', type=str,
+    parser.add_argument('--artifact-type', type=str, default="repository",
                         help='The artifact you would like to scan with Amazon Inspector. Valid choices are "repository", "container", "binary", or "archive".')
-    parser.add_argument("--artifact-path", type=str,
+    parser.add_argument("--artifact-path", type=str, default="./",
                         help='The path to the artifact you would like to scan with Amazon Inspector. If scanning a container image, you must provide a value that follows the docker pull convention: "NAME[:TAG|@DIGEST]", for example, "alpine:latest", or a path to an image exported as tarball using "docker save".')
-    parser.add_argument("--out-sbom", type=str, help="The destination file path for the generated SBOM.")
-    parser.add_argument("--out-scan", type=str,
+    parser.add_argument("--out-sbom", type=str, default="/tmp/sbom.json", help="The destination file path for the generated SBOM.")
+    parser.add_argument("--out-scan", type=str, default="/tmp/scan.json",
                         help="The destination file path for Inspector's vulnerability scan in JSON format.")
-    parser.add_argument("--out-scan-csv", type=str,
+    parser.add_argument("--out-scan-csv", type=str, default="/tmp/scan.csv",
                         help="The destination file path for Inspector's vulnerability scan in CSV format.")
     parser.add_argument("--verbose", action="store_true", help="Enables verbose console logging.")
-    parser.add_argument("--sbomgen-version", type=str,
+    parser.add_argument("--sbomgen-version", type=str, default="latest",
                         help="The inspector-sbomgen version you wish to use for SBOM generation.")
-    parser.add_argument("--sbomgen-args", nargs="+",
-                        help="Any additional arguments you wish to provide to inspector-sbomgen. Download sbomgen and execute it with './inspector-sbomgen --help' to see available arguments. https://docs.aws.amazon.com/inspector/latest/user/sbom-generator.html")
     parser.add_argument("--thresholds", action="store_true",
                         help='This will cause the program to fail with exit code 1 if vulnerability thresholds are exceeded.')
     parser.add_argument("--critical", type=int, default=0,
diff --git a/scripts/decode_action_output.py b/scripts/decode_action_output.py
deleted file mode 100755
index 889b79d..0000000
--- a/scripts/decode_action_output.py
+++ /dev/null
@@ -1,19 +0,0 @@
-#!/usr/bin/env python3
-
-
-import base64
-import sys
-import zlib
-
-
-def main():
-    encoded = sys.argv[1]
-    decoded = base64.b64decode(encoded)
-    decoded = zlib.decompress(decoded)
-
-    print(decoded.decode('utf-8'))
-
-
-if __name__ == "__main__":
-    main()
-
diff --git a/scripts/entrypoint/__init__.py b/scripts/entrypoint/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/scripts/entrypoint/action_args.py b/scripts/entrypoint/action_args.py
deleted file mode 100644
index cf39011..0000000
--- a/scripts/entrypoint/action_args.py
+++ /dev/null
@@ -1,26 +0,0 @@
-import sys
-
-ARTIFACT_TYPE = ""
-ARTIFACT_PATH = ""
-OUTPUT_SBOM_PATH = ""
-OUTPUT_INSPECTOR_SCAN_PATH = ""
-THRESHOLDS_ENABLED = ""
-CRITICAL_THRESHOLD = ""
-HIGH_THRESHOLD = ""
-MEDIUM_THRESHOLD = ""
-LOW_THRESHOLD = ""
-
-
-def init_cli_args(os_args):
-    global ARTIFACT_TYPE, ARTIFACT_PATH, OUTPUT_SBOM_PATH, OUTPUT_INSPECTOR_SCAN_PATH, THRESHOLDS_ENABLED, CRITICAL_THRESHOLD, HIGH_THRESHOLD, MEDIUM_THRESHOLD, LOW_THRESHOLD
-    ARTIFACT_TYPE = os_args[1]
-    ARTIFACT_PATH = os_args[2]
-    OUTPUT_SBOM_PATH = os_args[3]
-    OUTPUT_INSPECTOR_SCAN_PATH = os_args[4]
-    THRESHOLDS_ENABLED = os_args[5]
-    CRITICAL_THRESHOLD = int(os_args[6])
-    HIGH_THRESHOLD = int(os_args[7])
-    MEDIUM_THRESHOLD = int(os_args[8])
-    LOW_THRESHOLD = int(os_args[9])
-
-    return
diff --git a/scripts/entrypoint/downloader.py b/scripts/entrypoint/downloader.py
deleted file mode 100644
index 960360d..0000000
--- a/scripts/entrypoint/downloader.py
+++ /dev/null
@@ -1,17 +0,0 @@
-import logging
-import urllib.request
-
-
-def download_file(url: str, dst: str) -> bool:
-    """
-    Download the data at 'url' and write the data
-    to a filepath specified by 'dst'.
-    Returns true on success and false on failure.
- """ - try: - urllib.request.urlretrieve(url, dst) - except Exception as e: - logging.error(e) - return False - - return True diff --git a/scripts/entrypoint/entrypoint.py b/scripts/entrypoint/entrypoint.py deleted file mode 100755 index e4c2517..0000000 --- a/scripts/entrypoint/entrypoint.py +++ /dev/null @@ -1,40 +0,0 @@ -#!/usr/bin/env python3 -import json -import logging -import sys - -import action_args -import inspector_parser -import log_formatter -import orchestrator -import thresholds - - -def main(): - action_args.init_cli_args(sys.argv) - sbomgen_path = "/usr/local/bin/inspector-sbomgen" - - orchestrator.install_sbomgen(sbomgen_path) - - orchestrator.execute_sbomgen(sbomgen_path) - - orchestrator.invoke_inspector_scan() - - thresholds.check_vuln_thresholds(action_args.OUTPUT_INSPECTOR_SCAN_PATH, - action_args.OUTPUT_SBOM_PATH, - action_args.THRESHOLDS_ENABLED, - action_args.CRITICAL_THRESHOLD, - action_args.HIGH_THRESHOLD, - action_args.MEDIUM_THRESHOLD, - action_args.LOW_THRESHOLD) - - -if __name__ == "__main__": - logger = logging.getLogger() - logger.setLevel(logging.INFO) - handler = logging.StreamHandler() - handler.setLevel(logging.INFO) - formatter = log_formatter.CustomFormatter() - handler.setFormatter(formatter) - logger.addHandler(handler) - main() diff --git a/scripts/entrypoint/extractor.py b/scripts/entrypoint/extractor.py deleted file mode 100644 index f6e2f90..0000000 --- a/scripts/entrypoint/extractor.py +++ /dev/null @@ -1,22 +0,0 @@ -import logging -import zipfile - - -def extract_zip_file(filepath: str, dst: str) -> bool: - """ - extract_zip_file extracts a zip archive, specified - by 'filepath'. The zip contents are written to the directory - specified with 'dst'. If the directory does not exist, - this function will create it. Returns True on success - or False on failure. - """ - try: - z = zipfile.ZipFile(filepath, "r") - z.extractall(dst) - z.close() - return True - - except Exception as e: - logging.error(e) - - return False diff --git a/scripts/entrypoint/finder.py b/scripts/entrypoint/finder.py deleted file mode 100644 index aa5c255..0000000 --- a/scripts/entrypoint/finder.py +++ /dev/null @@ -1,19 +0,0 @@ -import os - - -def find_file_in_dir(file_name: str, dir_to_search: str) -> str: - """ - find_file_in_dir takes the name of the file you - are looking for, 'name', and a directory to search - within, 'dir'. If the file is found, return the - absolute filepath to the subject file. If the - file is NOT found, return an empty string. This - function does not look for multiple instances - of the file in the subject directory. 
- """ - for root, dirs, files in os.walk(dir_to_search): - if file_name in files: - path = os.path.join(root, file_name) - return path - - return "" diff --git a/scripts/entrypoint/inspector_parser.py b/scripts/entrypoint/inspector_parser.py deleted file mode 100644 index c2d57a4..0000000 --- a/scripts/entrypoint/inspector_parser.py +++ /dev/null @@ -1,244 +0,0 @@ -import json -import logging -import sys -import urllib.parse -from tabulate import tabulate - - -class InspectorFinding: - def __init__(self): - self.package_names = [] - self.vulnerability_id = "" - self.cvss_severity = "" - self.cvss_score = "" - self.installed_versions = [] - self.fixed_versions = [] - self.analysis_state = "" - self.advisories = "" - - -def print_summarized_findings_as_table(summarized_findings): - sorted_findings = sorted(summarized_findings, key=lambda obj: obj.cvss_score, reverse=True) - - header = ["Vulnerability ID", "CVSSv3 Severity", "CVSSv3 Score", "Affected Packages", "Installed Versions"] - table = [] - for finding in sorted_findings: - name_cell = "\n".join(finding.package_names) - installed_ver_cell = "\n".join(finding.installed_versions) - - row = [finding.vulnerability_id, finding.cvss_severity, finding.cvss_score, name_cell, installed_ver_cell] - table.append(row) - print(tabulate(table, headers=header, stralign="left", numalign="left")) - return - - -def print_summarized_findings(summarized_findings): - sorted_findings = sorted(summarized_findings, key=lambda obj: obj.cvss_score, reverse=True) - for finding in sorted_findings: - print(f"Vulnerability ID: {finding.vulnerability_id}") - print(f"CVSSv3 Severity: {finding.cvss_severity}") - print(f"CVSSv3 Score: {finding.cvss_score}") - print(f"Analysis State: {finding.analysis_state}") - print(f"Affected Packages (Name, Installed Version, Fixed Version):") - pkg_table = [] - padding = " " - for pkg in finding.package_names: - row = [padding, pkg, "1.1.1", "1.2.3"] - pkg_table.append(row) - print(tabulate(pkg_table, tablefmt="plain")) - # print(f"Affected Versions: {finding.installed_versions}") - # print(f"Fixed Versions: {finding.fixed_versions}") - print(f"Advisories:") - i = 1 - for adv in finding.advisories: - print(f" {i}. 
{adv}") - i += 1 - print("................................................................") - - -def get_summarized_findings(inspector_scan_json): - findings = get_findings(inspector_scan_json) - vulnerabilities = get_vulnerabilities(findings) - if vulnerabilities is None: - return None - - summarized_findings = [] - for vuln in vulnerabilities: - finding = InspectorFinding() - finding.cvss_severity, finding.cvss_score = get_cvss_severity(vuln) - if finding.cvss_severity == None: - finding.cvss_severity = "unknown" - - finding.vulnerability_id = get_vuln_id(vuln) - finding.analysis_state = get_analysis_state(vuln) - - advisories = get_advisories(vuln) - if advisories is not None: - finding.advisories = advisories - - affected_components = get_affected_components(vuln) - for each_comp in affected_components: - component = get_component(findings, each_comp) - name, ver = get_component_name_version(component) - finding.package_names.append(name) - finding.installed_versions.append(ver) - - fixed_ver = get_fixed_version(vuln, each_comp) - if fixed_ver is None or fixed_ver == "unknown": - fixed_ver = "" - finding.fixed_versions.append(fixed_ver) - - summarized_findings.append(finding) - - return summarized_findings - - -def get_findings(inspector_scan_json): - """ - given a JSON formatted Inspector Scan-Sbom response, - extract all keys under the root key, "sbom" so that - we can begin parsing components and vulnerabilities. - Returns a dict on success, or None on failure. - """ - findings = inspector_scan_json.get("sbom") - if findings is None: - logging.warning("expected 'sbom' json object but it was not found") - return None - return findings - - -def get_vulnerabilities(findings_json): - """ - This function takes the output of 'inspector_parser.get_findings' - as its input. Given this input, it extracts the CycloneDX - vulnerabilities array, and returns it to the caller. - Returns None if vulnerabilities array is not present. 
- """ - vulnerabilities = findings_json.get("vulnerabilities") - if vulnerabilities is None: - return None - return vulnerabilities - - -def get_cvss_severity(vuln_json): - ratings = vuln_json.get("ratings") - if ratings is None: - return None, 0 - - severity = "" - score = 0 - for rating in ratings: - method = rating.get("method") - if method is None or method != "CVSSv31": - continue - - severity = rating.get("severity") - if severity is None: - severity = "unknown" - - score = rating.get("score") - if score is None: - score = 0 - - return severity, score - - -def get_vuln_id(vuln_json): - vuln_id = vuln_json.get("id") - if vuln_id is None: - logging.warning("expected value from key 'id' but received none") - sys.exit(1) - return vuln_id - - -def get_analysis_state(vuln_json): - analysis = vuln_json.get("analysis") - if analysis is None: - logging.warning("expected json object from key 'analysis' but received none") - sys.exit(1) - - analysis_state = analysis.get("state") - if analysis_state is None: - logging.warning("expected value from key 'state' but received none") - sys.exit(1) - - return analysis_state - - -def get_advisories(vuln_json): - advisories = vuln_json.get("advisories") - if advisories is None: - return None - - advisory_urls = [] - for adv in advisories: - url = adv["url"] - advisory_urls.append(url) - return advisory_urls - - -def get_affected_components(vuln_json): - affects = vuln_json.get("affects") - if affects is None: - logging.warning("expected value from key 'affects' but received none") - sys.exit(1) - - affected_components = [] - for each in affects: - component = each.get("ref") - if component is None: - logging.warning("expected value from key 'ref' but received none") - sys.exit(1) - affected_components.append(component) - - return affected_components - - -def get_component(findings_json, bom_ref): - components = findings_json.get("components") - if components is None: - logging.warning("expected value from key 'components' but received none") - sys.exit(1) - - for comp in components: - ref = comp.get("bom-ref") - if ref is None: - logging.warning("expected value from key 'bom-ref' but received none") - sys.exit(1) - - if bom_ref == ref: - return comp - - -def get_component_name_version(comp_json): - name = comp_json.get("name") - if name is None: - logging.warning("expected value from key 'name' but received none") - sys.exit(1) - name = urllib.parse.unquote(name) - version = comp_json.get("version") - if version == "": - version = "unknown" - version = urllib.parse.unquote(version) - return name, version - - -def get_fixed_version(vuln_json, bom_ref): - properties = vuln_json.get("properties") - if properties is None: - return "unknown" - - prop_namespace = "amazon:inspector:sbom_scanner:fixed_version:" - for prop in properties: - name = prop.get("name") - if not prop_namespace in name: - continue - - if not bom_ref in name: - continue - - fixed_version = prop.get("value") - if fixed_version == "" or fixed_version is None: - return "unknown" - fixed_version = urllib.parse.unquote(fixed_version) - return fixed_version diff --git a/scripts/entrypoint/log_formatter.py b/scripts/entrypoint/log_formatter.py deleted file mode 100644 index 5f551e1..0000000 --- a/scripts/entrypoint/log_formatter.py +++ /dev/null @@ -1,11 +0,0 @@ -import logging -import sys - -class CustomFormatter(logging.Formatter): - def format(self, record): - log_time = self.formatTime(record, "%Y-%m-%d %H:%M:%S") - log_level = record.levelname.lower() - log_msg = record.getMessage() - 
log_file = f'{record.filename}:{record.lineno}' - s = f'time="{log_time}" level={log_level} msg="{log_msg}" file="{log_file}"' - return s diff --git a/scripts/entrypoint/orchestrator.py b/scripts/entrypoint/orchestrator.py deleted file mode 100644 index 1212817..0000000 --- a/scripts/entrypoint/orchestrator.py +++ /dev/null @@ -1,156 +0,0 @@ -import base64 -import logging -import os -import platform -import shutil -import subprocess -import sys -import tempfile -import zlib - -import action_args -import downloader -import extractor -import finder - - -def get_sbomgen_url(os_name: str, cpu_arch: str) -> str: - if os_name != "Linux": - logging.error(f"expected OS to be Linux but received {os_name}") - return "" - - if cpu_arch == "x86_64": - return "https://amazon-inspector-sbomgen.s3.amazonaws.com/latest/linux/amd64/inspector-sbomgen.zip" - - elif cpu_arch == "arm64": - return "https://amazon-inspector-sbomgen.s3.amazonaws.com/latest/linux/arm64/inspector-sbomgen.zip" - - else: - logging.error(f"expected cpu architecture to be either 'x86_64' or 'arm64' but received {cpu_arch}") - return "" - - -def install_sbomgen(install_path: str) -> bool: - os_name = platform.system() - cpu_arch = platform.machine() - sbomgen_url = get_sbomgen_url(os_name, cpu_arch) - if sbomgen_url == "": - return False - - tmp_dir = tempfile.gettempdir() - zip_name = "inspector-sbomgen.zip" - path_to_sbomgen_zip = os.path.join(tmp_dir, zip_name) - - logging.info(f"downloading inspector-sbomgen from '{sbomgen_url}' to '{path_to_sbomgen_zip}'") - result = downloader.download_file(sbomgen_url, path_to_sbomgen_zip) - if not result: - return False - - path_to_extracted_sbomgen_dir = os.path.join(tempfile.gettempdir(), "inspector-sbomgen") - - logging.info(f"extracting inspector-sbomgen.zip to {path_to_extracted_sbomgen_dir}") - result = extractor.extract_zip_file(path_to_sbomgen_zip, path_to_extracted_sbomgen_dir) - if not result: - return False - - sbomgen_path = finder.find_file_in_dir("inspector-sbomgen", path_to_extracted_sbomgen_dir) - if sbomgen_path == "": - return False - - try: - shutil.move(sbomgen_path, install_path) - - os.chmod(install_path, 0o500) # read and execute permissions for owner - - logging.info("validating installation") - command = ["inspector-sbomgen", "--version"] - output = subprocess.run(command, capture_output=True, text=True) - if output.returncode != 0: - logging.error(output.stderr) - return False - else: - logging.info(f"installation succeeded for inspector-sbomgen v{output.stdout.strip()}") - - except Exception as e: - logging.error(e) - return False - - -def get_path_argument() -> str: - path_arg = "" - if action_args.ARTIFACT_TYPE.lower() == "repository": - action_args.ARTIFACT_TYPE = "directory" - return "--path" - - elif action_args.ARTIFACT_TYPE.lower() == "binary": - return "--path" - - elif action_args.ARTIFACT_TYPE.lower() == "archive": - return "--path" - - elif action_args.ARTIFACT_TYPE.lower() == "container": - return "--image" - - else: - logging.error( - f"expected artifact type to be 'repository', 'container', 'archive', or 'binary', but received {action_args.ARTIFACT_TYPE}") - - -def execute_sbomgen(sbomgen_path): - path_arg = get_path_argument() - - logging.info("executing inspector-sbomgen to create software bill of materials from provided artifact") - cmd = f"{sbomgen_path} {action_args.ARTIFACT_TYPE} {path_arg} {action_args.ARTIFACT_PATH} --disable-progress-bar -o {action_args.OUTPUT_SBOM_PATH}" - os.system(cmd) - logging.info(f"sbom written to 
{action_args.OUTPUT_SBOM_PATH}") - - set_readable(action_args.OUTPUT_SBOM_PATH) - - contents = compress_encode_file(action_args.OUTPUT_SBOM_PATH) - set_github_output("artifact_sbom", contents) - - -def invoke_inspector_scan(): - logging.info(f"sending {action_args.OUTPUT_SBOM_PATH} to Amazon Inspector for vulnerability scan") - cmd = f"aws inspector-scan scan-sbom --sbom file://{action_args.OUTPUT_SBOM_PATH} --output-format CYCLONE_DX_1_5 > {action_args.OUTPUT_INSPECTOR_SCAN_PATH}" - print(cmd) - ret = os.system(cmd) - if ret != 0: - logging.error(f"aws CLI command failed with error code: {ret}") - sys.exit(1) - logging.info(f"Inspector scan written to {action_args.OUTPUT_INSPECTOR_SCAN_PATH}") - - set_readable(action_args.OUTPUT_INSPECTOR_SCAN_PATH) - - contents = compress_encode_file(action_args.OUTPUT_INSPECTOR_SCAN_PATH) - set_github_output("inspector_scan_results", contents) - - -def set_readable(filepath): - # since our container runs as root, we need to - # grant read access to other users so the Uploader Action - # can successfully upload job artifacts - # parent_dir = os.path.dirname(filepath) - # os.system(f"chmod -R o+r {parent_dir}") - os.system(f"chmod o+r {filepath}") - - -def compress_encode_file(file): - contents = "" - with open(file) as f: - contents = f.read() - - compressed_contents = zlib.compress(contents.encode()) - encoded = base64.b64encode(compressed_contents).decode() - return encoded - - -def set_github_output(key_name, data): - # TODO: add some size checking - os.system(f'echo "{key_name}={data}" >> "$GITHUB_OUTPUT"') - - -""" -TODO: fix sbom and scan permissions so the uploader works correctly -TODO: figure out how to set and get Action outputs -""" diff --git a/scripts/entrypoint/presenter.py b/scripts/entrypoint/presenter.py deleted file mode 100644 index 2e7b0b8..0000000 --- a/scripts/entrypoint/presenter.py +++ /dev/null @@ -1,2 +0,0 @@ -from tabulate import tabulate - diff --git a/scripts/entrypoint/test_action_args.py b/scripts/entrypoint/test_action_args.py deleted file mode 100644 index 3e8d03f..0000000 --- a/scripts/entrypoint/test_action_args.py +++ /dev/null @@ -1,18 +0,0 @@ -import unittest - -import action_args - - -class TestActionArguments(unittest.TestCase): - - def test_init_cli_args(self): - os_args = ["entrypoint.py", "container", "alpine:latest", "/tmp/sbom.json", "/tmp/inspector_scan.json"] - action_args.init_cli_args(os_args) - self.assertEqual(action_args.ARTIFACT_TYPE, os_args[1]) - self.assertEqual(action_args.ARTIFACT_PATH, os_args[2]) - self.assertEqual(action_args.OUTPUT_SBOM_PATH, os_args[3]) - self.assertEqual(action_args.OUTPUT_INSPECTOR_SCAN_PATH, os_args[4]) - - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/entrypoint/test_downloader.py b/scripts/entrypoint/test_downloader.py deleted file mode 100644 index f0b97db..0000000 --- a/scripts/entrypoint/test_downloader.py +++ /dev/null @@ -1,25 +0,0 @@ -import os -import tempfile -import unittest - -import downloader - - -class TestDownloader(unittest.TestCase): - - def test_download_file(self): - # setup test inputs - urls = [ - "https://amazon-inspector-sbomgen.s3.amazonaws.com/latest/linux/amd64/inspector-sbomgen.zip", - "https://amazon-inspector-sbomgen.s3.amazonaws.com/latest/linux/arm64/inspector-sbomgen.zip" - ] - tmp_dir = tempfile.gettempdir() - dst = os.path.join(tmp_dir, "inspector-sbomgen.zip") - - for each_url in urls: - self.assertTrue(downloader.download_file(each_url, dst)) - os.remove(dst) - - -if __name__ == '__main__': - unittest.main() diff 
--git a/scripts/entrypoint/test_extractor.py b/scripts/entrypoint/test_extractor.py deleted file mode 100644 index 25a44a3..0000000 --- a/scripts/entrypoint/test_extractor.py +++ /dev/null @@ -1,32 +0,0 @@ -import os -import shutil -import tempfile -import unittest - -import downloader -import extractor - - -class TestExtractor(unittest.TestCase): - - def test_extract_zip_file(self): - # setup - url = "https://amazon-inspector-sbomgen.s3.amazonaws.com/latest/linux/amd64/inspector-sbomgen.zip" - path_to_zip_file = os.path.join(tempfile.gettempdir(), "inspector-sbomgen.zip") - result = downloader.download_file(url, path_to_zip_file) - self.assertTrue(result) - - # test - tmp_dir = tempfile.gettempdir() - extracted_contents_dir = os.path.join(tmp_dir, "inspector-sbomgen") - result = extractor.extract_zip_file(path_to_zip_file, extracted_contents_dir) - self.assertTrue(result) - - # tear down - os.remove(path_to_zip_file) - shutil.rmtree(extracted_contents_dir) - return - - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/entrypoint/test_finder.py b/scripts/entrypoint/test_finder.py deleted file mode 100644 index c4abf53..0000000 --- a/scripts/entrypoint/test_finder.py +++ /dev/null @@ -1,37 +0,0 @@ -import os -import shutil -import tempfile -import unittest - -import downloader -import extractor -import finder - -class TestFinder(unittest.TestCase): - - def test_find_sbomgen(self): - # setup - url = "https://amazon-inspector-sbomgen.s3.amazonaws.com/latest/linux/amd64/inspector-sbomgen.zip" - path_to_zip_file = os.path.join(tempfile.gettempdir(), "inspector-sbomgen.zip") - result = downloader.download_file(url, path_to_zip_file) - self.assertTrue(result) - - tmp_dir = tempfile.gettempdir() - extracted_contents_dir = os.path.join(tmp_dir, "inspector-sbomgen") - result = extractor.extract_zip_file(path_to_zip_file, extracted_contents_dir) - self.assertTrue(result) - - # test - want = "inspector-sbomgen" - got = finder.find_file_in_dir(want, extracted_contents_dir) - self.assertTrue(got != "") - self.assertEqual(want, os.path.basename(got)) - - # tear down - os.remove(path_to_zip_file) - shutil.rmtree(extracted_contents_dir) - return - - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/entrypoint/test_inspector_parser.py b/scripts/entrypoint/test_inspector_parser.py deleted file mode 100644 index 4681a97..0000000 --- a/scripts/entrypoint/test_inspector_parser.py +++ /dev/null @@ -1,69 +0,0 @@ -import json -import logging -import os -import re -import unittest - -import inspector_parser - - -def get_inspector_json(file): - inspector_json = "" - with open(file, "r") as f: - inspector_json = json.load(f) - return inspector_json - - -class TestInspectorParser(unittest.TestCase): - - def test_get_summarized_findings(self): - test_files = self.get_inspector_scan_file_paths() - for file in test_files: - with open(file, "r") as f: - inspector_scan = json.load(f) - findings = inspector_parser.get_summarized_findings(inspector_scan) - if findings == None: - continue - - inspector_parser.print_summarized_findings_as_table(findings) - #inspector_parser.print_summarized_findings(findings) - - def get_inspector_scan_file_paths(self): - test_data = os.path.join("test_data", "inspector_scans") - files = os.listdir(test_data) - test_files = [] - for file in files: - file = os.path.join(test_data, file) - test_files.append(file) - return test_files - - def test_get_findings(self): - test_files = self.get_inspector_scan_file_paths() - for file in test_files: - with open(file, 
"r") as f: - inspector_scan = json.load(f) - findings = inspector_parser.get_findings(inspector_scan) - self.assertIsNotNone(findings) - self.assertEqual(findings["specVersion"], "1.5") - self.assertEqual(findings["bomFormat"], "CycloneDX") - - # every CycloneDX serial number must conform to this REGEX - # https://cyclonedx.org/docs/1.5/json/#serialNumber - serial_pattern = r'^urn:uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$' - self.assertTrue(bool(re.match(serial_pattern, findings["serialNumber"]))) - - def test_get_vulnerabilities(self): - test_files = self.get_inspector_scan_file_paths() - for file in test_files: - with open(file, "r") as f: - inspector_scan = json.load(f) - findings = inspector_parser.get_findings(inspector_scan) - vulns = inspector_parser.get_vulnerabilities(findings) - if vulns is not None: - for v in vulns: - self.assertTrue(len(v["id"]) > 0) - self.assertTrue(len(v["bom-ref"]) > 0) - - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/entrypoint/test_orchestrator.py b/scripts/entrypoint/test_orchestrator.py deleted file mode 100644 index cc4a3be..0000000 --- a/scripts/entrypoint/test_orchestrator.py +++ /dev/null @@ -1,44 +0,0 @@ -import os -import platform -import tempfile -import unittest - -import orchestrator - - -class TestOrchestrator(unittest.TestCase): - - def test_get_sbomgen_url(self): - cpu_arch = platform.machine() - os_name = platform.system() - url = orchestrator.get_sbomgen_url(os_name, cpu_arch) - - if os_name == "Linux" and cpu_arch == "x86_64": - expected = "https://amazon-inspector-sbomgen.s3.amazonaws.com/latest/linux/amd64/inspector-sbomgen.zip" - self.assertEqual(url, expected) - - elif os_name == "Linux" and cpu_arch == "x86_64": - expected = "https://amazon-inspector-sbomgen.s3.amazonaws.com/latest/linux/arm64/inspector-sbomgen.zip" - self.assertEqual(url, expected) - - else: - self.assertEqual(url, "") - - def test_install_sbomgen(self): - - os_name = platform.system() - cpu_arch = platform.machine() - - if os_name != "Linux": - return - - if cpu_arch != "x86_64" or cpu_arch != "arm64": - return - - dst = os.path.join(tempfile.tempdir(), "inspector-sbomgen") - result = orchestrator.install_sbomgen(dst) - self.assertTrue(result) - - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/entrypoint/test_presenter.py b/scripts/entrypoint/test_presenter.py deleted file mode 100644 index e69de29..0000000 diff --git a/scripts/entrypoint/test_thresholds.py b/scripts/entrypoint/test_thresholds.py deleted file mode 100644 index ea64398..0000000 --- a/scripts/entrypoint/test_thresholds.py +++ /dev/null @@ -1,16 +0,0 @@ -import json -import unittest - -import thresholds - - -class TestThresholds(unittest.TestCase): - - def test_print_vulnerabilities(self): - with open("test_data/scan.json", "r") as f: - data = json.load(f) - thresholds.print_vulnerabilities(data) - - -if __name__ == '__main__': - unittest.main() diff --git a/scripts/entrypoint/thresholds.py b/scripts/entrypoint/thresholds.py deleted file mode 100644 index b87f0a0..0000000 --- a/scripts/entrypoint/thresholds.py +++ /dev/null @@ -1,194 +0,0 @@ -import json -import logging -import sys - -from tabulate import tabulate - - -# stores the output that is presented to the user on program completion -class Output: - def __init__(self): - self.artifact_type = "" - self.artifact_name = "" - self.scan_serial = "" - self.scan_timestamp = "" - self.total_vulns_found = 0 - self.criticals_found = 0 - self.critical_threshold = 0 - self.highs_found = 0 
- self.high_threshold = 0 - self.mediums_found = 0 - self.medium_threshold = 0 - self.lows_found = 0 - self.low_threshold = 0 - - def present_output(self): - separator = "-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=" - print(separator) - print("\tAmazon Inspector Finding Summary") - print(separator) - - print(f"Artifact Type: {self.artifact_type}") - print(f"Artifact Name: {self.artifact_name}") # TODO: add image ID for containers - print(f"Inspector Scan Serial Number: {self.scan_serial}") - print(f"Inspector Scan Timestamp: {self.scan_timestamp}") - # print(f"Total Vulnerabilities Found: {self.total_vulns_found}") - """ - print(f"Critical Vulnerabilities Found: {self.criticals_found}") - print(f"Critical Threshold: {self.critical_threshold}") - print(f"High Vulnerabilities Found: {self.highs_found}") - print(f"High Threshold: {self.high_threshold}") - print(f"Medium Vulnerabilities Found: {self.mediums_found}") - print(f"Medium Threshold: {self.medium_threshold}") - print(f"Low Vulnerabilities Found: {self.lows_found}") - print(f"Low Threshold: {self.low_threshold}") - """ - - # TODO: add support for "Info", "None", "Unknown" - header = ["Total", "Critical", "High", "Medium", "Low"] - table = [] - row = [self.total_vulns_found, self.criticals_found, self.highs_found, self.mediums_found, self.lows_found] - table.append(row) - print(tabulate(table, headers=header)) - print() - - -new_output = Output() - - -def parse_sbom(cyclonedx_sbom_json): - """ - given a cyclonedx sbom, this function will try to extract - the artifact's name, type, and other metadata. - """ - serial_number = cyclonedx_sbom_json.get('serialNumber') - if serial_number is None: - logging.warning("expected 'serialNumber' json object but it was not found") - return - new_output.scan_serial = serial_number - - metadata = cyclonedx_sbom_json.get('metadata') - if metadata is None: - logging.warning("expected 'metadata' json object but it was not found") - return - - timestamp = metadata.get('timestamp') - if timestamp is None: - logging.warning("expected 'timestamp' in metadata object but it was not found") - return - new_output.scan_timestamp = timestamp - - component = metadata.get('component') - if component is None: - logging.warning("expected 'component' json object but it was not found") - return - - new_output.artifact_type = component["type"] - new_output.artifact_name = component["name"] - - -def parse_inspector_findings(findings_json): - """ - given inspector scan-sbom findings in cyclonedx json, - this function extracts vulnerability counts by severity (critical/high/medium/low) - and stores the counts in the 'output' variable - """ - - # try to find the sbom->metadata->properties array - sbom = findings_json.get('sbom') - if sbom is None: - logging.error("expected root json object, 'sbom', but it was not found") - return - - metadata = sbom.get('metadata') - if metadata is None: - logging.error("expected 'metadata' json object but it was not found") - return - props = metadata.get('properties') - if props is None: - logging.error("expected 'properties' json object but it was not found") - return - - # iterate over the properties array and get vulnerability counts - total_vulns = 0 - for prop in props: - if 'critical' in prop['name']: - new_output.criticals_found = prop['value'] - total_vulns += int(prop['value']) - - elif 'high' in prop['name']: - new_output.highs_found = prop['value'] - total_vulns += int(prop['value']) - - elif 'medium' in prop['name']: - new_output.mediums_found = 
prop['value'] - total_vulns += int(prop['value']) - - elif 'low' in prop['name']: - new_output.lows_found = prop['value'] - total_vulns += int(prop['value']) - - else: - logging.warning("skipping unknown property:\n ", prop) - - new_output.total_vulns_found = total_vulns - - -def is_threshold_exceeded(threshold, vuln_count): - if int(threshold) == 0: - return False - - if int(vuln_count) >= int(threshold): - return True - - return False - - -def check_vuln_thresholds(inspector_scan_file: str, sbom_file: str, thresholds_enabled: str, critical_threshold: int, - high_threshold: int, medium_threshold: int, low_threshold: int): - # get artifact name and type from the sbom generated by inspector-sbomgen - sbom_json = "" - with open(sbom_file, "r") as f: - sbom_json = json.load(f) - parse_sbom(sbom_json) - - # get vulnerability counts by severity from Inspector scan-sbom json - findings_json = "" - with open(inspector_scan_file, "r") as f: - findings_json = json.load(f) - parse_inspector_findings(findings_json) - - # add vuln thresholds to output object - if critical_threshold > 0: - new_output.critical_threshold = critical_threshold - - if high_threshold > 0: - new_output.high_threshold = critical_threshold - - if medium_threshold > 0: - new_output.medium_threshold = critical_threshold - - if low_threshold > 0: - new_output.low_threshold = critical_threshold - - # display output to the user - new_output.present_output() - - thresholds_enabled = thresholds_enabled.lower().strip() - if thresholds_enabled != "true": - logging.info("thresholds disabled, exiting") - sys.exit(0) - - logging.info("checking if vulnerability thresholds are exceeded") - # map our thresholds to the number of vulnerabilities by severity - severity_mapping = {critical_threshold: new_output.criticals_found, - high_threshold: new_output.highs_found, - medium_threshold: new_output.mediums_found, - low_threshold: new_output.lows_found, - } - - # check if the vuln threshold is exceeded for each severity - for threshold, num_vulns in severity_mapping.items(): - if is_threshold_exceeded(threshold, num_vulns): - logging.warning(f"vulnerability count threshold exceeded - exiting with code 1") - sys.exit(1) diff --git a/scripts/present_findings.py.txt b/scripts/present_findings.py.txt deleted file mode 100755 index f708969..0000000 --- a/scripts/present_findings.py.txt +++ /dev/null @@ -1,155 +0,0 @@ -#!/usr/bin/env python3 - -import json -import logging -import sys - -# stores the output that is presented to the user on program completion -output = {} - - -def parse_sbom(cyclonedx_sbom_json): - """ - given a cyclonedx sbom, this function will try to extract - the artifact's name and type - """ - metadata = cyclonedx_sbom_json.get('metadata') - if metadata is None: - logging.warning("expected 'metadata' json object but it was not found") - return - - component = metadata.get('component') - if component is None: - logging.warning("expected 'component' json object but it was not found") - return - - output["artifact_type"] = component["type"] - output["artifact_name"] = component["name"] - - -def parse_inspector_findings(findings_json): - """ - given inspector scan-sbom findings in cyclonedx json, - this function extracts vulnerability counts by severity (critical/high/medium/low) - and stores the counts in the 'output' variable - """ - - # try to find the sbom->metadata->properties array - sbom = findings_json.get('sbom') - if sbom is None: - logging.error("expected root json object, 'sbom', but it was not found") - return - - metadata 
= sbom.get('metadata') - if metadata is None: - logging.error("expected 'metadata' json object but it was not found") - return - props = metadata.get('properties') - if props is None: - logging.error("expected 'properties' json object but it was not found") - return - - # iterate over the properties array and get vulnerability counts - total_vulns = 0 - for prop in props: - if 'critical' in prop['name']: - output['critical_vulnerabilities'] = prop['value'] - total_vulns += int(prop['value']) - - elif 'high' in prop['name']: - output['high_vulnerabilities'] = prop['value'] - total_vulns += int(prop['value']) - - elif 'medium' in prop['name']: - output['medium_vulnerabilities'] = prop['value'] - total_vulns += int(prop['value']) - - elif 'low' in prop['name']: - output['low_vulnerabilities'] = prop['value'] - total_vulns += int(prop['value']) - - else: - logging.warning("skipping unknown property:\n ", prop) - - output['total_vulnerabilities'] = total_vulns - - -def is_threshold_exceeded(threshold, vuln_count): - if int(threshold) == 0: - return False - - if int(vuln_count) >= int(threshold): - return True - - return False - - -def main(): - # cli args are provided from entrypoint.sh - inspector_scan_file = sys.argv[1] - sbom_file = sys.argv[2] - thresholds_enabled = sys.argv[3] - critical_threshold = int(sys.argv[4]) - high_threshold = int(sys.argv[5]) - medium_threshold = int(sys.argv[6]) - low_threshold = int(sys.argv[7]) - - # get artifact name and type from the sbom generated by inspector-sbomgen - sbom_json = "" - with open(sbom_file, "r") as f: - sbom_json = json.load(f) - parse_sbom(sbom_json) - - # get vulnerability counts by severity from Inspector scan-sbom json - findings_json = "" - with open(inspector_scan_file, "r") as f: - findings_json = json.load(f) - parse_inspector_findings(findings_json) - - # add vuln thresholds to output object - if critical_threshold > 0: - output["critical_threshold"] = critical_threshold - - if high_threshold > 0: - output["high_threshold"] = critical_threshold - - if medium_threshold > 0: - output["medium_threshold"] = critical_threshold - - if low_threshold > 0: - output["low_threshold"] = critical_threshold - - # display output to the user - logging.info(f"\n{json.dumps(output, indent=4)}") - - thresholds_enabled = thresholds_enabled.lower().strip() - if thresholds_enabled != "true": - sys.exit(0) - - # map our thresholds to the number of vulnerabilities by severity - severity_mapping = {critical_threshold: output['critical_vulnerabilities'], - high_threshold: output['high_vulnerabilities'], - medium_threshold: output['medium_vulnerabilities'], - low_threshold: output['low_vulnerabilities'], - } - - # check if the vuln threshold is exceeded for each severity - for threshold, num_vulns in severity_mapping.items(): - if is_threshold_exceeded(threshold, num_vulns): - logging.warning(f"vulnerability count threshold exceeded - exiting with code 1") - sys.exit(1) - - -if __name__ == "__main__": - logging.basicConfig(level=logging.INFO) - main() - -""" -TODO: modify the output dict to be a class -use the class to control the order in which fields are printed to stdout - -TODO: pull all business logic out of main - -TODO: add tests for parsing functions in particular - -""" \ No newline at end of file
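
Below is a minimal, illustrative workflow sketch (not part of this patch) showing how the new run_id-based default output paths could be consumed by a downstream upload step. The local "uses: ./" action reference, the artifact name, and the omission of AWS credential configuration are assumptions made for the example.

name: inspector-scan-example
on: push

jobs:
  scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      # Run the action with its defaults; with this patch the files are written as
      # ./sbom_${{ github.run_id }}.json, inspector_scan_${{ github.run_id }}.json,
      # and inspector_scan_${{ github.run_id }}.csv, so the names are unique per workflow run.
      - name: Amazon Inspector scan
        uses: ./  # assumption: the action is consumed from the same repository

      - name: Upload scan results
        uses: actions/upload-artifact@v4
        with:
          name: inspector-scan-${{ github.run_id }}
          path: |
            ./sbom_${{ github.run_id }}.json
            inspector_scan_${{ github.run_id }}.json
            inspector_scan_${{ github.run_id }}.csv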