Coverage for agentos/security/auditor.py: 30%

207 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1"""AgentOS Security Auditor — automated vulnerability scanning and code analysis. 

2 

3Audits dependencies and source patterns for common security issues. 

4""" 

5 

6from __future__ import annotations 

7 

8import ast 

9import hashlib 

10import json 

11import re 

12import subprocess 

13from dataclasses import dataclass, field 

14from enum import Enum 

15from pathlib import Path 

16from typing import Optional 

17 

18# ── Severity ────────────────────────────────────────────────────────────────── 

19 

20 

class AuditSeverity(Enum):
    """Severity levels that can be attached to an audit finding.

    Each member's value is the lowercase form of its name.
    """

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"

28 

29 

30# ── Data classes ────────────────────────────────────────────────────────────── 

31 

32 

@dataclass
class AuditFinding:
    """One issue reported by an audit scan.

    Attributes:
        id: Unique finding identifier.
        category: Finding category (e.g., injection, hardcoded_secret).
        severity: Severity level.
        message: Human-readable description.
        location: File path and line reference.
        recommendation: Suggested remediation.
        cve: Optional CVE identifier if known.
    """

    id: str
    category: str
    severity: AuditSeverity
    message: str
    location: str = ""
    recommendation: str = ""
    cve: Optional[str] = None

    def to_dict(self) -> dict:
        """Return the finding as a plain dict, with ``severity`` reduced to its value."""
        payload = dict(
            id=self.id,
            category=self.category,
            severity=self.severity.value,
            message=self.message,
            location=self.location,
            recommendation=self.recommendation,
            cve=self.cve,
        )
        return payload

64 

65 

@dataclass
class AuditReport:
    """Collection of findings plus counters describing what was scanned.

    Attributes:
        findings: List of individual findings.
        scanned_files: Number of files scanned.
        scanned_deps: Number of dependencies checked.
    """

    findings: list[AuditFinding] = field(default_factory=list)
    scanned_files: int = 0
    scanned_deps: int = 0

    @property
    def critical(self) -> int:
        """Number of findings whose severity is CRITICAL."""
        return len([item for item in self.findings if item.severity == AuditSeverity.CRITICAL])

    @property
    def high(self) -> int:
        """Number of findings whose severity is HIGH."""
        return len([item for item in self.findings if item.severity == AuditSeverity.HIGH])

    @property
    def medium(self) -> int:
        """Number of findings whose severity is MEDIUM."""
        return len([item for item in self.findings if item.severity == AuditSeverity.MEDIUM])

    @property
    def low(self) -> int:
        """Number of findings whose severity is LOW."""
        return len([item for item in self.findings if item.severity == AuditSeverity.LOW])

    def passed(self) -> bool:
        """Return True when there are no CRITICAL and no HIGH findings."""
        return not (self.critical != 0 or self.high != 0)

    def summary(self) -> str:
        """Return a one-line summary: counts per severity, scan totals and verdict."""
        verdict = "PASSED" if self.passed() else "FAILED"
        counts = f"Audit: {self.critical}C / {self.high}H / {self.medium}M / {self.low}L "
        scope = f"across {self.scanned_files} files, {self.scanned_deps} deps — "
        return counts + scope + verdict

    def to_dict(self) -> dict:
        """Return the findings (as dicts) together with a ``summary`` mapping."""
        totals = {
            "critical": self.critical,
            "high": self.high,
            "medium": self.medium,
            "low": self.low,
            "passed": self.passed(),
            "scanned_files": self.scanned_files,
            "scanned_deps": self.scanned_deps,
        }
        serialised = [item.to_dict() for item in self.findings]
        return {"findings": serialised, "summary": totals}

    def to_json(self) -> str:
        """Serialise :meth:`to_dict` as indented JSON; non-JSON values fall back to ``str``."""
        return json.dumps(self.to_dict(), indent=2, default=str)

    def to_markdown(self) -> str:
        """Render the report as Markdown: header, severity table, then the findings."""
        verdict = "PASSED" if self.passed() else "FAILED"
        out = [
            "# Security Audit Report",
            "",
            f"- **Scanned files**: {self.scanned_files}",
            f"- **Scanned dependencies**: {self.scanned_deps}",
            f"- **Result**: {verdict}",
            "",
            "| Severity | Count |",
            "|----------|-------|",
            f"| CRITICAL | {self.critical} |",
            f"| HIGH | {self.high} |",
            f"| MEDIUM | {self.medium} |",
            f"| LOW | {self.low} |",
            "",
        ]
        # The findings section is omitted entirely when there is nothing to list.
        if not self.findings:
            return "\n".join(out)

        out += ["## Findings", ""]
        for item in self.findings:
            out.append(f"- **[{item.severity.value.upper()}]** `{item.id}` — {item.message}")
            if item.recommendation:
                out.append(f" → {item.recommendation}")
        return "\n".join(out)

147 

148 

149# ── Built‑in checkers ──────────────────────────────────────────────────────── 

150 

# Known-vulnerable version patterns (illustrative).
# Each row is (package, affected range, CVE id, severity name) and is expanded
# into the dict shape that the dependency checker reads.
_VULN_PATTERNS: list[dict] = [
    {"pkg": pkg, "range": affected, "cve": cve, "severity": severity}
    for pkg, affected, cve, severity in (
        ("django", "<4.2.15", "CVE-2024-45230", "HIGH"),
        ("requests", "<2.32.0", "CVE-2024-35195", "MEDIUM"),
        ("cryptography", "<42.0.0", "CVE-2024-26130", "HIGH"),
        ("jinja2", "<3.1.4", "CVE-2024-34064", "MEDIUM"),
        ("aiohttp", "<3.9.4", "CVE-2024-30251", "HIGH"),
    )
]

159 

# Dangerous AST patterns.
#
# Keys read by ``_DangerousVisitor._match``:
#   name / severity / msg  - used to build the finding.
#   node                   - AST node kind the rule applies to ("Call" or "Assign").
#   attr                   - dotted attribute path resolved on the node.
#   match                  - exact value the resolved attribute must equal.
#   parent_attr / parent_match
#                          - optional second path/value pair that must also match.
#   match_re               - regex searched in the resolved attribute instead of ``match``.
#   match_expr             - its presence selects the ``shell=True`` keyword check; the
#                            text itself is only descriptive.
_DANGEROUS_PATTERNS: list[dict] = [
    {
        "name": "eval-use",
        "node": "Call",
        "attr": "func.id",
        "match": "eval",
        "severity": "CRITICAL",
        "msg": "eval() detected — arbitrary code execution risk",
    },
    {
        "name": "exec-use",
        "node": "Call",
        "attr": "func.id",
        "match": "exec",
        "severity": "CRITICAL",
        "msg": "exec() detected — arbitrary code execution risk",
    },
    {
        "name": "pickle-load",
        "node": "Call",
        "attr": "func.attr",
        "match": "loads",
        "parent_attr": "func.value.id",
        "parent_match": "pickle",
        "severity": "HIGH",
        "msg": "pickle.loads() on untrusted data may execute arbitrary code",
    },
    {
        "name": "hardcoded-secret",
        "node": "Assign",
        "attr": "targets[0].id",
        "match_re": r"(?i)(password|secret|api_key|token|access_key)\s*$",
        "severity": "HIGH",
        "msg": "Potential hard-coded secret",
    },
    {
        "name": "shell-true",
        "node": "Call",
        "attr": "keywords",
        "match_expr": "subprocess.Popen(… shell=True) or os.system() — command injection risk",
        "severity": "HIGH",
        "msg": "shell=True detected — command injection risk when input is untrusted",
    },
    {
        "name": "insecure-deserialization",
        "node": "Call",
        "attr": "func.attr",
        # PyYAML's entry point is ``yaml.load`` (there is no ``yaml.loads``), so the
        # rule must match "load" to ever fire; ``yaml.safe_load`` is not matched.
        # NOTE: the matcher cannot inspect the ``Loader`` argument, so a call that
        # already passes SafeLoader is reported as well.
        "match": "load",
        "parent_attr": "func.value.id",
        "parent_match": "yaml",
        "severity": "HIGH",
        "msg": "yaml.load() without SafeLoader — arbitrary code execution risk",
    },
    {
        "name": "md5-hash",
        "node": "Call",
        "attr": "func.attr",
        "match": "md5",
        "parent_attr": "func.value.id",
        "parent_match": "hashlib",
        "severity": "LOW",
        "msg": "MD5 is cryptographically broken; use SHA-256",
    },
]

183 

184 

185# ── Dependency scanner ─────────────────────────────────────────────────────── 

186 

187 

# One requirement line: package name, optional ``[extras]``, optional
# comma-separated version clauses such as ``>=4.2,<5.0``.
_REQUIREMENT_RE = re.compile(
    r"^([A-Za-z0-9][A-Za-z0-9_.-]*)"
    r"\s*(?:\[[^\]]*\])?"
    r"\s*([><=!~]+\s*[\d.*]+(?:,\s*[><=!~]+\s*[\d.*]+)*)?"
)


def _parse_requirements(content: str) -> list[tuple[str, str]]:
    """Parse requirements.txt text into ``(pkg, version_spec)`` pairs.

    Args:
        content: Full text of a requirements file.

    Returns:
        One pair per requirement line, in file order. ``pkg`` is lower-cased;
        ``version_spec`` keeps its operators (``"requests==2.31.0"`` yields
        ``("requests", "==2.31.0")``) and is ``""`` when the line has no
        version clause.

    Blank lines, ``#`` comments and option lines (anything starting with ``-``,
    e.g. ``-r base.txt``, ``-e .``, ``--index-url ...``) are skipped; option
    lines would otherwise be mistaken for packages named ``-r`` or ``-e``.
    Extras (``pkg[extra]==1.0``) are ignored but no longer hide the version.
    """
    deps: list[tuple[str, str]] = []
    for raw in content.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "-")):
            continue
        found = _REQUIREMENT_RE.match(line)
        if found:
            deps.append((found.group(1).lower(), (found.group(2) or "").strip()))
    return deps

202 

203 

def _version_key(text: str) -> Optional[tuple[int, ...]]:
    """Turn ``"4.2.15"`` into ``(4, 2, 15)``; return None if a component is not an integer."""
    cleaned = text.strip()
    # A trailing wildcard (``4.2.*``) can only match 4.2 or later, so the prefix is its floor.
    while cleaned.endswith(".*"):
        cleaned = cleaned[:-2]
    try:
        return tuple(int(part) for part in cleaned.split("."))
    except ValueError:
        return None


def _is_older(candidate: tuple[int, ...], reference: tuple[int, ...]) -> bool:
    """Compare two version keys after zero-padding them to the same length."""
    width = max(len(candidate), len(reference))
    left = candidate + (0,) * (width - len(candidate))
    right = reference + (0,) * (width - len(reference))
    return left < right


def _spec_floor(version_spec: str) -> Optional[tuple[int, ...]]:
    """Return the highest lower bound implied by *version_spec*, or None if it has none.

    Only ``==``, ``===``, ``>=``, ``>`` and ``~=`` clauses set a lower bound;
    clauses whose version cannot be parsed are ignored.
    """
    floor: Optional[tuple[int, ...]] = None
    for clause in version_spec.split(","):
        found = re.match(r"^\s*(===|==|>=|>|~=)\s*(.+?)\s*$", clause)
        if not found:
            continue
        key = _version_key(found.group(2))
        if key is not None and (floor is None or _is_older(floor, key)):
            floor = key
    return floor


def _check_vuln_db(pkg: str, version_spec: str) -> list[AuditFinding]:
    """Return a finding for each vulnerability table entry that may affect *pkg*.

    Args:
        pkg: Lower-cased package name.
        version_spec: Version clauses as written in the requirements file
            (may be empty).

    Returns:
        One ``AuditFinding`` per matching ``_VULN_PATTERNS`` entry. An entry is
        skipped only when the spec's lower bound is at or above the first
        fixed version (the entry's ``range`` with its leading ``<`` removed).
        Unpinned packages, or specs whose lower bound cannot be determined,
        are still reported, because a vulnerable release could be installed.
    """
    findings: list[AuditFinding] = []
    floor = _spec_floor(version_spec)
    for entry in _VULN_PATTERNS:
        if entry["pkg"] != pkg:
            continue
        fixed_in = entry["range"].lstrip("<")
        fixed_key = _version_key(fixed_in)
        # Every version the spec allows is already at or above the fix: not affected.
        if floor is not None and fixed_key is not None and not _is_older(floor, fixed_key):
            continue
        findings.append(
            AuditFinding(
                id=f"VULN-{entry['cve']}",
                category="dependency",
                severity=AuditSeverity(entry["severity"].lower()),
                message=f"{pkg}{version_spec and ' ' + version_spec} is vulnerable — {entry['cve']}",
                recommendation=f"Upgrade to {fixed_in}+",
                cve=entry["cve"],
            )
        )
    return findings

220 

221 

def scan_dependencies(req_path: str | Path) -> AuditReport:
    """Scan a requirements file for dependencies listed in the vulnerability table.

    Args:
        req_path: Path to the dependency file.

    Returns:
        An ``AuditReport`` whose ``scanned_deps`` is the number of parsed
        requirement lines and whose findings come from ``_check_vuln_db``.
        If *req_path* is not an existing regular file, the report holds a
        single INFO finding ``DEP-001`` instead.

    NOTE(review): the file is parsed line by line as requirements.txt syntax.
    The previous docstring also mentioned pyproject.toml, but nothing here
    parses TOML — confirm whether that support is expected.
    """
    req_path = Path(req_path)
    report = AuditReport()

    # is_file() rather than exists(): a directory would make read_text() raise.
    if not req_path.is_file():
        report.findings.append(
            AuditFinding(
                id="DEP-001",
                category="dependency",
                severity=AuditSeverity.INFO,
                message=f"Dependency file not found: {req_path}",
            )
        )
        return report

    # Explicit encoding keeps the result independent of the platform locale;
    # the "-sig" variant also drops a leading BOM that would break the first line.
    content = req_path.read_text(encoding="utf-8-sig")
    deps = _parse_requirements(content)
    report.scanned_deps = len(deps)

    for pkg, ver in deps:
        report.findings.extend(_check_vuln_db(pkg, ver))

    return report

246 

247 

248# ── Source scanner ──────────────────────────────────────────────────────────── 

249 

250 

class _DangerousVisitor(ast.NodeVisitor):
    """AST visitor that records a finding for each node matching ``_DANGEROUS_PATTERNS``.

    Only ``Call`` and ``Assign`` nodes are checked. Findings accumulate in
    ``self.findings`` with a ``"line N"`` location.
    """

    # One dotted-path segment: an attribute name with an optional integer subscript,
    # e.g. ``func`` or ``targets[0]``.
    _SEGMENT_RE = re.compile(r"^([A-Za-z_]\w*)(?:\[(-?\d+)\])?$")
    # Marker for "the path does not exist on this node".
    _MISSING = object()

    def __init__(self) -> None:
        self.findings: list[AuditFinding] = []

    @classmethod
    def _lookup(cls, node: ast.AST, path: str) -> object:
        """Follow a dotted *path* (``"func.value.id"``, ``"targets[0].id"``) from *node*.

        Uses plain ``getattr``/indexing instead of ``eval`` and returns
        ``_MISSING`` when any step of the path is absent.
        """
        current: object = node
        try:
            for segment in path.split("."):
                parsed = cls._SEGMENT_RE.match(segment)
                if parsed is None:
                    return cls._MISSING
                current = getattr(current, parsed.group(1))
                if parsed.group(2) is not None:
                    current = current[int(parsed.group(2))]
        except (AttributeError, IndexError, TypeError, KeyError):
            return cls._MISSING
        return current

    @staticmethod
    def _finding(
        pattern: dict,
        lineno: int,
        recommendation: str = "Remove or replace with a safe alternative",
    ) -> AuditFinding:
        """Build the finding for *pattern* at *lineno*."""
        return AuditFinding(
            id=f"SRC-{pattern['name'].upper()}",
            category="source",
            severity=AuditSeverity(pattern["severity"].lower()),
            message=pattern["msg"],
            location=f"line {lineno}",
            recommendation=recommendation,
        )

    @staticmethod
    def _is_literal_text(value: object) -> bool:
        """Return True when *value* is a non-empty string or bytes literal node."""
        return (
            isinstance(value, ast.Constant)
            and isinstance(value.value, (str, bytes))
            and bool(value.value)
        )

    def _match(self, node: ast.AST, pattern: dict, lineno: int) -> Optional[AuditFinding]:
        """Return a finding if *node* satisfies *pattern*, else None.

        Three pattern shapes are supported, checked in this order:
        ``match_re`` (regex on the resolved attribute), ``match_expr``
        (a ``shell=True`` keyword argument), and the default exact match on
        ``attr`` with an optional ``parent_attr``/``parent_match`` pair.
        """
        if "match_re" in pattern:
            value = self._lookup(node, pattern["attr"])
            if not (isinstance(value, str) and re.search(pattern["match_re"], value)):
                return None
            # For assignments, a secret-like name is only "hard-coded" when the
            # right-hand side is a literal; ``token = get_token()`` is not reported.
            if isinstance(node, ast.Assign) and not self._is_literal_text(node.value):
                return None
            return self._finding(pattern, lineno)

        if "match_expr" in pattern:
            # Special-case shell=True
            for kw in getattr(node, "keywords", []):
                if kw.arg == "shell" and getattr(kw.value, "value", None) is True:
                    return self._finding(pattern, lineno, "Avoid shell=True; use list args")
            return None

        # Standard attr match
        value = self._lookup(node, pattern["attr"])
        if value is self._MISSING:
            return None

        parent_attr = pattern.get("parent_attr")
        if parent_attr is not None:
            parent = self._lookup(node, parent_attr)
            if parent is self._MISSING:
                return None
            matched = parent == pattern.get("parent_match") and value == pattern["match"]
        else:
            matched = isinstance(value, str) and value == pattern["match"]
        return self._finding(pattern, lineno) if matched else None

    def _scan(self, node: ast.AST, kind: str) -> None:
        """Apply every pattern registered for *kind* to *node*, then visit its children."""
        for pattern in _DANGEROUS_PATTERNS:
            if pattern["node"] != kind:
                continue
            finding = self._match(node, pattern, node.lineno)
            if finding is not None:
                self.findings.append(finding)
        self.generic_visit(node)

    def visit_Call(self, node: ast.Call) -> None:  # noqa: N802
        self._scan(node, "Call")

    def visit_Assign(self, node: ast.Assign) -> None:  # noqa: N802
        self._scan(node, "Assign")

342 

343 

def scan_source(source_dir: str | Path) -> AuditReport:
    """AST-based source code security scan.

    Args:
        source_dir: Directory searched recursively for ``*.py`` files, or the
            path of a single file to scan.

    Returns:
        An ``AuditReport`` with the findings of every file that could be
        parsed; ``scanned_files`` counts only those files. Each finding's
        location is prefixed with the path of the file it came from, so
        results from different files can be told apart.

    Files that cannot be read, decoded as UTF-8 or parsed are skipped.
    """
    root = Path(source_dir)
    report = AuditReport()
    # Sorted so that the order of findings does not depend on directory listing order.
    candidates = [root] if root.is_file() else sorted(root.rglob("*.py"))

    for fpath in candidates:
        try:
            tree = ast.parse(fpath.read_text(encoding="utf-8"), filename=str(fpath))
        except (SyntaxError, ValueError, OSError):
            # ValueError covers UnicodeDecodeError; one bad file must not abort the scan.
            continue
        visitor = _DangerousVisitor()
        visitor.visit(tree)
        for finding in visitor.findings:
            finding.location = f"{fpath}: {finding.location}" if finding.location else str(fpath)
        report.findings.extend(visitor.findings)
        report.scanned_files += 1

    return report

361 

362 

363# ── Security Auditor class ──────────────────────────────────────────────────── 

364 

365 

class SecurityAuditor:
    """High-level security auditor that orchestrates dependency and source scanning.

    Paths given to the constructor act as defaults; every method also accepts
    explicit paths that take precedence over them.
    """

    def __init__(self, req_path: Optional[str | Path] = None, source_dir: Optional[str | Path] = None):
        self.req_path: Optional[Path] = Path(req_path) if req_path else None
        self.source_dir: Optional[Path] = Path(source_dir) if source_dir else None

    def scan_dependencies(self, req_path: Optional[str | Path] = None) -> AuditReport:
        """Scan dependencies for known vulnerabilities.

        Returns an empty report only when no path is available at all. A path
        that does not exist is passed on to the module-level scanner, which
        reports it, rather than being turned into a silent empty result.
        """
        path = Path(req_path) if req_path else self.req_path
        if not path:
            return AuditReport()
        return scan_dependencies(path)

    def scan_source(self, paths: Optional[list[str | Path]] = None) -> AuditReport:
        """AST-based source code security scan.

        With *paths*, each entry is scanned and the results are merged;
        otherwise the configured ``source_dir`` is scanned. Returns an empty
        report when neither is available.
        """
        if paths:
            report = AuditReport()
            for entry in paths:
                partial = scan_source(entry)
                report.findings.extend(partial.findings)
                report.scanned_files += partial.scanned_files
            return report
        if not self.source_dir:
            return AuditReport()
        return scan_source(self.source_dir)

    def full_audit(self, source_dir: Optional[str | Path] = None, req_path: Optional[str | Path] = None) -> AuditReport:
        """Run dependency + source audit and merge results.

        When only one of the two inputs is available, that scan still runs;
        previously the whole audit was skipped and an empty (passing) report
        was returned. An empty report is returned only when neither is set.
        """
        sd = source_dir or self.source_dir
        rp = req_path or self.req_path
        if sd and rp:
            return full_audit(Path(sd), Path(rp))
        if sd:
            return scan_source(Path(sd))
        if rp:
            return scan_dependencies(Path(rp))
        return AuditReport()

400 

401 

402# ── Full audit (module-level) ───────────────────────────────────────────────── 

403 

404 

def full_audit(
    source_dir: str | Path,
    req_path: str | Path,
) -> AuditReport:
    """Run the dependency scan and the source scan and combine them into one report.

    Dependency findings come first, followed by source findings;
    ``scanned_deps`` is taken from the dependency scan and ``scanned_files``
    from the source scan.
    """
    deps = scan_dependencies(req_path)
    sources = scan_source(source_dir)
    return AuditReport(
        findings=deps.findings + sources.findings,
        scanned_files=sources.scanned_files,
        scanned_deps=deps.scanned_deps,
    )

419 

420 

421# ── Report export ───────────────────────────────────────────────────────────── 

422 

423 

def export_report(report: AuditReport, fmt: str = "json") -> str:
    """Export audit report to JSON or Markdown.

    Args:
        report: The report to render.
        fmt: ``"json"`` (case-insensitive) for JSON; any other value produces
            Markdown.

    Returns:
        The rendered report. In Markdown, findings are listed from most to
        least severe; findings of equal severity keep their original order.
    """
    if fmt.lower() == "json":
        return json.dumps(report.to_dict(), indent=2)

    # Enum members iterate in declaration order (CRITICAL first), so the position
    # of a member is its rank: a smaller rank means a more severe finding.
    rank = {level: position for position, level in enumerate(AuditSeverity)}

    # Markdown
    lines = [
        "# Security Audit Report",
        "",
        f"**Summary**: {report.summary()}",
        "",
        "| Severity | Count |",
        "|----------|-------|",
        f"| Critical | {report.critical} |",
        f"| High | {report.high} |",
        f"| Medium | {report.medium} |",
        f"| Low | {report.low} |",
        "",
        "## Findings",
        "",
    ]
    for finding in sorted(report.findings, key=lambda item: rank[item.severity]):
        lines.append(f"- **[{finding.severity.value.upper()}]** {finding.message} ")
        if finding.location:
            lines.append(f" *Location*: {finding.location}")
        if finding.recommendation:
            lines.append(f" *Fix*: {finding.recommendation}")
        if finding.cve:
            lines.append(f" *CVE*: {finding.cve}")
        lines.append("")

    return "\n".join(lines)