Coverage for agentos/security/auditor.py: 30%
207 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""AgentOS Security Auditor — automated vulnerability scanning and code analysis.
3Audits dependencies and source patterns for common security issues.
4"""
6from __future__ import annotations
8import ast
9import hashlib
10import json
11import re
12import subprocess
13from dataclasses import dataclass, field
14from enum import Enum
15from pathlib import Path
16from typing import Optional
18# ── Severity ──────────────────────────────────────────────────────────────────
class AuditSeverity(Enum):
    """Severity scale for audit findings, declared from most to least severe.

    The declaration order is significant: ``export_report`` ranks findings
    by each member's position in this enum.
    """

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
30# ── Data classes ──────────────────────────────────────────────────────────────
@dataclass
class AuditFinding:
    """One issue reported by an audit scan.

    Attributes:
        id: Unique finding identifier.
        category: Finding category (e.g., injection, hardcoded_secret).
        severity: Severity level.
        message: Human-readable description.
        location: File path and line reference.
        recommendation: Suggested remediation.
        cve: Optional CVE identifier if known.
    """

    id: str
    category: str
    severity: AuditSeverity
    message: str
    location: str = ""
    recommendation: str = ""
    cve: Optional[str] = None

    def to_dict(self) -> dict:
        """Return the finding as a plain dict; severity is reduced to its string value."""
        return dict(
            id=self.id,
            category=self.category,
            severity=self.severity.value,
            message=self.message,
            location=self.location,
            recommendation=self.recommendation,
            cve=self.cve,
        )
@dataclass
class AuditReport:
    """Findings gathered by one or more scans, plus counters for what was scanned.

    Attributes:
        findings: List of individual findings.
        scanned_files: Number of files scanned.
        scanned_deps: Number of dependencies checked.
    """

    findings: list[AuditFinding] = field(default_factory=list)
    scanned_files: int = 0
    scanned_deps: int = 0

    @property
    def critical(self) -> int:
        """Number of findings rated CRITICAL."""
        return len([item for item in self.findings if item.severity == AuditSeverity.CRITICAL])

    @property
    def high(self) -> int:
        """Number of findings rated HIGH."""
        return len([item for item in self.findings if item.severity == AuditSeverity.HIGH])

    @property
    def medium(self) -> int:
        """Number of findings rated MEDIUM."""
        return len([item for item in self.findings if item.severity == AuditSeverity.MEDIUM])

    @property
    def low(self) -> int:
        """Number of findings rated LOW."""
        return len([item for item in self.findings if item.severity == AuditSeverity.LOW])

    def passed(self) -> bool:
        """Return True when the report holds no CRITICAL and no HIGH findings."""
        return not (self.critical or self.high)

    def summary(self) -> str:
        """Return a one-line summary: per-severity counts, scan totals and verdict."""
        verdict = "PASSED" if self.passed() else "FAILED"
        counts = f"Audit: {self.critical}C / {self.high}H / {self.medium}M / {self.low}L "
        scope = f"across {self.scanned_files} files, {self.scanned_deps} deps — "
        return counts + scope + verdict

    def to_dict(self) -> dict:
        """Return the report as nested plain dicts and lists."""
        totals = {
            "critical": self.critical,
            "high": self.high,
            "medium": self.medium,
            "low": self.low,
            "passed": self.passed(),
            "scanned_files": self.scanned_files,
            "scanned_deps": self.scanned_deps,
        }
        serialised = [item.to_dict() for item in self.findings]
        return {"findings": serialised, "summary": totals}

    def to_json(self) -> str:
        """Return ``to_dict()`` as indented JSON; non-JSON values fall back to ``str``."""
        return json.dumps(self.to_dict(), indent=2, default=str)

    def to_markdown(self) -> str:
        """Render the report as a Markdown document with a severity table."""
        verdict = "PASSED" if self.passed() else "FAILED"
        out = [
            "# Security Audit Report",
            "",
            f"- **Scanned files**: {self.scanned_files}",
            f"- **Scanned dependencies**: {self.scanned_deps}",
            f"- **Result**: {verdict}",
            "",
            "| Severity | Count |",
            "|----------|-------|",
            f"| CRITICAL | {self.critical} |",
            f"| HIGH | {self.high} |",
            f"| MEDIUM | {self.medium} |",
            f"| LOW | {self.low} |",
            "",
        ]
        if self.findings:
            out += ["## Findings", ""]
            for item in self.findings:
                out.append(f"- **[{item.severity.value.upper()}]** `{item.id}` — {item.message}")
                if item.recommendation:
                    out.append(f" → {item.recommendation}")
        return "\n".join(out)
149# ── Built‑in checkers ────────────────────────────────────────────────────────
# Known-vulnerable version patterns (illustrative).
# Each entry names a package, the affected version range, the CVE and a
# severity name that maps onto AuditSeverity once lower-cased.
_VULN_PATTERNS: list[dict] = [
    dict(pkg="django", range="<4.2.15", cve="CVE-2024-45230", severity="HIGH"),
    dict(pkg="requests", range="<2.32.0", cve="CVE-2024-35195", severity="MEDIUM"),
    dict(pkg="cryptography", range="<42.0.0", cve="CVE-2024-26130", severity="HIGH"),
    dict(pkg="jinja2", range="<3.1.4", cve="CVE-2024-34064", severity="MEDIUM"),
    dict(pkg="aiohttp", range="<3.9.4", cve="CVE-2024-30251", severity="HIGH"),
]
# Dangerous AST patterns.
# "node" selects the AST node type the pattern applies to, "attr" is an
# attribute path resolved on that node, and "match" / "match_re" is what the
# resolved value is compared against. "parent_attr" / "parent_match" add a
# second condition on another attribute path of the same node.
_DANGEROUS_PATTERNS: list[dict] = [
    {
        "name": "eval-use",
        "node": "Call",
        "attr": "func.id",
        "match": "eval",
        "severity": "CRITICAL",
        "msg": "eval() detected — arbitrary code execution risk",
    },
    {
        "name": "exec-use",
        "node": "Call",
        "attr": "func.id",
        "match": "exec",
        "severity": "CRITICAL",
        "msg": "exec() detected — arbitrary code execution risk",
    },
    {
        "name": "pickle-load",
        "node": "Call",
        "attr": "func.attr",
        "match": "loads",
        "parent_attr": "func.value.id",
        "parent_match": "pickle",
        "severity": "HIGH",
        "msg": "pickle.loads() on untrusted data may execute arbitrary code",
    },
    {
        "name": "hardcoded-secret",
        "node": "Assign",
        "attr": "targets[0].id",
        "match_re": r"(?i)(password|secret|api_key|token|access_key)\s*$",
        "severity": "HIGH",
        "msg": "Potential hard-coded secret",
    },
    {
        # The visitor only tests for the presence of the "match_expr" key and
        # then looks for a shell=True keyword; the text itself is descriptive.
        "name": "shell-true",
        "node": "Call",
        "attr": "keywords",
        "match_expr": "subprocess.Popen(… shell=True) or os.system() — command injection risk",
        "severity": "HIGH",
        "msg": "shell=True detected — command injection risk when input is untrusted",
    },
    {
        # NOTE(review): this matches a call attribute named "loads", while the
        # message refers to yaml.load() — confirm which call is intended.
        "name": "insecure-deserialization",
        "node": "Call",
        "attr": "func.attr",
        "match": "loads",
        "parent_attr": "func.value.id",
        "parent_match": "yaml",
        "severity": "HIGH",
        "msg": "yaml.load() without SafeLoader — arbitrary code execution risk",
    },
    {
        "name": "md5-hash",
        "node": "Call",
        "attr": "func.attr",
        "match": "md5",
        "parent_attr": "func.value.id",
        "parent_match": "hashlib",
        "severity": "LOW",
        "msg": "MD5 is cryptographically broken; use SHA-256",
    },
]
185# ── Dependency scanner ───────────────────────────────────────────────────────
def _parse_requirements(content: str) -> list[tuple[str, str]]:
    """Parse requirements.txt text into ``(package, version_spec)`` pairs.

    Blank lines, ``#`` comments and pip option lines (anything starting with
    ``-``, e.g. ``-r other.txt``, ``-e .``, ``--index-url ...``) are skipped
    so that options are never mistaken for package names. An extras block
    such as ``requests[security]`` is accepted and ignored.

    Args:
        content: Full text of a requirements file.

    Returns:
        One tuple per recognised requirement. The package name is
        lower-cased; the version spec keeps its operators
        (``"requests==2.31.0"`` -> ``("requests", "==2.31.0")``) and is an
        empty string when the line carries no recognisable specifier.
    """
    requirement = re.compile(
        r"^([a-zA-Z0-9][a-zA-Z0-9_.-]*)"  # distribution name
        r"\s*(?:\[[^\]]*\])?"  # optional extras, e.g. [security]
        r"\s*([><=!~]+\s*[\d.*]+(?:,\s*[><=!~]+\s*[\d.*]+)*)?"  # specifier clauses
    )
    deps: list[tuple[str, str]] = []
    for raw_line in content.splitlines():
        line = raw_line.strip()
        # Options (-r, -e, -c, --flag) are not requirements; skip them all.
        if not line or line.startswith(("#", "-")):
            continue
        found = requirement.match(line)
        if found:
            deps.append((found.group(1).lower(), (found.group(2) or "").strip()))
    return deps
def _version_tuple(text: str) -> Optional[tuple[int, ...]]:
    """Convert a dotted numeric version (``"4.2.15"``) to a tuple of ints, else None."""
    text = text.strip()
    if not re.fullmatch(r"\d+(?:\.\d+)*", text, re.ASCII):
        return None
    return tuple(int(part) for part in text.split("."))


def _spec_guarantees_fixed(version_spec: str, fixed_in: str) -> bool:
    """Return True when *version_spec* can only resolve to *fixed_in* or newer.

    Only clauses that set a lower bound (``==``, ``===``, ``>=``, ``>``,
    ``~=``) can prove that. Anything else, including an empty or unparsable
    spec, yields False so the caller keeps reporting the package.
    """
    fixed = _version_tuple(fixed_in)
    if fixed is None:
        return False
    for clause in version_spec.split(","):
        found = re.match(r"\s*(?:===|==|>=|~=|>)\s*(\d+(?:\.\d+)*)(?:\.\*)?\s*$", clause)
        if not found:
            continue
        floor = _version_tuple(found.group(1))
        if floor is None:
            continue
        # Pad to equal length so that 2.32 compares equal to 2.32.0.
        width = max(len(floor), len(fixed))
        if floor + (0,) * (width - len(floor)) >= fixed + (0,) * (width - len(fixed)):
            return True
    return False


def _check_vuln_db(pkg: str, version_spec: str) -> list[AuditFinding]:
    """Look *pkg* up in ``_VULN_PATTERNS`` and report entries it may be affected by.

    An entry is skipped only when *version_spec* proves the installed version
    is at or above the first fixed release (the entry's ``"<X"`` range). If
    the spec is missing, unpinned or unparsable the entry is still reported,
    because the vulnerable range cannot be ruled out.

    Args:
        pkg: Lower-cased package name.
        version_spec: Specifier text as written in the requirements file
            (e.g. ``"==2.31.0"``); may be empty.

    Returns:
        One ``AuditFinding`` per matching, not-ruled-out entry.
    """
    findings: list[AuditFinding] = []
    for entry in _VULN_PATTERNS:
        if entry["pkg"] != pkg:
            continue
        affected = entry["range"]
        # Only "<X" ranges have a usable fix version; others are always reported.
        fixed_in = affected[1:] if affected.startswith("<") else ""
        if _spec_guarantees_fixed(version_spec, fixed_in):
            continue
        findings.append(
            AuditFinding(
                id=f"VULN-{entry['cve']}",
                category="dependency",
                severity=AuditSeverity(entry["severity"].lower()),
                message=f"{pkg}{version_spec and ' ' + version_spec} is vulnerable — {entry['cve']}",
                recommendation=f"Upgrade to {entry['range'].lstrip('<')}+",
                cve=entry["cve"],
            )
        )
    return findings
def scan_dependencies(req_path: str | Path) -> AuditReport:
    """Scan a requirements-style dependency file for known-vulnerable packages.

    The file is parsed line by line with ``_parse_requirements``.
    NOTE(review): a pyproject.toml passed here is not interpreted as TOML;
    confirm whether pyproject support is actually expected.

    Args:
        req_path: Path to the dependency file.

    Returns:
        A report whose ``scanned_deps`` is the number of parsed requirements.
        If the file does not exist the report holds a single INFO finding
        (``DEP-001``) and nothing is scanned.
    """
    req_path = Path(req_path)
    report = AuditReport()

    if not req_path.exists():
        report.findings.append(
            AuditFinding(
                id="DEP-001",
                category="dependency",
                severity=AuditSeverity.INFO,
                message=f"Dependency file not found: {req_path}",
            )
        )
        return report

    # Decode explicitly instead of relying on the platform default; undecodable
    # bytes (typically in comments) are replaced rather than aborting the scan.
    content = req_path.read_text(encoding="utf-8", errors="replace")
    deps = _parse_requirements(content)
    report.scanned_deps = len(deps)

    for pkg, ver in deps:
        report.findings.extend(_check_vuln_db(pkg, ver))

    return report
248# ── Source scanner ────────────────────────────────────────────────────────────
class _DangerousVisitor(ast.NodeVisitor):
    """AST visitor that records a finding for each ``_DANGEROUS_PATTERNS`` match.

    Only ``Call`` and ``Assign`` nodes are inspected. Matches are collected
    in ``self.findings`` with a ``"line N"`` location.
    """

    _DEFAULT_FIX = "Remove or replace with a safe alternative"
    # One segment of an attribute path: a name with an optional [index].
    _PATH_PART = re.compile(r"^([A-Za-z_]\w*)(?:\[(\d+)\])?$")

    def __init__(self) -> None:
        self.findings: list[AuditFinding] = []

    @classmethod
    def _resolve(cls, node: ast.AST, attr_path: str) -> object:
        """Follow a dotted path such as ``targets[0].id`` from *node*.

        Walks attributes and integer indexes directly rather than evaluating
        the path as code. Returns None when any step is missing or malformed.
        """
        current: object = node
        for part in attr_path.split("."):
            parsed = cls._PATH_PART.match(part)
            if parsed is None:
                return None
            try:
                current = getattr(current, parsed.group(1))
                if parsed.group(2) is not None:
                    current = current[int(parsed.group(2))]
            except (AttributeError, IndexError, TypeError):
                return None
        return current

    @staticmethod
    def _finding(pattern: dict, lineno: int, recommendation: str) -> AuditFinding:
        """Build the ``AuditFinding`` for *pattern* matched at *lineno*."""
        return AuditFinding(
            id=f"SRC-{pattern['name'].upper()}",
            category="source",
            severity=AuditSeverity(pattern["severity"].lower()),
            message=pattern["msg"],
            location=f"line {lineno}",
            recommendation=recommendation,
        )

    def _match(self, node: ast.AST, pattern: dict, lineno: int) -> Optional[AuditFinding]:
        """Return a finding when *node* satisfies *pattern*, otherwise None.

        Three pattern shapes are supported, selected by the keys present:
        ``match_re`` (regex on the resolved attribute), ``match_expr``
        (a ``shell=True`` keyword argument), and plain ``match`` with an
        optional ``parent_attr`` / ``parent_match`` pair.
        """
        if "match_re" in pattern:
            val = self._resolve(node, pattern["attr"])
            if not (isinstance(val, str) and re.search(pattern["match_re"], val)):
                return None
            # A secret is only "hard-coded" when the assigned value is a
            # literal; names bound to calls, lookups, None, etc. are skipped.
            if isinstance(node, ast.Assign) and not (
                isinstance(node.value, ast.Constant) and isinstance(node.value.value, (str, bytes))
            ):
                return None
            return self._finding(pattern, lineno, self._DEFAULT_FIX)

        if "match_expr" in pattern:
            # Special-case shell=True
            for kw in getattr(node, "keywords", []):
                if kw.arg == "shell" and getattr(kw.value, "value", None) is True:
                    return self._finding(pattern, lineno, "Avoid shell=True; use list args")
            return None

        # Standard attr match
        if self._resolve(node, pattern["attr"]) != pattern["match"]:
            return None
        parent_attr = pattern.get("parent_attr")
        if parent_attr is not None and self._resolve(node, parent_attr) != pattern.get("parent_match"):
            return None
        return self._finding(pattern, lineno, self._DEFAULT_FIX)

    def _scan(self, node: ast.AST, node_kind: str) -> None:
        """Apply every pattern registered for *node_kind*, then visit children."""
        for pattern in _DANGEROUS_PATTERNS:
            if pattern["node"] != node_kind:
                continue
            finding = self._match(node, pattern, node.lineno)
            if finding:
                self.findings.append(finding)
        self.generic_visit(node)

    def visit_Call(self, node: ast.Call) -> None:  # noqa: N802
        """Check a call expression against the ``Call`` patterns."""
        self._scan(node, "Call")

    def visit_Assign(self, node: ast.Assign) -> None:  # noqa: N802
        """Check an assignment against the ``Assign`` patterns."""
        self._scan(node, "Assign")
def scan_source(source_dir: str | Path) -> AuditReport:
    """AST-based source code security scan.

    Args:
        source_dir: Directory searched recursively for ``*.py`` files. A path
            to a single ``.py`` file is also accepted and scanned on its own.

    Returns:
        A report with one finding per pattern match. ``scanned_files`` counts
        only files that were read and parsed; unreadable, undecodable or
        syntactically invalid files are skipped.
    """
    source_dir = Path(source_dir)
    report = AuditReport()

    if source_dir.is_file():
        py_files = [source_dir] if source_dir.suffix == ".py" else []
    else:
        # Sorted so report order does not depend on filesystem enumeration.
        py_files = sorted(source_dir.rglob("*.py"))

    for fpath in py_files:
        try:
            # Parsing bytes lets the compiler honour a BOM or PEP 263 coding
            # cookie instead of decoding with the platform default.
            tree = ast.parse(fpath.read_bytes(), filename=str(fpath))
        except (OSError, SyntaxError, ValueError):
            # One bad file (unreadable, wrong encoding, null bytes, invalid
            # syntax) must not abort the whole scan.
            continue

        visitor = _DangerousVisitor()
        visitor.visit(tree)
        for finding in visitor.findings:
            # The visitor only knows the line; add the file so the location
            # is usable once findings from many files are merged.
            finding.location = f"{fpath}: {finding.location}" if finding.location else str(fpath)
        report.findings.extend(visitor.findings)
        report.scanned_files += 1

    return report
363# ── Security Auditor class ────────────────────────────────────────────────────
class SecurityAuditor:
    """High-level security auditor that orchestrates dependency and source scanning.

    Default paths given to the constructor are used whenever a method is
    called without an explicit path.
    """

    def __init__(self, req_path: Optional[str | Path] = None, source_dir: Optional[str | Path] = None):
        """Store the default dependency file and source directory (either may be omitted)."""
        self.req_path: Optional[Path] = Path(req_path) if req_path else None
        self.source_dir: Optional[Path] = Path(source_dir) if source_dir else None

    def scan_dependencies(self, req_path: Optional[str | Path] = None) -> AuditReport:
        """Scan dependencies for known vulnerabilities.

        Returns an empty report when no path is available or the file does
        not exist.
        """
        path = Path(req_path) if req_path else self.req_path
        if not path or not path.exists():
            return AuditReport()
        return scan_dependencies(path)

    def scan_source(self, paths: Optional[list[str | Path]] = None) -> AuditReport:
        """AST-based source code security scan.

        When *paths* is given each entry is scanned and the results are
        merged; otherwise the configured source directory is scanned. With
        neither available an empty report is returned.
        """
        if paths:
            report = AuditReport()
            for p in paths:
                r = scan_source(p)
                report.findings.extend(r.findings)
                report.scanned_files += r.scanned_files
            return report
        if not self.source_dir:
            return AuditReport()
        return scan_source(self.source_dir)

    def full_audit(self, source_dir: Optional[str | Path] = None, req_path: Optional[str | Path] = None) -> AuditReport:
        """Run dependency + source audit and merge results.

        Explicit arguments override the configured defaults. If only one of
        the two inputs is available, that scan still runs; returning an empty
        report there would read as PASSED without anything being checked.
        An empty report is returned only when neither input is available.
        """
        sd = source_dir or self.source_dir
        rp = req_path or self.req_path
        if sd and rp:
            return full_audit(Path(sd), Path(rp))
        if sd:
            return scan_source(Path(sd))
        if rp:
            return scan_dependencies(Path(rp))
        return AuditReport()
402# ── Full audit (module-level) ─────────────────────────────────────────────────
def full_audit(
    source_dir: str | Path,
    req_path: str | Path,
) -> AuditReport:
    """Scan dependencies and source, returning a single combined report.

    Dependency findings come first, followed by source findings; the file
    and dependency counters are taken from the respective scans.
    """
    dependencies = scan_dependencies(req_path)
    sources = scan_source(source_dir)
    return AuditReport(
        findings=[*dependencies.findings, *sources.findings],
        scanned_files=sources.scanned_files,
        scanned_deps=dependencies.scanned_deps,
    )
421# ── Report export ─────────────────────────────────────────────────────────────
def export_report(report: AuditReport, fmt: str = "json") -> str:
    """Export audit report to JSON or Markdown.

    Args:
        report: The report to serialise.
        fmt: ``"json"`` for indented JSON; any other value produces Markdown.

    Returns:
        The serialised report. In Markdown, findings are listed from most to
        least severe (CRITICAL first), keeping their original relative order
        within a severity.
    """
    if fmt == "json":
        return json.dumps(report.to_dict(), indent=2)

    # Enum declaration order is CRITICAL..INFO, so position is the rank.
    # Built once rather than re-listing the enum for every sort key.
    rank = {severity: position for position, severity in enumerate(AuditSeverity)}

    # Markdown
    # NOTE(review): whitespace inside these literals is reproduced as seen;
    # confirm against the original file (a Markdown hard line break needs two
    # trailing spaces).
    lines = [
        "# Security Audit Report",
        "",
        f"**Summary**: {report.summary()}",
        "",
        "| Severity | Count |",
        "|----------|-------|",
        f"| Critical | {report.critical} |",
        f"| High | {report.high} |",
        f"| Medium | {report.medium} |",
        f"| Low | {report.low} |",
        "",
        "## Findings",
        "",
    ]
    for f in sorted(report.findings, key=lambda finding: rank[finding.severity]):
        lines.append(f"- **[{f.severity.value.upper()}]** {f.message} ")
        if f.location:
            lines.append(f" *Location*: {f.location}")
        if f.recommendation:
            lines.append(f" *Fix*: {f.recommendation}")
        if f.cve:
            lines.append(f" *CVE*: {f.cve}")
        lines.append("")

    return "\n".join(lines)