Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/cve_db/annot_yocto.py: 79%

112 statements  

« prev     ^ index     » next       coverage.py v7.11.1, created at 2025-11-28 15:37 +0100

1# -*- coding: utf-8 -*- 

2# SPDX-License-Identifier: GPL-2.0-only 

3 

4import json 

5import pathlib 

6from collections.abc import Generator, Iterable 

7from datetime import UTC, datetime 

8from typing import Any 

9 

10from ..sbom.component import CompId 

11from ..utils import parsing 

12from ..vuln.cpe import Cpe23 

13from ..vuln.cve import ( 

14 CveId, 

15 CveVexAffectedAssessment, 

16 CveVexAssessment, 

17 CveVexFixedAssessment, 

18 CveVexJustification, 

19 CveVexNotAffectedAssessment, 

20) 

21from ..vuln.cvss import CvssMetric, CvssVersion 

22from ..vuln.version import SemVerRange 

23from .annot_base import AnnotDatabase, AnnotDbEntry 

24from .db_base import CveDatabase 

25from .registry import register_cve_db 

26 

27 

28class YoctoAnnotEntry(AnnotDbEntry): 

29 """Represent one Yocto statement""" 

30 

31 def __init__( 

32 self, 

33 parent_db: CveDatabase, 

34 pkg_obj: dict[str, Any], 

35 issue_obj: dict[str, Any], 

36 doc_timestamp: datetime, 

37 ) -> None: 

38 super().__init__(parent_db) 

39 self._pkg_obj = pkg_obj 

40 self._issue_obj = issue_obj 

41 self._timestamp = ( 

42 parsing.datetime_from_iso_format(self._issue_obj.get("modified")) 

43 or doc_timestamp 

44 ) 

45 

46 @property 

47 def identifier(self) -> CveId: 

48 return CveId(self._issue_obj["id"]) 

49 

50 @property 

51 def date_modified(self) -> datetime | None: 

52 return self._timestamp 

53 

54 @property 

55 def description(self) -> str | None: 

56 return self._issue_obj.get("summary") 

57 

58 @property 

59 def cvss_metrics(self) -> list[CvssMetric]: 

60 vect: str | None = self._issue_obj.get("vectorString") 

61 if not vect: 

62 return [] 

63 

64 score_str: str | None = self._issue_obj.get("scorev4") 

65 if score_str and score_str != "0.0" and vect.startswith("CVSS:4.0/"): 

66 score = float(score_str) 

67 sev = CvssMetric.compute_severity_from_score(score) 

68 return [CvssMetric(CvssVersion.V4_0, score, vect, sev)] 

69 

70 score_str = self._issue_obj.get("scorev3") 

71 if score_str and score_str != "0.0": 

72 score = float(score_str) 

73 sev = CvssMetric.compute_severity_from_score(score) 

74 if vect.startswith("CVSS:3.1/"): 

75 return [CvssMetric(CvssVersion.V3_1, score, vect, sev)] 

76 if vect.startswith("CVSS:3.0/"): 

77 return [CvssMetric(CvssVersion.V3_0, score, vect, sev)] 

78 

79 score_str = self._issue_obj.get("scorev2") 

80 if score_str and score_str != "0.0": 

81 score = float(score_str) 

82 return [CvssMetric(CvssVersion.V2_0, score, vect)] 

83 

84 return [] 

85 

86 def _get_patch_files_msg(self, detail: str | None) -> str | None: 

87 if detail != "fix-file-included": 

88 return None 

89 

90 files = self._issue_obj.get("patch-file") 

91 if not files: 

92 return None 

93 

94 lst_files: list[str] = [] 

95 if isinstance(files, str): 

96 lst_files.append(pathlib.Path(files).name) 

97 elif isinstance(files, list): 

98 lst_files.extend(pathlib.Path(f).name for f in files) 

99 

100 return f"Fixed by: {lst_files}" if lst_files else None 

101 

102 @property 

103 def vex_assessment(self) -> CveVexAssessment | None: 

104 status = self._issue_obj.get("status") 

105 detail = self._issue_obj.get("detail") 

106 description = self._issue_obj.get("description") 

107 

108 if status == "Patched": 

109 patches = self._get_patch_files_msg(detail) 

110 return CveVexFixedAssessment( 

111 status_notes=": ".join(v for v in (detail, description, patches) if v) 

112 ) 

113 

114 if status == "Unpatched": 

115 return CveVexAffectedAssessment( 

116 status_notes=": ".join(v for v in (detail, description) if v), 

117 action_statement="Mitigation action unknown", 

118 action_statement_time=self._timestamp, 

119 ) 

120 

121 if status == "Ignored": 

122 justification = None 

123 if detail == "cpe-incorrect": 

124 justification = CveVexJustification.COMPONENT_NOT_PRESENT 

125 elif detail == "disputed": 

126 justification = CveVexJustification.VULNERABLE_CODE_NOT_PRESENT 

127 elif detail in ("not-applicable-config", "not-applicable-platform"): 

128 justification = CveVexJustification.VULNERABLE_CODE_NOT_IN_EXECUTE_PATH 

129 

130 return CveVexNotAffectedAssessment( 

131 status_notes=detail, 

132 impact_statement=description or "", 

133 impact_statement_time=self._timestamp, 

134 justification=justification, 

135 ) 

136 

137 return None 

138 

139 def _iterate_cpes(self, pkg: Any) -> Generator[Cpe23, None, None]: 

140 for cpe_str in self._pkg_obj.get("cpes", []): 

141 cpe = Cpe23.parse(cpe_str) 

142 if cpe: 

143 yield cpe 

144 

145 def get_associated_sem_ver_ranges( 

146 self, comp_ids: Iterable[CompId] 

147 ) -> Iterable[SemVerRange]: 

148 return self._get_associated_sem_ver_ranges( 

149 comp_ids, self._pkg_obj.get("version") 

150 ) 

151 

152 

153@register_cve_db("yocto-vex-manifest") 

154class YoctoAnnotDatabase(AnnotDatabase): 

155 def __init__(self, path: pathlib.Path, name: str, **kwargs: Any) -> None: 

156 super().__init__(name, **kwargs) 

157 self._manifest_file = path.resolve(strict=True) 

158 self._statements: dict[CveId, YoctoAnnotEntry] = {} 

159 

160 def _initialize(self) -> None: 

161 self._statements.clear() 

162 

163 with self._manifest_file.open(encoding="utf-8") as f: 

164 doc = json.load(f) 

165 

166 doc_timestamp = datetime.fromtimestamp( 

167 self._manifest_file.stat().st_mtime, tz=UTC 

168 ) 

169 

170 for pkg in doc.get("package", []): 

171 for issue in pkg.get("issue", []): 

172 a = YoctoAnnotEntry(self, pkg, issue, doc_timestamp) 

173 if a.identifier.id in self._statements: 

174 raise RuntimeError( 

175 f"Multiple statements for the same CVE: {a.identifier.id}" 

176 ) 

177 self._statements[a.identifier] = a 

178 

179 def get_cve(self, cve_id: CveId) -> YoctoAnnotEntry | None: 

180 return self._statements.get(cve_id) 

181 

182 def iterate_cves(self) -> Generator[YoctoAnnotEntry, None, None]: 

183 yield from self._statements.values()