all phases complete!
This commit is contained in:
@@ -0,0 +1,88 @@
|
||||
{{- if .Values.pipeline.enabled }}
|
||||
apiVersion: argoproj.io/v1alpha1
|
||||
kind: ClusterWorkflowTemplate
|
||||
metadata:
|
||||
name: amp-security-pipeline-v1.0.0
|
||||
spec:
|
||||
templates:
|
||||
- name: enforce-policy
|
||||
inputs:
|
||||
parameters:
|
||||
- name: fail-on-cvss
|
||||
container:
|
||||
image: python:3.12-alpine
|
||||
command:
|
||||
- sh
|
||||
- -c
|
||||
args:
|
||||
- |
|
||||
set -eu
|
||||
python - <<'PY'
|
||||
import json
|
||||
import os
|
||||
import pathlib
|
||||
import sys
|
||||
|
||||
threshold = float(os.environ["FAIL_ON_CVSS"])
|
||||
reports_dir = pathlib.Path("/workspace/reports")
|
||||
findings = []
|
||||
|
||||
for report in sorted(reports_dir.iterdir()):
|
||||
if not report.is_file():
|
||||
continue
|
||||
text = report.read_text(errors="ignore")
|
||||
if report.suffix == ".sarif":
|
||||
try:
|
||||
data = json.loads(text)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
for run in data.get("runs", []):
|
||||
for result in run.get("results", []):
|
||||
for fix in result.get("properties", {}).get("security-severity", []):
|
||||
pass
|
||||
for level in result.get("properties", {}).values():
|
||||
pass
|
||||
for prop in [result.get("properties", {}), result.get("taxa", [])]:
|
||||
pass
|
||||
for region in result.get("locations", []):
|
||||
pass
|
||||
sev = result.get("properties", {}).get("security-severity")
|
||||
if sev is None:
|
||||
continue
|
||||
try:
|
||||
score = float(sev)
|
||||
except (TypeError, ValueError):
|
||||
continue
|
||||
if score >= threshold:
|
||||
findings.append((report.name, score))
|
||||
elif report.suffix == ".json":
|
||||
try:
|
||||
data = json.loads(text)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
if isinstance(data, dict):
|
||||
for item in data.get("findings", data.get("vulnerabilities", [])):
|
||||
score = item.get("cvss") or item.get("score")
|
||||
if score is None:
|
||||
continue
|
||||
try:
|
||||
score = float(score)
|
||||
except (TypeError, ValueError):
|
||||
continue
|
||||
if score >= threshold:
|
||||
findings.append((report.name, score))
|
||||
|
||||
if findings:
|
||||
for name, score in findings:
|
||||
print(f"{name}: CVSS {score} >= {threshold}", file=sys.stderr)
|
||||
raise SystemExit(1)
|
||||
|
||||
print(f"No findings met or exceeded CVSS {threshold}")
|
||||
PY
|
||||
env:
|
||||
- name: FAIL_ON_CVSS
|
||||
value: "{{inputs.parameters.fail-on-cvss}}"
|
||||
volumeMounts:
|
||||
- name: workspace
|
||||
mountPath: /workspace
|
||||
{{- end }}
|
||||
Reference in New Issue
Block a user