Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/cve_db/annot_aggregate.py: 78%

278 statements  

« prev     ^ index     » next       coverage.py v7.11.1, created at 2025-11-28 15:37 +0100

1# -*- coding: utf-8 -*- 

2# SPDX-License-Identifier: GPL-2.0-only 

3 

4import pathlib 

5from collections.abc import Generator, Iterable 

6from datetime import datetime 

7from typing import Any 

8 

9from ..sbom.component import CompBuild, CompId 

10from ..utils.reproducible import ReproducibleDateTime 

11from ..vuln.cve import ( 

12 CveExtReference, 

13 CveId, 

14 CveVexAffectedAssessment, 

15 CveVexAssessment, 

16 CveVexFixedAssessment, 

17 CveVexJustification, 

18 CveVexMissingVersionsAssessment, 

19 CveVexNotAffectedAssessment, 

20 CveVexOldVersNotVulnAssessment, 

21 CveVexRejectedAssessment, 

22) 

23from ..vuln.cvss import CvssMetric, CvssVersion 

24from ..vuln.version import SemVerRange, Version, VersRangeStatus 

25from .annot_base import AnnotDbEntry 

26from .db_base import CveDbEntry 

27 

28 

class ObsoleteAssessment:
    """
    Record of an assessment that was found obsolete, together with the
    lower-priority assessment it was compared against.

    Converting the object to a string renders a human-readable description
    from a message template.
    """

    def __init__(
        self,
        obs_assess: CveVexAssessment,
        obs_cve_entries: tuple[CveDbEntry, ...],
        sec_assess: CveVexAssessment,
        sec_cve_entries: tuple[CveDbEntry, ...],
        msg_tpl: str | None,
    ) -> None:
        """
        :param obs_assess: The obsolete assessment (higher priority).
        :param obs_cve_entries: Database entries the obsolete assessment comes
            from. Must not be empty, the first entry provides the CVE identifier
            used by ``__str__``.
        :param sec_assess: The assessment with lower priority.
        :param sec_cve_entries: Database entries the lower priority assessment
            comes from.
        :param msg_tpl: ``str.format`` template used to build the description.
            It is formatted with the keyword arguments ``cve_id``,
            ``obs_assess``, ``obs_db_name``, ``sec_assess`` and ``sec_db_name``.
            When ``None``, a default template is used.
        """
        # Information about obsolete assessment (higher priority)
        self._obs_assess = obs_assess
        self._obs_cve_entries = obs_cve_entries
        # Information about assessment with lower priority
        self._sec_assess = sec_assess
        self._sec_cve_entries = sec_cve_entries
        # Message template to generate a description
        if msg_tpl is None:
            msg_tpl = (
                "{cve_id} assessment {obs_assess}, from {obs_db_name}, is obsolete. "
                "Override assessment {sec_assess} from {sec_db_name}"
            )
        self._msg_tpl: str = msg_tpl

    def __str__(self) -> str:
        """
        Render the message template.

        Database names of each group of entries are joined with ``", "``.
        Raises ``IndexError`` if the obsolete entries tuple is empty.
        """
        cve_id = self._obs_cve_entries[0].identifier.id
        obs_db_name = ", ".join(
            cve_entry.database.name for cve_entry in self._obs_cve_entries
        )
        sec_db_name = ", ".join(
            cve_entry.database.name for cve_entry in self._sec_cve_entries
        )

        return self._msg_tpl.format(
            cve_id=cve_id,
            obs_assess=self._obs_assess,
            obs_db_name=obs_db_name,
            sec_assess=self._sec_assess,
            sec_db_name=sec_db_name,
        )

68 

69 

class AggregateAnnotEntry(AnnotDbEntry):
    """
    Aggregation of every database entry describing one CVE for one component
    build.

    The entries are provided as a tuple of groups, one group per database
    priority, starting from the highest priority. All entries must share the
    same CVE identifier. The VEX assessment and the list of obsolete
    assessments are computed lazily, on first access.
    """

    def __init__(
        self, comp_build: CompBuild, cve_db_entries: tuple[tuple[CveDbEntry, ...], ...]
    ) -> None:
        """
        :param comp_build: Component build the CVE is evaluated against.
        :param cve_db_entries: Database entries grouped by priority (highest
            priority first).
        :raises ValueError: If no database entry is provided.
        :raises RuntimeError: If the entries have different CVE identifiers.
        """
        super().__init__(parent_db=None)
        self._comp_build = comp_build
        self._cve_db_entries = cve_db_entries
        # Single timestamp reused by every assessment generated by this object
        self._timestamp = ReproducibleDateTime().now
        # Lazily computed values (None until first access)
        self._cvss_metrics: list[CvssMetric] | None = None
        self._assessment: CveVexAssessment | None = None
        self._obsolete_assessments: list[ObsoleteAssessment] = []
        self._cve_id: CveId = self._find_cve_id()

    def _find_cve_id(self) -> CveId:
        """
        Return the CVE identifier shared by all aggregated entries.

        :raises RuntimeError: If two entries have different identifiers.
        :raises ValueError: If there is no entry at all.
        """
        cve_id: CveId | None = None
        for cve_entry in self._iterate_cve_entries():
            if cve_id is None:
                cve_id = cve_entry.identifier
            elif cve_entry.identifier != cve_id:
                raise RuntimeError("Aggregation of different CVE identifiers")
        if cve_id is None:
            raise ValueError("This object must contain at least one database entry")
        return cve_id

    @property
    def identifier(self) -> CveId:
        """CVE identifier shared by all aggregated entries."""
        return self._cve_id

    def is_rejected(self) -> bool | None:
        """
        Indicate if the CVE is rejected.

        Returns ``True`` as soon as one entry reports the CVE as rejected.
        Otherwise returns ``False`` if at least one entry explicitly answered
        ``False``, and ``None`` when no entry gave an answer.
        """
        rejected = None
        for cve_entry in self._iterate_cve_entries():
            r = cve_entry.is_rejected()
            if r:
                return True
            if r is False:
                rejected = False
        return rejected

    @property
    def date_published(self) -> datetime | None:
        """Earliest publication date among all entries, ``None`` if unknown."""
        date_published: datetime | None = None
        for cve_entry in self._iterate_cve_entries():
            entry_date = cve_entry.date_published
            if entry_date and ((not date_published) or (entry_date < date_published)):
                date_published = entry_date
        return date_published

    @property
    def date_modified(self) -> datetime | None:
        """
        Most recent modification date found in the highest priority group that
        provides one, ``None`` if no entry has a modification date.
        """
        last_modified: datetime | None = None
        for cve_entries in self._cve_db_entries:
            for cve_entry in cve_entries:
                entry_date = cve_entry.date_modified
                if entry_date and ((not last_modified) or (entry_date > last_modified)):
                    last_modified = entry_date
            # NOTE(review): assumes the search stops at the first priority group
            # providing a date (lower priorities are then ignored) - confirm
            if last_modified:
                return last_modified
        return None

    def _iterate_cve_entries(self) -> Generator[CveDbEntry, None, None]:
        """Yield every database entry, group after group, in priority order."""
        for cve_entries in self._cve_db_entries:
            yield from cve_entries

    @property
    def description(self) -> str | None:
        """First non-empty description, in priority order, ``None`` if none."""
        for cve_entry in self._iterate_cve_entries():
            desc = cve_entry.description
            if desc:
                return desc
        return None

    def get_associated_sem_ver_ranges(
        self, comp_ids: Iterable[CompId]
    ) -> Iterable[SemVerRange]:
        """
        Provides version ranges from the highest priority of CVE database entry
        (excluding annotation)
        """
        vers_ranges: list[SemVerRange] = []

        for cve_entries in self._cve_db_entries:
            for cve_entry in cve_entries:
                if not cve_entry.is_annotation():
                    vers_ranges.extend(
                        cve_entry.get_associated_sem_ver_ranges(comp_ids)
                    )
            # Stop at the first priority group that provided version ranges
            if vers_ranges:
                break

        return vers_ranges

    @property
    def cvss_metrics(self) -> list[CvssMetric]:
        """
        Merged CVSS metrics of all entries (computed once, then cached).

        A CVSS version already provided by a higher priority group is ignored
        in lower priority groups. Metrics having the same ``cmp_key()`` are
        stored once; a stored metric is replaced only when it has no source.
        """
        if self._cvss_metrics is not None:
            return self._cvss_metrics

        # Indicates the CVSS versions that were added from higher priorities
        metrics_vers: set[CvssVersion] = set()
        # We want to prioritize metrics with a source if this is the same metric
        metrics_dict: dict[tuple[Any, ...], CvssMetric] = {}

        for cve_entries in self._cve_db_entries:
            # Indicates the CVSS versions that were added for this database priority
            prio_vers: set[CvssVersion] = set()
            for cve_entry in cve_entries:
                for metric in cve_entry.cvss_metrics:
                    cmp_key = metric.cmp_key()
                    # If this CVSS version was not added in previous higher priorities
                    if metric.cvss_ver not in metrics_vers:
                        prev_metric = metrics_dict.get(cmp_key)
                        if (prev_metric is None) or (prev_metric.source is None):
                            metrics_dict[cmp_key] = metric
                        prio_vers.add(metric.cvss_ver)
            # Only now make these versions visible to lower priorities, so that
            # entries of the same priority can all contribute
            metrics_vers.update(prio_vers)

        self._cvss_metrics = list(metrics_dict.values())
        return self._cvss_metrics

    @property
    def external_refs(self) -> set[CveExtReference]:
        """Union of the external references of all entries."""
        ext_refs: set[CveExtReference] = set()
        for cve_entry in self._iterate_cve_entries():
            ext_refs.update(cve_entry.external_refs)
        return ext_refs

    @property
    def affected_sources(self) -> set[str]:
        """Union of the affected sources of all entries."""
        sources: set[str] = set()
        for cve_entry in self._iterate_cve_entries():
            sources.update(cve_entry.affected_sources)
        return sources

    def _iterate_applicable_comp_ids(self) -> Generator[CompId, None, None]:
        """Yield the applicable component identifiers of every entry."""
        for cve_entry in self._iterate_cve_entries():
            yield from cve_entry._iterate_applicable_comp_ids()

    def _add_obsolete_assessment(
        self,
        obs_assess: CveVexAssessment,
        obs_cve_entries: tuple[CveDbEntry, ...],
        sec_assess: CveVexAssessment,
        sec_cve_entries: tuple[CveDbEntry, ...],
        msg_tpl: str | None = None,
    ) -> None:
        """
        Record an obsolete assessment.

        The arguments are forwarded unchanged to ``ObsoleteAssessment``.
        """
        self._obsolete_assessments.append(
            ObsoleteAssessment(
                obs_assess, obs_cve_entries, sec_assess, sec_cve_entries, msg_tpl
            )
        )

    def _vex_assessment_from_vers_ranges(
        self, vers_ranges: Iterable[SemVerRange]
    ) -> CveVexAssessment | None:
        """
        Compute an assessment by comparing the component version with the
        provided version ranges.

        The first matching rule wins:

        1. the version is in a trusted not-vulnerable range: fixed;
        2. the version is in a vulnerable range: affected;
        3. only not-vulnerable ranges gave information and none contains the
           version: affected (to be checked);
        4. the version is only smaller than vulnerable ranges: old version not
           vulnerable;
        5. the version is greater than a vulnerable range or is in a
           not-vulnerable range: fixed.

        Returns ``None`` when no rule applies.
        """
        is_vuln_match = False
        is_vuln_smaller = False
        is_vuln_greater = False
        is_not_vuln_match = False
        is_not_vuln_trusted = False
        is_maybe_vuln = False
        version = Version(self._comp_build.version)

        for vers_range in vers_ranges:
            s = vers_range.check_in_range(version)
            if vers_range.vulnerable:
                if s == VersRangeStatus.IN_RANGE:
                    is_vuln_match = True
                elif s == VersRangeStatus.SMALLER:
                    is_vuln_smaller = True
                elif s == VersRangeStatus.GREATER:
                    is_vuln_greater = True
            elif s == VersRangeStatus.IN_RANGE:
                is_not_vuln_match = True
                is_not_vuln_trusted = (
                    is_not_vuln_trusted or vers_range.is_not_vuln_trusted()
                )
            elif s != VersRangeStatus.NO_INFO:
                # Outside of a not-vulnerable range
                is_maybe_vuln = True

        if is_not_vuln_trusted:
            # Consider that this version was patched
            return CveVexFixedAssessment(
                vex_version="1", status_notes="version-not-in-range"
            )

        if is_vuln_match:
            # The version is within a vulnerable version range
            return CveVexAffectedAssessment(
                vex_version="1",
                action_statement="Mitigation action unknown",
                status_notes="version-in-range",
                action_statement_time=self._timestamp,
            )

        if (
            is_maybe_vuln
            and (not is_vuln_smaller)
            and (not is_vuln_greater)
            and (not is_not_vuln_match)
        ):
            # This version is maybe vulnerable: This is very unlikely to enter this
            # code path...
            return CveVexAffectedAssessment(
                vex_version="1",
                action_statement="Check if really vulnerable",
                status_notes="version-maybe-in-range",
                action_statement_time=self._timestamp,
            )

        if is_vuln_smaller and (not is_vuln_greater) and (not is_not_vuln_match):
            # Consider that this version was never impacted by this CVE (since too old)
            return CveVexOldVersNotVulnAssessment(
                vex_version="1",
                status_notes="version-not-in-range",
            )

        if is_vuln_greater or is_not_vuln_match:
            # Consider that this version was patched
            return CveVexFixedAssessment(
                vex_version="1", status_notes="version-not-in-range"
            )

        return None

    def _vex_assessment_from_compiled_sources(
        self, cve_entries: tuple[CveDbEntry, ...]
    ) -> CveVexAssessment | None:
        """
        Generate a "not affected" assessment when none of the source files
        affected by the CVE is part of the compiled sources of the component.

        Returns ``None`` if the entries provide no affected source, if the
        component provides no compiled sources index, or if at least one
        affected file is compiled.
        """
        affected_files: set[str] = set()
        for cve_entry in cve_entries:
            affected_files.update(cve_entry.affected_sources)

        if not affected_files:
            return None

        compiled_files = self._comp_build.compiled_sources_index
        if not compiled_files:
            return None

        for affected_file in affected_files:
            # The index is looked up by file name, then full paths are compared
            name = pathlib.Path(affected_file).name
            paths = compiled_files.get(name)
            if not paths:
                continue
            # Add a leading "/" on both sides so that only whole path components
            # can match the end of the compiled path
            p = f"/{affected_file}" if (affected_file[:1] != "/") else affected_file
            for path in paths:
                if f"/{path}".endswith(p):
                    return None

        return CveVexNotAffectedAssessment(
            vex_version="1",
            impact_statement="Code not compiled",
            status_notes="not-applicable-platform",
            impact_statement_time=self._timestamp,
            justification=CveVexJustification.VULNERABLE_CODE_NOT_PRESENT,
        )

    def _vex_assessment_from_rejected_cve(self) -> CveVexAssessment | None:
        """
        Generate a "rejected" assessment if any database rejects the CVE.

        Groups are visited from the lowest to the highest priority. Once a
        group rejecting the CVE is found, the assessment of every following
        (higher priority) annotation group that requires the obsolescence check
        is recorded as obsolete.

        Returns ``None`` when no database rejects the CVE.
        """
        # If the CVE is considered rejected by any databases (ignore priorities),
        # generate an assessment.
        assessment: CveVexAssessment | None = None
        rej_cve_entries: list[CveDbEntry] = []
        for cve_entries in reversed(self._cve_db_entries):
            if not cve_entries:
                continue

            if not rej_cve_entries:
                # Search if a database indicates that the CVE is rejected
                rej_cve_entries.extend(
                    cve_entry for cve_entry in cve_entries if cve_entry.is_rejected()
                )
                if rej_cve_entries:
                    assessment = CveVexRejectedAssessment(
                        vex_version="1",
                        impact_statement="CVE rejected",
                        status_notes="cve-rejected",
                        impact_statement_time=self._timestamp,
                        justification=CveVexJustification.VULNERABLE_CODE_NOT_PRESENT,
                    )
            else:
                # Verify if an annotation database requires checking for obsolete
                # assessments
                if not self._is_obsolete_check_needed(cve_entries):
                    continue

                cve_entry = cve_entries[0]
                if not isinstance(cve_entry, AnnotDbEntry):
                    continue

                # Always set here: rej_cve_entries is only non-empty once the
                # rejected assessment was created
                assert assessment is not None
                a = cve_entry.vex_assessment
                if a is not None:
                    self._add_obsolete_assessment(
                        a,
                        cve_entries,
                        assessment,
                        tuple(rej_cve_entries),
                        "{cve_id} assessment {obs_assess}, from {obs_db_name}, "
                        "is obsolete since CVE is rejected by {sec_db_name}",
                    )
        return assessment

    @staticmethod
    def _is_obsolete_check_needed(cve_entries: tuple[CveDbEntry, ...]) -> bool:
        """
        Indicate if at least one database of this group of entries has the
        obsolete assessment check enabled.

        :param cve_entries: Non-empty group of entries of the same priority.
        :raises RuntimeError: If the group contains several annotation entries,
            or mixes annotation and CVE database entries.
        """
        # Indicates if the obsolescence of the assessment, generated from these
        # database entries, needs to be verified. Also check if this group of CVE
        # entries contains annotation or CVE database entries and there are no mix up.
        is_annotation: bool | None = None
        obsolete_check: bool = False

        for cve_entry in cve_entries:
            obsolete_check = (
                obsolete_check or cve_entry.database.obsolete_assessment_check_enabled
            )
            entry_is_annot = cve_entry.is_annotation()
            if is_annotation is None:
                is_annotation = entry_is_annot
            elif is_annotation and entry_is_annot:
                raise RuntimeError(
                    "Cannot have multiple annotation databases with same priority"
                )
            elif is_annotation != entry_is_annot:
                raise RuntimeError(
                    "Cannot mix annotation with CVE database entry in the same priority"
                )

        assert is_annotation is not None
        return obsolete_check

    def _compute_assessment_from_cve_entries(
        self, cve_entries: tuple[CveDbEntry, ...], obsolete_check: bool
    ) -> tuple[CveVexAssessment | None, bool]:
        """
        Compute the assessment of one priority group.

        :param cve_entries: Non-empty group of entries of the same priority.
        :param obsolete_check: Indicates if the obsolescence of the computed
            assessment needs to be verified.
        :return: The computed assessment (or ``None``) and a flag indicating if
            the obsolescence of this assessment still needs to be verified
            against lower priorities. The flag is always ``False`` when there is
            no assessment.
        """
        assessment: CveVexAssessment | None = None

        cve_entry = cve_entries[0]
        if isinstance(cve_entry, AnnotDbEntry):
            # If this is an annotation, just provide the annotation VEX assessment
            assessment = cve_entry.vex_assessment
        else:
            # For CVE database entries, version ranges need to be merged
            vers_ranges: list[SemVerRange] = []
            for cve_entry in cve_entries:
                vers_ranges.extend(
                    cve_entry.get_associated_sem_ver_ranges(
                        self._comp_build.identifiers
                    )
                )
            if vers_ranges:
                assessment = self._vex_assessment_from_vers_ranges(vers_ranges)

        # If no assessment was computed or if the component is vulnerable
        if (assessment is None) or assessment.status.is_vulnerable():
            # Then, if the CVE provides affected sources and if the components provide
            # compiled sources files, use that to generate an assessment that indicates
            # that the components are not affected
            a = self._vex_assessment_from_compiled_sources(cve_entries)
            if a is not None:
                if obsolete_check and (assessment is not None):
                    # Indicates that the previously obtained assessment is obsolete,
                    # since affected sources are not compiled
                    self._add_obsolete_assessment(
                        assessment,
                        cve_entries,
                        a,
                        cve_entries,
                        "{cve_id} assessment {obs_assess}, from {obs_db_name}, "
                        "is obsolete since affected sources are not compiled",
                    )
                    # There is no need to continue to check for obsolete assessments
                    # NOTE(review): assumes the flag is reset only when an obsolete
                    # assessment was just recorded - confirm
                    obsolete_check = False
                assessment = a

        return assessment, (obsolete_check if assessment is not None else False)

    def _compute_assessment(self) -> None:
        """
        Compute the VEX assessment of the component for this CVE and collect
        the obsolete assessments.

        A rejected CVE takes precedence over everything else. Otherwise, the
        assessment is the first one computed while iterating the priority
        groups from the highest priority. If nothing could be computed, a
        default "missing versions" assessment is used.
        """
        # First check if the CVE is rejected, which is a special case
        assessment = self._vex_assessment_from_rejected_cve()
        if assessment is not None:
            self._assessment = assessment
            return

        # Variables associated with the computation of obsolete assessments
        compute_next_assess: bool = True
        prev_cve_entries: tuple[CveDbEntry, ...] = ()
        prev_assessment: CveVexAssessment | None = None

        # Next, iterate through each database priority (starting from the highest)
        for cve_entries in self._cve_db_entries:
            if not cve_entries:
                continue

            obsolete_check = self._is_obsolete_check_needed(cve_entries)
            if compute_next_assess or obsolete_check:
                a, obsolete_check = self._compute_assessment_from_cve_entries(
                    cve_entries, obsolete_check
                )
                # Keep only the first computed assessment
                if assessment is None:
                    assessment = a

                # Check if we need to compute next assessment
                if obsolete_check:
                    compute_next_assess = True
                elif a is not None:
                    compute_next_assess = False

                # Verify if previously computed assessment is not obsolete
                if (prev_assessment is not None) and (a is not None):
                    # The higher priority assessment is obsolete when the lower
                    # priority one reaches the same vulnerable / not vulnerable
                    # conclusion
                    if (
                        prev_assessment.status.is_vulnerable()
                        == a.status.is_vulnerable()
                    ):
                        self._add_obsolete_assessment(
                            prev_assessment, prev_cve_entries, a, cve_entries
                        )
                    # NOTE(review): assumes the previous assessment is dropped
                    # after any comparison, obsolete or not - confirm
                    prev_cve_entries = ()
                    prev_assessment = None

                if obsolete_check:
                    # Remember this assessment to compare it with the next one
                    prev_cve_entries = cve_entries
                    prev_assessment = a

        # If there is no assessment, provide a default one
        if assessment is None:
            assessment = CveVexMissingVersionsAssessment(
                vex_version="1",
                action_statement="Check package version",
                status_notes="no-version-ranges",
                action_statement_time=self._timestamp,
            )
        self._assessment = assessment

    @property
    def vex_assessment(self) -> CveVexAssessment:
        """VEX assessment of the component for this CVE (computed on first access)."""
        if self._assessment is None:
            self._compute_assessment()
        assert self._assessment is not None
        return self._assessment

    @property
    def obsolete_assessments(self) -> list[ObsoleteAssessment]:
        """
        Obsolete assessments found while computing the VEX assessment (the
        computation is triggered on first access).
        """
        if self._assessment is None:
            self._compute_assessment()
        return self._obsolete_assessments