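{{- if .Values.pipeline.enabled }}
apiVersion: argoproj.io/v1alpha1
kind: ClusterWorkflowTemplate
metadata:
  name: amp-security-pipeline-v1.0.0
spec:
  templates:
    # Policy gate: reads every scanner report under /workspace/reports and
    # fails the step when any finding scores at or above `fail-on-cvss`.
    - name: enforce-policy
      inputs:
        parameters:
          # Numeric CVSS threshold, e.g. "7.0". A missing or non-numeric
          # value fails the step instead of letting the gate pass.
          - name: fail-on-cvss
      container:
        image: python:3.12-alpine
        command:
          - sh
          - -c
        args:
          - |
            set -eu
            # The heredoc delimiter is quoted so the shell performs no
            # expansion inside the Python source.
            python - <<'PY'
            import json
            import os
            import pathlib
            import sys

            REPORTS_DIR = pathlib.Path("/workspace/reports")


            def child(mapping, key):
                """Return mapping[key] when mapping is a dict, else None."""
                return mapping.get(key) if isinstance(mapping, dict) else None


            def as_score(value):
                """Coerce a severity value to float; None when absent or non-numeric."""
                if value is None:
                    return None
                try:
                    return float(value)
                except (TypeError, ValueError):
                    return None


            def sarif_scores(data):
                """Yield numeric `security-severity` values from a SARIF log.

                The property is read from result.properties first and, when it
                is absent there, from the rule the result refers to
                (tool.driver.rules, located by ruleIndex then ruleId), which is
                where scanners normally place it. Rules held in
                tool.extensions are not consulted. Results that cannot be
                scored are skipped rather than aborting the gate.
                """
                for run in child(data, "runs") or []:
                    driver = child(child(run, "tool"), "driver")
                    raw_rules = child(driver, "rules")
                    rules = raw_rules if isinstance(raw_rules, list) else []
                    rules_by_id = {r.get("id"): r for r in rules if isinstance(r, dict)}
                    for result in child(run, "results") or []:
                        sev = child(child(result, "properties"), "security-severity")
                        if sev is None and isinstance(result, dict):
                            index = result.get("ruleIndex")
                            in_range = isinstance(index, int) and 0 <= index < len(rules)
                            rule = rules[index] if in_range else None
                            if not isinstance(rule, dict):
                                rule = rules_by_id.get(result.get("ruleId"))
                            sev = child(child(rule, "properties"), "security-severity")
                        score = as_score(sev)
                        if score is not None:
                            yield score


            def json_scores(data):
                """Yield scores from a generic JSON report.

                Reads the top-level `findings` list (or `vulnerabilities` when
                `findings` is absent); each item contributes `cvss`, falling
                back to `score` when `cvss` is missing or falsy.
                """
                if not isinstance(data, dict):
                    return
                items = data.get("findings", data.get("vulnerabilities", [])) or []
                if not isinstance(items, list):
                    return
                for item in items:
                    if not isinstance(item, dict):
                        continue
                    score = as_score(item.get("cvss") or item.get("score"))
                    if score is not None:
                        yield score


            def main():
                """Evaluate all reports against FAIL_ON_CVSS; 1 on any hit, else 0."""
                try:
                    threshold = float(os.environ["FAIL_ON_CVSS"])
                except (KeyError, ValueError) as exc:
                    # A misconfigured threshold must not let the gate pass.
                    print(f"FAIL_ON_CVSS must be set to a number: {exc!r}", file=sys.stderr)
                    return 1

                if not REPORTS_DIR.is_dir():
                    # Fail closed: no report directory means the scanners never ran.
                    print(f"{REPORTS_DIR} is missing; nothing to enforce", file=sys.stderr)
                    return 1

                findings = []
                scanned = 0
                for report in sorted(REPORTS_DIR.iterdir()):
                    if not report.is_file() or report.suffix not in (".sarif", ".json"):
                        continue
                    scanned += 1
                    try:
                        data = json.loads(report.read_text(errors="ignore"))
                    except json.JSONDecodeError:
                        # Unparseable reports are skipped as before, but say so.
                        print(f"{report.name}: not valid JSON, skipped", file=sys.stderr)
                        continue
                    if report.suffix == ".sarif":
                        scores = sarif_scores(data)
                    elif isinstance(data, dict) and ("findings" in data or "vulnerabilities" in data):
                        scores = json_scores(data)
                    else:
                        # A SARIF log written with a .json suffix still carries
                        # a top-level "runs" list; anything else yields nothing.
                        scores = sarif_scores(data)
                    findings.extend((report.name, s) for s in scores if s >= threshold)

                if findings:
                    for name, score in findings:
                        print(f"{name}: CVSS {score} >= {threshold}", file=sys.stderr)
                    return 1
                if scanned == 0:
                    # Preserved behaviour: an empty report directory passes the
                    # gate (fail-open). Warn so a scanner that never wrote its
                    # report is noticed in the step log.
                    print("warning: no .sarif/.json reports found; gate passed vacuously", file=sys.stderr)
                print(f"No findings met or exceeded CVSS {threshold}")
                return 0


            raise SystemExit(main())
            PY
        env:
          - name: FAIL_ON_CVSS
            # The inner {{ }} is an Argo Workflows parameter reference, not a
            # Helm action; it has to be emitted literally or Helm fails to
            # render the chart.
            value: {{ "{{inputs.parameters.fail-on-cvss}}" | quote }}
        volumeMounts:
          - name: workspace
            mountPath: /workspace
{{- end }}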