Coverage for src \ sec_report_kit \ parsers \ codeql.py: 100%

47 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-13 08:06 +0530

1from __future__ import annotations 

2 

3from sec_report_kit.models import Finding 

4from sec_report_kit.services.normalize import normalize_severity 

5 

6 

7def _extract_rule_index(result: dict) -> int | None: 

8 locations = result.get("locations") 

9 if isinstance(locations, list) and locations: 

10 logical = locations[0].get("logicalLocations") 

11 if isinstance(logical, list) and logical: 

12 idx = logical[0].get("index") 

13 if isinstance(idx, int): 

14 return idx 

15 return None 

16 

17 

18def _rule_info(run: dict, result: dict) -> tuple[str, str, str]: 

19 # Returns rule id, title, primary_url. 

20 rules = run.get("tool", {}).get("driver", {}).get("rules", []) 

21 rule_id = str(result.get("ruleId") or "-") 

22 title = str(result.get("message", {}).get("text") or "CodeQL finding") 

23 primary_url = "" 

24 

25 for rule in rules if isinstance(rules, list) else []: 

26 if str(rule.get("id") or "") == rule_id: 

27 title = str(rule.get("shortDescription", {}).get("text") or title) 

28 primary_url = str(rule.get("helpUri") or "") 

29 return rule_id, title, primary_url 

30 

31 idx = _extract_rule_index(result) 

32 if idx is not None and 0 <= idx < len(rules): 

33 rule = rules[idx] 

34 rule_id = str(rule.get("id") or rule_id) 

35 title = str(rule.get("shortDescription", {}).get("text") or title) 

36 primary_url = str(rule.get("helpUri") or "") 

37 

38 return rule_id, title, primary_url 

39 

40 

def parse_codeql_json(data: dict) -> list[Finding]:
    """Convert a parsed CodeQL SARIF document into ``Finding`` records.

    Every dict entry in ``data["runs"][*]["results"]`` yields one finding.
    Parsing is best-effort: a missing, null or wrongly typed ``runs`` or
    ``results`` value, and any result that is not a dict, is skipped
    rather than raising.

    Per result:

    * severity comes from ``level`` (``error`` -> ``HIGH``, ``warning`` ->
      ``MEDIUM``, ``note`` -> ``LOW``, case-insensitive; anything else is
      ``UNKNOWN``) and is then passed through ``normalize_severity``;
    * target is the ``uri`` of the first location's
      ``physicalLocation.artifactLocation``, or ``"repository"`` when
      that is unavailable;
    * rule id, title and URL come from ``_rule_info``; the rule id is used
      for both ``vulnerability_id`` and ``package``.

    Args:
        data: The decoded SARIF JSON document.

    Returns:
        The findings in document order; empty when nothing is parseable.
    """
    # Built once instead of on every result.
    level_to_severity = {"error": "HIGH", "warning": "MEDIUM", "note": "LOW"}
    findings: list[Finding] = []

    # .get(key, []) would still hand back None for an explicit null, so
    # the container type is checked instead of relying on a default.
    runs = data.get("runs") if isinstance(data, dict) else None
    for run in runs if isinstance(runs, list) else []:
        results = run.get("results") if isinstance(run, dict) else None
        for result in results if isinstance(results, list) else []:
            if not isinstance(result, dict):
                continue

            level = result.get("level")
            if isinstance(level, str):
                severity = level_to_severity.get(level.lower(), "UNKNOWN")
            else:
                severity = "UNKNOWN"

            rule_id, title, primary_url = _rule_info(run, result)

            target = "repository"
            locations = result.get("locations")
            if (
                isinstance(locations, list)
                and locations
                and isinstance(locations[0], dict)
            ):
                phys = locations[0].get("physicalLocation")
                artifact = phys.get("artifactLocation") if isinstance(phys, dict) else None
                if isinstance(artifact, dict):
                    target = str(artifact.get("uri") or target)

            findings.append(
                Finding(
                    source_type="codeql-sast",
                    target=target,
                    severity=normalize_severity(severity),
                    vulnerability_id=rule_id,
                    package=rule_id,
                    installed_version="-",
                    fixed_version="-",
                    title=title,
                    primary_url=primary_url,
                )
            )

    return findings