Coverage for src \ sec_report_kit \ mcp \ server.py: 100%
74 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-13 08:06 +0530
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-13 08:06 +0530
1from __future__ import annotations
3import json
4from pathlib import Path
5from typing import Any
7from sec_report_kit.parsers import detect_source_type
8from sec_report_kit.parsers.bandit import parse_bandit_json
9from sec_report_kit.parsers.checkov import parse_checkov_json
10from sec_report_kit.parsers.codeql import parse_codeql_json
11from sec_report_kit.parsers.gitleaks import parse_gitleaks_json
12from sec_report_kit.parsers.osv_scanner import parse_osv_scanner_json
13from sec_report_kit.parsers.pip_audit import parse_pip_audit_json
14from sec_report_kit.parsers.semgrep import parse_semgrep_json
15from sec_report_kit.parsers.tfsec import parse_tfsec_json
16from sec_report_kit.parsers.trivy import parse_trivy_json
17from sec_report_kit.parsers.trufflehog import parse_trufflehog_json
18from sec_report_kit.report.html_renderer import render_html_report
19from sec_report_kit.services.summarize import count_by_severity, sort_findings
22def _load_payload(source_type: str, input_path: str):
23 raw = Path(input_path).read_text(encoding="utf-8-sig")
24 try:
25 data = json.loads(raw)
26 except json.JSONDecodeError:
27 data = [json.loads(line) for line in raw.splitlines() if line.strip()]
28 if source_type == "auto":
29 source_type = detect_source_type(data)
30 if source_type == "trivy":
31 return sort_findings(parse_trivy_json(data))
32 if source_type == "pip-audit":
33 return sort_findings(parse_pip_audit_json(data))
34 if source_type == "bandit":
35 return sort_findings(parse_bandit_json(data))
36 if source_type == "gitleaks":
37 return sort_findings(parse_gitleaks_json(data))
38 if source_type == "semgrep":
39 return sort_findings(parse_semgrep_json(data))
40 if source_type == "codeql":
41 return sort_findings(parse_codeql_json(data))
42 if source_type == "osv-scanner":
43 return sort_findings(parse_osv_scanner_json(data))
44 if source_type == "checkov":
45 return sort_findings(parse_checkov_json(data))
46 if source_type == "tfsec":
47 return sort_findings(parse_tfsec_json(data))
48 if source_type == "trufflehog":
49 return sort_findings(parse_trufflehog_json(data))
50 raise ValueError(
51 "source_type must be one of: 'trivy', 'pip-audit', 'bandit', 'gitleaks', "
52 "'semgrep', 'codeql', 'osv-scanner', 'checkov', 'tfsec', 'trufflehog', or 'auto'"
53 )
56def build_server():
57 try:
58 from mcp.server.fastmcp import FastMCP
59 except Exception as exc: # pragma: no cover
60 raise RuntimeError(
61 "MCP dependencies are not installed. Install with: pip install sec-report-kit[mcp]"
62 ) from exc
64 mcp = FastMCP("sec-report-kit")
66 @mcp.tool()
67 def summarize_json(source_type: str, input_path: str) -> dict:
68 findings = _load_payload(source_type=source_type, input_path=input_path)
69 counts = count_by_severity(findings)
70 return {"total": len(findings), "counts": counts}
72 @mcp.tool()
73 def render_report_from_json(source_type: str, input_path: str, output_path: str, target: str) -> dict:
74 findings = _load_payload(source_type=source_type, input_path=input_path)
75 counts = count_by_severity(findings)
76 html = render_html_report(target_ref=target, source_label=source_type, findings=findings, counts=counts)
77 out = Path(output_path)
78 out.parent.mkdir(parents=True, exist_ok=True)
79 out.write_text(html, encoding="utf-8")
80 return {"output_path": str(out), "total": len(findings), "counts": counts}
82 @mcp.tool()
83 def validate_input(source_type: str, input_path: str) -> dict:
84 findings = _load_payload(source_type=source_type, input_path=input_path)
85 return {"valid": True, "total_findings": len(findings)}
87 return mcp
90def run_server(transport: str = "stdio") -> None:
91 mcp = build_server()
92 if transport != "stdio":
93 raise ValueError("Only stdio transport is currently supported")
94 mcp.run(transport="stdio")