Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/cve_db/annot_openvex.py: 95%

97 statements  

« prev     ^ index     » next       coverage.py v7.11.1, created at 2025-11-28 15:37 +0100

1# -*- coding: utf-8 -*- 

2# SPDX-License-Identifier: GPL-2.0-only 

3 

4import json 

5import pathlib 

6from collections.abc import Generator 

7from datetime import datetime 

8from typing import Any 

9 

10from ..utils import parsing 

11from ..vuln.cpe import Cpe23 

12from ..vuln.cve import ( 

13 CveId, 

14 CveVexAffectedAssessment, 

15 CveVexAssessment, 

16 CveVexFixedAssessment, 

17 CveVexJustification, 

18 CveVexNotAffectedAssessment, 

19 CveVexStatus, 

20 CveVexUnderInvestigationAssessment, 

21) 

22from .annot_base import AnnotDbEntry 

23from .db_base import CveDatabase, CveDatabaseT 

24from .db_git import GitAnnotDatabase 

25from .registry import register_cve_db 

26 

27 

def cve_vex_justification_from_str(s: str | None) -> CveVexJustification | None:
    """Translate an OpenVEX justification label into its enum member.

    Returns ``None`` when ``s`` is ``None``/empty or is not one of the
    labels listed in the lookup table below.
    """
    known_labels: dict[str, CveVexJustification] = {
        "component_not_present": CveVexJustification.COMPONENT_NOT_PRESENT,
        "vulnerable_code_not_present": (
            CveVexJustification.VULNERABLE_CODE_NOT_PRESENT
        ),
        "inline_mitigations_already_exist": (
            CveVexJustification.INLINE_MITIGATIONS_ALREADY_EXIST
        ),
        "vulnerable_code_cannot_be_controlled_by_adversary": (
            CveVexJustification.VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY
        ),
        "vulnerable_code_not_in_execute_path": (
            CveVexJustification.VULNERABLE_CODE_NOT_IN_EXECUTE_PATH
        ),
    }

    # A missing or empty label never matches anything.
    if not s:
        return None
    return known_labels.get(s)

41 

42 

class OpenVexAnnotEntry(AnnotDbEntry):
    """Represent one OpenVEX statement.

    The entry wraps the raw JSON object of a single statement and exposes
    its fields through properties. ``doc_timestamp`` is the timestamp of the
    enclosing document; it is used as a fallback when the statement does not
    carry its own ``timestamp``.
    """

    def __init__(
        self,
        parent_db: CveDatabase,
        json_obj: dict[str, Any],
        doc_timestamp: datetime | None = None,
    ) -> None:
        super().__init__(parent_db)
        self._json = json_obj
        self._doc_timestamp = doc_timestamp

    @property
    def identifier(self) -> CveId:
        """CVE identifier built from the statement's ``vulnerability.name``."""
        return CveId(self._json.get("vulnerability", {}).get("name"))

    @property
    def date_modified(self) -> datetime | None:
        """Value of the statement's ``last_updated`` field, parsed as ISO."""
        return parsing.datetime_from_iso_format(self._json.get("last_updated"))

    @property
    def description(self) -> str | None:
        """The ``vulnerability.description`` text, or ``None`` if not a string."""
        d = self._json.get("vulnerability", {}).get("description")
        return d if isinstance(d, str) else None

    @property
    def vex_assessment(self) -> CveVexAssessment | None:
        """Build the assessment object matching the statement's ``status``.

        Returns ``None`` when the status is none of the four handled values.
        """
        version = self._json.get("version")
        notes = self._json.get("status_notes")
        # NOTE(review): if CveVexStatus is a plain Enum this raises ValueError
        # on a missing/unknown status - confirm that is the intended behavior.
        status = CveVexStatus(self._json.get("status"))
        # Prefer the statement's own timestamp, else the document's.
        timestamp = (
            parsing.datetime_from_iso_format(self._json.get("timestamp"))
            or self._doc_timestamp
        )

        if status == CveVexStatus.NOT_AFFECTED:
            statement = self._json.get("impact_statement", "")
            justification = self._json.get("justification")

            return CveVexNotAffectedAssessment(
                vex_version=version,
                status_notes=notes,
                impact_statement=statement,
                impact_statement_time=timestamp,
                justification=cve_vex_justification_from_str(justification),
            )

        if status == CveVexStatus.AFFECTED:
            statement = self._json.get("action_statement", "")
            # Parse the dedicated timestamp like every other timestamp of the
            # statement, so the assessment never receives the raw JSON string;
            # fall back to the statement/document timestamp when it is absent.
            statement_time = (
                parsing.datetime_from_iso_format(
                    self._json.get("action_statement_timestamp")
                )
                or timestamp
            )
            return CveVexAffectedAssessment(
                vex_version=version,
                status_notes=notes,
                action_statement=statement,
                action_statement_time=statement_time,
            )

        if status == CveVexStatus.FIXED:
            return CveVexFixedAssessment(vex_version=version, status_notes=notes)

        if status == CveVexStatus.UNDER_INVESTIGATION:
            return CveVexUnderInvestigationAssessment(
                vex_version=version, status_notes=notes
            )

        return None

    def _iterate_cpes(self, pkg: Any) -> Generator[Cpe23, None, None]:
        """Yield the CPE of every product that declares a parsable ``cpe23``.

        ``pkg`` is not used by this implementation.
        """
        for product in self._json.get("products", []):
            cpe = Cpe23.parse(product.get("identifiers", {}).get("cpe23"))
            if cpe:
                yield cpe

118 

119 

@register_cve_db("openvex-dir")
class OpenVexAnnotDatabase(GitAnnotDatabase):
    """Annotation database fed by the OpenVEX files matching a list of globs.

    Statements are indexed by CVE identifier; a CVE may appear in at most one
    statement across all matched files.
    """

    @classmethod
    def create_from_config(cls: type[CveDatabaseT], **kwargs: Any) -> CveDatabaseT:
        """Create the database from configuration; ``globs`` is mandatory."""
        parsing.update_list_param(kwargs, "globs", required=True)
        return super().create_from_config(**kwargs)

    def __init__(
        self, path: pathlib.Path, name: str, globs: list[str], **kwargs: Any
    ) -> None:
        super().__init__(path, name, **kwargs)
        self._vex_globs = globs
        self._statements: dict[CveId, OpenVexAnnotEntry] = {}

    def iterate_file_statements(
        self, path_vex: pathlib.Path
    ) -> Generator[OpenVexAnnotEntry, None, None]:
        """Yield one entry per item of the ``statements`` list of ``path_vex``.

        The document-level ``timestamp`` is handed to every entry.
        """
        with path_vex.open(encoding="utf-8") as f:
            doc = json.load(f)
            doc_timestamp = parsing.datetime_from_iso_format(doc.get("timestamp"))
            for s in doc.get("statements", []):
                yield OpenVexAnnotEntry(self, s, doc_timestamp)

    def _iterate_vex_files(self) -> Generator[pathlib.Path, None, None]:
        """Iterate over all VEX files managed by this database"""
        for vex_glob in self._vex_globs:
            yield from self._db_dir.glob(vex_glob)

    def _initialize(self) -> None:
        """Rebuild the CVE index from every matched VEX file.

        Raises:
            RuntimeError: if two statements refer to the same CVE.
        """
        self._statements.clear()
        for path_json in self._iterate_vex_files():
            for a in self.iterate_file_statements(path_json):
                # The index is keyed by CveId, so the duplicate check must use
                # the same key type as the insertion below. The property is
                # read once because it builds a new CveId on every access.
                cve_id = a.identifier
                if cve_id in self._statements:
                    raise RuntimeError(
                        f"Multiple statements for the same CVE: {cve_id.id}"
                    )
                self._statements[cve_id] = a

    def _get_index_cache_invalidation_data(self) -> bytes:
        """Return the encoded string form of the configured glob list."""
        return str(self._vex_globs).encode()

    def get_cve(self, cve_id: CveId) -> OpenVexAnnotEntry | None:
        """Return the statement recorded for ``cve_id``, or ``None``."""
        return self._statements.get(cve_id)

    def iterate_cves(self) -> Generator[OpenVexAnnotEntry, None, None]:
        """Yield every indexed statement."""
        yield from self._statements.values()

166 

167 

@register_cve_db("openvex-file")
class OpenVexFileAnnotDatabase(OpenVexAnnotDatabase):
    """Variant of the OpenVEX database that reads one single VEX file."""

    @classmethod
    def create_from_config(cls: type[CveDatabaseT], **kwargs: Any) -> CveDatabaseT:
        """Create the database from configuration.

        Any ``globs`` value is replaced by an empty list before delegating to
        the parent class; the constructor derives the real glob from ``path``.
        """
        return super().create_from_config(**{**kwargs, "globs": []})

    def __init__(self, path: pathlib.Path, name: str, **kwargs: Any) -> None:
        # Any "globs" entry is discarded: the only glob is the file's own name,
        # resolved relative to the directory that contains it.
        forwarded = {key: val for key, val in kwargs.items() if key != "globs"}
        vex_dir = path.parent
        vex_file = path.name
        super().__init__(vex_dir, name, globs=[vex_file], **forwarded)