Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/cve_db/db_cvelist.py: 90%

198 statements  

« prev     ^ index     » next       coverage.py v7.11.1, created at 2025-11-28 15:37 +0100

1# -*- coding: utf-8 -*- 

2# SPDX-License-Identifier: GPL-2.0-only 

3 

4import json 

5import pathlib 

6from collections.abc import Generator, Iterable 

7from datetime import datetime 

8from typing import Any 

9 

10from ..sbom.component import CompId 

11from ..utils import parsing 

12from ..vuln.cpe import parse_cpe_match 

13from ..vuln.cve import CveExtReference, CveId 

14from ..vuln.cvss import CvssMetric, CvssVersion 

15from ..vuln.version import SemVerRange, VersionRange, VersionRangeBuilder 

16from .db_base import CveDatabase, CveDbEntry 

17from .db_git import GitCveDatabase 

18from .registry import register_cve_db 

19 

20 

21class CveListCveEntry(CveDbEntry): 

22 def __init__(self, parent_db: CveDatabase, json_obj: dict[str, Any]) -> None: 

23 super().__init__(parent_db) 

24 self._json = json_obj 

25 

26 @property 

27 def identifier(self) -> CveId: 

28 return CveId(self._json.get("cveMetadata", {}).get("cveId")) 

29 

30 def is_annotation(self) -> bool: 

31 return False 

32 

33 def is_rejected(self) -> bool: 

34 state = str(self._json.get("cveMetadata", {}).get("state")) 

35 return state == "REJECTED" 

36 

37 @property 

38 def date_published(self) -> datetime | None: 

39 return parsing.datetime_from_iso_format( 

40 self._json.get("cveMetadata", {}).get("datePublished") 

41 ) 

42 

43 @property 

44 def date_modified(self) -> datetime | None: 

45 return parsing.datetime_from_iso_format( 

46 self._json.get("cveMetadata", {}).get("dateUpdated") 

47 ) 

48 

49 def _iterate_containers(self) -> Generator[dict[str, Any], None, None]: 

50 containers_obj = self._json.get("containers", {}) 

51 cna = containers_obj.get("cna") 

52 if cna is not None: 

53 yield cna 

54 yield from containers_obj.get("adp", []) 

55 

56 @staticmethod 

57 def _container_description(container: dict[str, Any]) -> str | None: 

58 descriptions = container.get("descriptions") 

59 if not descriptions: 

60 return None 

61 

62 for desc in descriptions: 

63 if desc.get("lang") == "en": 

64 txt = desc.get("value") 

65 if txt: 

66 return str(txt) 

67 

68 d = descriptions[0].get("value") 

69 return d if isinstance(d, str) else None 

70 

71 @staticmethod 

72 def _container_cna_org(container: dict[str, Any]) -> str | None: 

73 """:return: CNA organization identifier for this container""" 

74 org = container.get("providerMetadata", {}).get("orgId") 

75 return org if isinstance(org, str) else None 

76 

77 @property 

78 def description(self) -> str | None: 

79 for c in self._iterate_containers(): 

80 desc = self._container_description(c) 

81 if desc is not None: 

82 return desc 

83 

84 return None 

85 

86 @staticmethod 

87 def _iterate_cpe_matches( 

88 container: dict[str, Any], 

89 ) -> Generator[dict[str, Any], None, None]: 

90 for cpe_applicability in container.get("cpeApplicability", []): 

91 for node in cpe_applicability.get("nodes", []): 

92 yield from node.get("cpeMatch", []) 

93 

94 @staticmethod 

95 def _parse_affected_version( 

96 json_obj: dict[str, Any], cna_org: str | None 

97 ) -> VersionRange | None: 

98 status = json_obj.get("status") 

99 if status == "affected": 

100 is_vuln = True 

101 elif status == "unaffected": 

102 is_vuln = False 

103 else: 

104 is_vuln = None 

105 

106 vers_type: str | None = json_obj.get("versionType") 

107 vers_range_builder = VersionRangeBuilder( 

108 vulnerable=is_vuln, from_cpe=False, from_cna=cna_org 

109 ) 

110 

111 v_end_le = json_obj.get("lessThanOrEqual") 

112 if v_end_le: 

113 vers_range_builder.set_end_version(v_end_le, including=True) 

114 

115 v_end_lt = json_obj.get("lessThan") 

116 if v_end_lt: 

117 vers_range_builder.set_end_version(v_end_lt, including=False) 

118 

119 vers_val = json_obj.get("version") 

120 if vers_val is None: 

121 return None 

122 

123 if v_end_le or v_end_lt or (vers_type == "git"): 

124 vers_range_builder.set_start_version(vers_val, including=True) 

125 else: 

126 vers_range_builder.set_start_version(vers_val, equal=True) 

127 

128 if vers_range_builder.is_valid(): 

129 if vers_type == "git": 

130 return vers_range_builder.git_ver_range 

131 return vers_range_builder.sem_ver_range 

132 

133 return None 

134 

135 def _iterate_applicable_comp_ids(self) -> Generator[CompId, None, None]: 

136 for c in self._iterate_containers(): 

137 # Process affected blocks 

138 for affected in c.get("affected", []): 

139 # Check if there is at least one affected version 

140 is_vuln = False 

141 for version in affected.get("versions", []): 

142 if version.get("status") == "affected": 

143 is_vuln = True 

144 break 

145 

146 # If there is no vulnerable version, do not check CPEs 

147 if not is_vuln: 

148 continue 

149 

150 # Extract CPEs and package/component name 

151 for cpe in affected.get("cpes", []): 

152 comp_id = CompId.build_from_cpe(cpe) 

153 if comp_id is not None: 

154 yield comp_id 

155 

156 pkg_name = affected.get("packageName") 

157 if pkg_name: 

158 yield CompId(name=pkg_name) 

159 

160 # Process cpeMatch blocks 

161 for cpe_match in self._iterate_cpe_matches(c): 

162 if cpe_match.get("vulnerable", True): 

163 comp_id = CompId.build_from_cpe(cpe_match.get("criteria")) 

164 if comp_id is not None: 

165 yield comp_id 

166 

167 def get_associated_sem_ver_ranges( 

168 self, comp_ids: Iterable[CompId] 

169 ) -> Iterable[SemVerRange]: 

170 vers_ranges: list[SemVerRange] = [] 

171 cpe_applicable_comp_ids: set[CompId] = set() 

172 

173 # First iterate over cpeMatch blocks 

174 for c in self._iterate_containers(): 

175 cna_org = self._container_cna_org(c) 

176 

177 for cpe_match_json in self._iterate_cpe_matches(c): 

178 cpe, vers_range = parse_cpe_match(cpe_match_json, cna_org) 

179 comp_id = CompId.build_from_cpe(cpe) 

180 if (comp_id is not None) and (vers_range is not None): 

181 if vers_range.vulnerable: 

182 cpe_applicable_comp_ids.add(comp_id) 

183 if isinstance( 

184 vers_range, SemVerRange 

185 ) and comp_id.is_matching_one_of(comp_ids): 

186 vers_ranges.append(vers_range) 

187 

188 # Then, if there is only a unique component identifier specified, we could use 

189 # this component identifier for affected versions with missing CPE 

190 accept_version_without_comp_id = (len(cpe_applicable_comp_ids) == 1) and ( 

191 len(vers_ranges) > 0 

192 ) 

193 

194 # Iterate over affected blocks 

195 for c in self._iterate_containers(): 

196 cna_org = self._container_cna_org(c) 

197 

198 for affected in c.get("affected", []): 

199 # Extract CPEs and package name to check if this block is applicable to 

200 # specified component identifier 

201 is_applicable = None if accept_version_without_comp_id else False 

202 for cpe in affected.get("cpes", []): 

203 comp_id = CompId.build_from_cpe(cpe) 

204 if comp_id is not None: 

205 is_applicable = comp_id.is_matching_one_of(comp_ids) 

206 if is_applicable: 

207 break 

208 

209 if not is_applicable: 

210 pkg_name = affected.get("packageName") 

211 if pkg_name: 

212 is_applicable = CompId(name=pkg_name).is_matching_one_of( 

213 comp_ids 

214 ) 

215 

216 # If this block is not applicable, do not parse it, skip it 

217 if is_applicable is False: 

218 continue 

219 

220 # And finally parse each version 

221 for version in affected.get("versions", []): 

222 vers_range = self._parse_affected_version(version, cna_org) 

223 if (vers_range is not None) and isinstance(vers_range, SemVerRange): 

224 vers_ranges.append(vers_range) 

225 

226 return vers_ranges 

227 

228 @property 

229 def cvss_metrics(self) -> list[CvssMetric]: 

230 def add_cvss_to_set( 

231 m: list[CvssMetric], cvss_obj: dict[str, Any], cvss_ver: CvssVersion 

232 ) -> None: 

233 if cvss_obj is not None: 

234 cvss_info = CvssMetric.parse_cve_db_metric(cvss_obj, version=cvss_ver) 

235 if cvss_info is not None: 

236 m.append(cvss_info) 

237 

238 cvss_metrics: list[CvssMetric] = [] 

239 for c in self._iterate_containers(): 

240 for metric in c.get("metrics", []): 

241 add_cvss_to_set(cvss_metrics, metric.get("cvssV2_0"), CvssVersion.V2_0) 

242 add_cvss_to_set(cvss_metrics, metric.get("cvssV3_0"), CvssVersion.V3_0) 

243 add_cvss_to_set(cvss_metrics, metric.get("cvssV3_1"), CvssVersion.V3_1) 

244 add_cvss_to_set(cvss_metrics, metric.get("cvssV4_0"), CvssVersion.V4_0) 

245 

246 return cvss_metrics 

247 

248 @property 

249 def external_refs(self) -> set[CveExtReference]: 

250 ext_refs = set() 

251 for c in self._iterate_containers(): 

252 for ext_ref in c.get("references", []): 

253 url = ext_ref.get("url") 

254 if url: 

255 ext_refs.add(CveExtReference(url)) 

256 

257 return ext_refs 

258 

259 @property 

260 def affected_sources(self) -> set[str]: 

261 sources = set() 

262 for c in self._iterate_containers(): 

263 for affected in c.get("affected", []): 

264 sources.update(affected.get("programFiles", [])) 

265 return sources 

266 

267 

268@register_cve_db("cve-db-cvelist") 

269class CveListCveDatabase(GitCveDatabase): 

270 GIT_FETCH_URL = "https://github.com/CVEProject/cvelistV5.git" 

271 

272 def __init__( 

273 self, path: pathlib.Path, name: str, git_url: str = GIT_FETCH_URL, **kwargs: Any 

274 ) -> None: 

275 super().__init__(path, name, git_url, **kwargs) 

276 

277 def get_default_priority(self, order: int, from_config: bool) -> int: 

278 return 50 

279 

280 def get_cve(self, cve_id: CveId) -> CveDbEntry | None: 

281 path_json = self._git_dir.joinpath( 

282 "cves", cve_id.year, cve_id.number[:-3] + "xxx", f"{cve_id.id}.json" 

283 ) 

284 if path_json.is_file(): 

285 with path_json.open(encoding="utf-8") as f: 

286 return CveListCveEntry(self, json.load(f)) 

287 return None 

288 

289 def iterate_cves(self) -> Generator[CveDbEntry, None, None]: 

290 for path_json in self._git_dir.glob("cves/*/*/CVE-*.json"): 

291 with path_json.open(encoding="utf-8") as f: 

292 yield CveListCveEntry(self, json.load(f))