Coverage for src \ sec_report_kit \ mcp \ server.py: 100%

74 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-13 08:06 +0530

1from __future__ import annotations 

2 

3import json 

4from pathlib import Path 

5from typing import Any 

6 

7from sec_report_kit.parsers import detect_source_type 

8from sec_report_kit.parsers.bandit import parse_bandit_json 

9from sec_report_kit.parsers.checkov import parse_checkov_json 

10from sec_report_kit.parsers.codeql import parse_codeql_json 

11from sec_report_kit.parsers.gitleaks import parse_gitleaks_json 

12from sec_report_kit.parsers.osv_scanner import parse_osv_scanner_json 

13from sec_report_kit.parsers.pip_audit import parse_pip_audit_json 

14from sec_report_kit.parsers.semgrep import parse_semgrep_json 

15from sec_report_kit.parsers.tfsec import parse_tfsec_json 

16from sec_report_kit.parsers.trivy import parse_trivy_json 

17from sec_report_kit.parsers.trufflehog import parse_trufflehog_json 

18from sec_report_kit.report.html_renderer import render_html_report 

19from sec_report_kit.services.summarize import count_by_severity, sort_findings 

20 

21 

22def _load_payload(source_type: str, input_path: str): 

23 raw = Path(input_path).read_text(encoding="utf-8-sig") 

24 try: 

25 data = json.loads(raw) 

26 except json.JSONDecodeError: 

27 data = [json.loads(line) for line in raw.splitlines() if line.strip()] 

28 if source_type == "auto": 

29 source_type = detect_source_type(data) 

30 if source_type == "trivy": 

31 return sort_findings(parse_trivy_json(data)) 

32 if source_type == "pip-audit": 

33 return sort_findings(parse_pip_audit_json(data)) 

34 if source_type == "bandit": 

35 return sort_findings(parse_bandit_json(data)) 

36 if source_type == "gitleaks": 

37 return sort_findings(parse_gitleaks_json(data)) 

38 if source_type == "semgrep": 

39 return sort_findings(parse_semgrep_json(data)) 

40 if source_type == "codeql": 

41 return sort_findings(parse_codeql_json(data)) 

42 if source_type == "osv-scanner": 

43 return sort_findings(parse_osv_scanner_json(data)) 

44 if source_type == "checkov": 

45 return sort_findings(parse_checkov_json(data)) 

46 if source_type == "tfsec": 

47 return sort_findings(parse_tfsec_json(data)) 

48 if source_type == "trufflehog": 

49 return sort_findings(parse_trufflehog_json(data)) 

50 raise ValueError( 

51 "source_type must be one of: 'trivy', 'pip-audit', 'bandit', 'gitleaks', " 

52 "'semgrep', 'codeql', 'osv-scanner', 'checkov', 'tfsec', 'trufflehog', or 'auto'" 

53 ) 

54 

55 

56def build_server(): 

57 try: 

58 from mcp.server.fastmcp import FastMCP 

59 except Exception as exc: # pragma: no cover 

60 raise RuntimeError( 

61 "MCP dependencies are not installed. Install with: pip install sec-report-kit[mcp]" 

62 ) from exc 

63 

64 mcp = FastMCP("sec-report-kit") 

65 

66 @mcp.tool() 

67 def summarize_json(source_type: str, input_path: str) -> dict: 

68 findings = _load_payload(source_type=source_type, input_path=input_path) 

69 counts = count_by_severity(findings) 

70 return {"total": len(findings), "counts": counts} 

71 

72 @mcp.tool() 

73 def render_report_from_json(source_type: str, input_path: str, output_path: str, target: str) -> dict: 

74 findings = _load_payload(source_type=source_type, input_path=input_path) 

75 counts = count_by_severity(findings) 

76 html = render_html_report(target_ref=target, source_label=source_type, findings=findings, counts=counts) 

77 out = Path(output_path) 

78 out.parent.mkdir(parents=True, exist_ok=True) 

79 out.write_text(html, encoding="utf-8") 

80 return {"output_path": str(out), "total": len(findings), "counts": counts} 

81 

82 @mcp.tool() 

83 def validate_input(source_type: str, input_path: str) -> dict: 

84 findings = _load_payload(source_type=source_type, input_path=input_path) 

85 return {"valid": True, "total_findings": len(findings)} 

86 

87 return mcp 

88 

89 

90def run_server(transport: str = "stdio") -> None: 

91 mcp = build_server() 

92 if transport != "stdio": 

93 raise ValueError("Only stdio transport is currently supported") 

94 mcp.run(transport="stdio")