Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/cve_db/annot_aggregate.py: 78%
278 statements
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
1# -*- coding: utf-8 -*-
2# SPDX-License-Identifier: GPL-2.0-only
4import pathlib
5from collections.abc import Generator, Iterable
6from datetime import datetime
7from typing import Any
9from ..sbom.component import CompBuild, CompId
10from ..utils.reproducible import ReproducibleDateTime
11from ..vuln.cve import (
12 CveExtReference,
13 CveId,
14 CveVexAffectedAssessment,
15 CveVexAssessment,
16 CveVexFixedAssessment,
17 CveVexJustification,
18 CveVexMissingVersionsAssessment,
19 CveVexNotAffectedAssessment,
20 CveVexOldVersNotVulnAssessment,
21 CveVexRejectedAssessment,
22)
23from ..vuln.cvss import CvssMetric, CvssVersion
24from ..vuln.version import SemVerRange, Version, VersRangeStatus
25from .annot_base import AnnotDbEntry
26from .db_base import CveDbEntry
class ObsoleteAssessment:
    """
    Describes an assessment that is made obsolete by another assessment.

    The object keeps the obsolete assessment together with the database entries
    it comes from, the assessment that overrides it together with its own
    database entries, and the message template rendered by ``str()``.
    """

    # Template used when the caller passes ``msg_tpl=None``
    _DEFAULT_MSG_TPL = (
        "{cve_id} assessment {obs_assess}, from {obs_db_name}, is obsolete. "
        "Override assessment {sec_assess} from {sec_db_name}"
    )

    def __init__(
        self,
        obs_assess: CveVexAssessment,
        obs_cve_entries: tuple[CveDbEntry, ...],
        sec_assess: CveVexAssessment,
        sec_cve_entries: tuple[CveDbEntry, ...],
        msg_tpl: str | None,
    ) -> None:
        """
        Stores both assessments and the template used to describe them.

        ``msg_tpl`` is a ``str.format`` template that may reference the fields
        ``cve_id``, ``obs_assess``, ``obs_db_name``, ``sec_assess`` and
        ``sec_db_name``. When it is None, a default template is used.
        """
        # Information about obsolete assessment (higher priority)
        self._obs_assess = obs_assess
        self._obs_cve_entries = obs_cve_entries
        # Information about assessment with lower priority
        self._sec_assess = sec_assess
        self._sec_cve_entries = sec_cve_entries
        # Message template to generate a description
        if msg_tpl is None:
            msg_tpl = self._DEFAULT_MSG_TPL
        self._msg_tpl: str = msg_tpl

    def __str__(self) -> str:
        """
        Renders the message template.

        The CVE identifier is read from the first obsolete entry, so an empty
        ``obs_cve_entries`` tuple raises IndexError. Database names of each
        group are joined with ", ".
        """
        cve_id = self._obs_cve_entries[0].identifier.id
        obs_db_name = ", ".join(
            cve_entry.database.name for cve_entry in self._obs_cve_entries
        )
        sec_db_name = ", ".join(
            cve_entry.database.name for cve_entry in self._sec_cve_entries
        )

        return self._msg_tpl.format(
            cve_id=cve_id,
            obs_assess=self._obs_assess,
            obs_db_name=obs_db_name,
            sec_assess=self._sec_assess,
            sec_db_name=sec_db_name,
        )
class AggregateAnnotEntry(AnnotDbEntry):
    """
    Aggregates, for one component build, the entries that the databases provide
    for a single CVE.

    ``cve_db_entries`` holds one tuple of entries per database priority; the
    groups are iterated starting from the highest priority.
    """

    def __init__(
        self, comp_build: CompBuild, cve_db_entries: tuple[tuple[CveDbEntry, ...], ...]
    ) -> None:
        """
        Stores the inputs and resolves the CVE identifier shared by all entries.

        Raises ValueError when no entry is provided and RuntimeError when the
        entries do not all carry the same CVE identifier.
        """
        super().__init__(parent_db=None)
        self._comp_build = comp_build
        self._cve_db_entries = cve_db_entries
        # Timestamp reused by every assessment generated by this object
        self._timestamp = ReproducibleDateTime().now
        # Lazily computed values (None means "not computed yet")
        self._cvss_metrics: list[CvssMetric] | None = None
        self._assessment: CveVexAssessment | None = None
        self._obsolete_assessments: list[ObsoleteAssessment] = []
        self._cve_id: CveId = self._find_cve_id()

    def _find_cve_id(self) -> CveId:
        """
        Returns the CVE identifier common to all the aggregated entries.
        """
        cve_id: CveId | None = None
        for cve_entry in self._iterate_cve_entries():
            if cve_id is None:
                cve_id = cve_entry.identifier
            elif cve_entry.identifier != cve_id:
                raise RuntimeError("Aggregation of different CVE identifiers")
        if cve_id is None:
            raise ValueError("This object must contain at least one database entry")
        return cve_id

    @property
    def identifier(self) -> CveId:
        """
        CVE identifier shared by all the aggregated entries.
        """
        return self._cve_id

    def is_rejected(self) -> bool | None:
        """
        Returns True as soon as one entry reports the CVE as rejected, False if
        no entry does but at least one explicitly answers False, else None.
        """
        rejected = None
        for cve_entry in self._iterate_cve_entries():
            r = cve_entry.is_rejected()
            if r:
                return True
            if r is False:
                rejected = False
        return rejected

    @property
    def date_published(self) -> datetime | None:
        """
        Earliest publication date among all the entries, None if none has one.
        """
        date_published: datetime | None = None
        for cve_entry in self._iterate_cve_entries():
            entry_date = cve_entry.date_published
            if entry_date and ((not date_published) or (entry_date < date_published)):
                date_published = entry_date
        return date_published

    @property
    def date_modified(self) -> datetime | None:
        """
        Latest modification date of the first priority group providing one.
        """
        last_modified: datetime | None = None
        for cve_entries in self._cve_db_entries:
            for cve_entry in cve_entries:
                entry_date = cve_entry.date_modified
                if entry_date and ((not last_modified) or (entry_date > last_modified)):
                    last_modified = entry_date
            # NOTE(review): nesting inferred from a flattened listing; the early
            # return is assumed to be evaluated once per priority group. Confirm.
            if last_modified:
                return last_modified
        return None

    def _iterate_cve_entries(self) -> Generator[CveDbEntry, None, None]:
        """
        Yields every entry, group after group, in the stored order.
        """
        for cve_entries in self._cve_db_entries:
            yield from cve_entries

    @property
    def description(self) -> str | None:
        """
        First non-empty description found while iterating the entries.
        """
        for cve_entry in self._iterate_cve_entries():
            desc = cve_entry.description
            if desc:
                return desc
        return None

    def get_associated_sem_ver_ranges(
        self, comp_ids: Iterable[CompId]
    ) -> Iterable[SemVerRange]:
        """
        Provides version ranges from the highest priority of CVE database entry
        (excluding annotation)
        """
        vers_ranges: list[SemVerRange] = []

        for cve_entries in self._cve_db_entries:
            for cve_entry in cve_entries:
                if not cve_entry.is_annotation():
                    vers_ranges.extend(
                        cve_entry.get_associated_sem_ver_ranges(comp_ids)
                    )
            # Stop at the first priority group that provided version ranges
            if vers_ranges:
                break

        return vers_ranges

    @property
    def cvss_metrics(self) -> list[CvssMetric]:
        """
        CVSS metrics merged across priorities; computed once and then cached.

        A CVSS version already provided by a higher priority group is not taken
        again from a lower priority group.
        """
        if self._cvss_metrics is not None:
            return self._cvss_metrics

        # Indicates the CVSS versions that were added from higher priorities
        metrics_vers: set[CvssVersion] = set()
        # We want to prioritize metrics with a source if this is the same metric
        metrics_dict: dict[tuple[Any, ...], CvssMetric] = {}

        for cve_entries in self._cve_db_entries:
            # Indicates the CVSS versions that were added for this database priority
            prio_vers: set[CvssVersion] = set()
            for cve_entry in cve_entries:
                for metric in cve_entry.cvss_metrics:
                    cmp_key = metric.cmp_key()
                    # If this CVSS version was not added in previous higher priorities
                    if metric.cvss_ver not in metrics_vers:
                        prev_metric = metrics_dict.get(cmp_key)
                        if (prev_metric is None) or (prev_metric.source is None):
                            metrics_dict[cmp_key] = metric
                            prio_vers.add(metric.cvss_ver)
            metrics_vers.update(prio_vers)

        self._cvss_metrics = list(metrics_dict.values())
        return self._cvss_metrics

    @property
    def external_refs(self) -> set[CveExtReference]:
        """
        Union of the external references of all the entries.
        """
        ext_refs: set[CveExtReference] = set()
        for cve_entry in self._iterate_cve_entries():
            ext_refs.update(cve_entry.external_refs)
        return ext_refs

    @property
    def affected_sources(self) -> set[str]:
        """
        Union of the affected sources of all the entries.
        """
        sources: set[str] = set()
        for cve_entry in self._iterate_cve_entries():
            sources.update(cve_entry.affected_sources)
        return sources

    def _iterate_applicable_comp_ids(self) -> Generator[CompId, None, None]:
        """
        Yields the applicable component identifiers of every entry.
        """
        for cve_entry in self._iterate_cve_entries():
            yield from cve_entry._iterate_applicable_comp_ids()

    def _add_obsolete_assessment(
        self,
        obs_assess: CveVexAssessment,
        obs_cve_entries: tuple[CveDbEntry, ...],
        sec_assess: CveVexAssessment,
        sec_cve_entries: tuple[CveDbEntry, ...],
        msg_tpl: str | None = None,
    ) -> None:
        """
        Records that ``obs_assess`` is obsolete and overridden by ``sec_assess``.

        ``msg_tpl`` is forwarded to ObsoleteAssessment; None selects its default
        message template.
        """
        self._obsolete_assessments.append(
            ObsoleteAssessment(
                obs_assess, obs_cve_entries, sec_assess, sec_cve_entries, msg_tpl
            )
        )

    def _vex_assessment_from_vers_ranges(
        self, vers_ranges: Iterable[SemVerRange]
    ) -> CveVexAssessment | None:
        """
        Derives an assessment by checking the component version against ranges.

        Every range is classified first, then the flags are evaluated in a
        fixed order; None is returned when no rule applies.
        """
        is_vuln_match = False
        is_vuln_smaller = False
        is_vuln_greater = False
        is_not_vuln_match = False
        is_not_vuln_trusted = False
        is_maybe_vuln = False
        version = Version(self._comp_build.version)

        for vers_range in vers_ranges:
            s = vers_range.check_in_range(version)
            if vers_range.vulnerable:
                if s == VersRangeStatus.IN_RANGE:
                    is_vuln_match = True
                elif s == VersRangeStatus.SMALLER:
                    is_vuln_smaller = True
                elif s == VersRangeStatus.GREATER:
                    is_vuln_greater = True
            elif s == VersRangeStatus.IN_RANGE:
                is_not_vuln_match = True
                is_not_vuln_trusted = (
                    is_not_vuln_trusted or vers_range.is_not_vuln_trusted()
                )
            elif s != VersRangeStatus.NO_INFO:
                is_maybe_vuln = True

        if is_not_vuln_trusted:
            # Consider that this version was patched
            return CveVexFixedAssessment(
                vex_version="1", status_notes="version-not-in-range"
            )

        if is_vuln_match:
            # The version is within a vulnerable version range
            return CveVexAffectedAssessment(
                vex_version="1",
                action_statement="Mitigation action unknown",
                status_notes="version-in-range",
                action_statement_time=self._timestamp,
            )

        if (
            is_maybe_vuln
            and (not is_vuln_smaller)
            and (not is_vuln_greater)
            and (not is_not_vuln_match)
        ):
            # This version is maybe vulnerable: This is very unlikely to enter this
            # code path...
            return CveVexAffectedAssessment(
                vex_version="1",
                action_statement="Check if really vulnerable",
                status_notes="version-maybe-in-range",
                action_statement_time=self._timestamp,
            )

        if is_vuln_smaller and (not is_vuln_greater) and (not is_not_vuln_match):
            # Consider that this version was never impacted by this CVE (since too old)
            return CveVexOldVersNotVulnAssessment(
                vex_version="1",
                status_notes="version-not-in-range",
            )

        if is_vuln_greater or is_not_vuln_match:
            # Consider that this version was patched
            return CveVexFixedAssessment(
                vex_version="1", status_notes="version-not-in-range"
            )

        return None

    def _vex_assessment_from_compiled_sources(
        self, cve_entries: tuple[CveDbEntry, ...]
    ) -> CveVexAssessment | None:
        """
        Returns a "not affected" assessment when none of the affected source
        files of ``cve_entries`` is found among the compiled sources.

        Returns None when the entries list no affected file, when the component
        build has no compiled sources index, or when an affected file matches.
        """
        affected_files: set[str] = set()
        for cve_entry in cve_entries:
            affected_files.update(cve_entry.affected_sources)

        if not affected_files:
            return None

        compiled_files = self._comp_build.compiled_sources_index
        if not compiled_files:
            return None

        for affected_file in affected_files:
            # The index is queried with the file name only
            name = pathlib.Path(affected_file).name
            paths = compiled_files.get(name)
            if not paths:
                continue
            # Compare with a leading "/" so only whole path components match
            p = f"/{affected_file}" if (affected_file[:1] != "/") else affected_file
            for path in paths:
                if f"/{path}".endswith(p):
                    return None

        return CveVexNotAffectedAssessment(
            vex_version="1",
            impact_statement="Code not compiled",
            status_notes="not-applicable-platform",
            impact_statement_time=self._timestamp,
            justification=CveVexJustification.VULNERABLE_CODE_NOT_PRESENT,
        )

    def _vex_assessment_from_rejected_cve(self) -> CveVexAssessment | None:
        """
        Returns a "rejected" assessment if any database rejects the CVE.

        Groups are walked from the last to the first. Once a rejection has been
        found, the annotation assessments of the remaining groups that request
        an obsolescence check are recorded as obsolete.
        """
        # If the CVE is considered rejected by any databases (ignore priorities),
        # generate an assessment.
        assessment: CveVexAssessment | None = None
        rej_cve_entries: list[CveDbEntry] = []
        for cve_entries in reversed(self._cve_db_entries):
            if not cve_entries:
                continue

            if not rej_cve_entries:
                # Search if a database indicates that the CVE is rejected
                rej_cve_entries.extend(
                    cve_entry for cve_entry in cve_entries if cve_entry.is_rejected()
                )
                if rej_cve_entries:
                    assessment = CveVexRejectedAssessment(
                        vex_version="1",
                        impact_statement="CVE rejected",
                        status_notes="cve-rejected",
                        impact_statement_time=self._timestamp,
                        justification=CveVexJustification.VULNERABLE_CODE_NOT_PRESENT,
                    )
            else:
                # Verify if an annotation database requires checking for obsolete
                # assessments
                if not self._is_obsolete_check_needed(cve_entries):
                    continue

                cve_entry = cve_entries[0]
                if not isinstance(cve_entry, AnnotDbEntry):
                    continue

                assert assessment is not None
                a = cve_entry.vex_assessment
                if a is not None:
                    self._add_obsolete_assessment(
                        a,
                        cve_entries,
                        assessment,
                        tuple(rej_cve_entries),
                        "{cve_id} assessment {obs_assess}, from {obs_db_name}, "
                        "is obsolete since CVE is rejected by {sec_db_name}",
                    )
        return assessment

    @staticmethod
    def _is_obsolete_check_needed(cve_entries: tuple[CveDbEntry, ...]) -> bool:
        """
        Tells whether one database of the group enables the obsolescence check.

        Raises RuntimeError when the group holds several annotation entries or
        mixes annotation and CVE database entries.
        """
        # Indicates if the obsolescence of the assessment, generated from these
        # database entries, needs to be verified. Also check if this group of CVE
        # entries contains annotation or CVE database entries and there are no mix up.
        is_annotation: bool | None = None
        obsolete_check: bool = False

        for cve_entry in cve_entries:
            obsolete_check = (
                obsolete_check or cve_entry.database.obsolete_assessment_check_enabled
            )
            entry_is_annot = cve_entry.is_annotation()
            if is_annotation is None:
                is_annotation = entry_is_annot
            elif is_annotation and entry_is_annot:
                raise RuntimeError(
                    "Cannot have multiple annotation databases with same priority"
                )
            elif is_annotation != entry_is_annot:
                raise RuntimeError(
                    "Cannot mix annotation with CVE database entry in the same priority"
                )

        # Callers are expected to pass a non-empty group
        assert is_annotation is not None
        return obsolete_check

    def _compute_assessment_from_cve_entries(
        self, cve_entries: tuple[CveDbEntry, ...], obsolete_check: bool
    ) -> tuple[CveVexAssessment | None, bool]:
        """
        Computes the assessment of one priority group.

        Returns the assessment (or None) and whether the obsolescence of this
        assessment still has to be checked against lower priorities; the flag
        is always False when no assessment was computed.
        """
        assessment: CveVexAssessment | None = None

        cve_entry = cve_entries[0]
        if isinstance(cve_entry, AnnotDbEntry):
            # If this is an annotation, just provide the annotation VEX assessment
            assessment = cve_entry.vex_assessment
        else:
            # For CVE database entries, version ranges need to be merged
            vers_ranges: list[SemVerRange] = []
            for db_entry in cve_entries:
                vers_ranges.extend(
                    db_entry.get_associated_sem_ver_ranges(
                        self._comp_build.identifiers
                    )
                )
            if vers_ranges:
                assessment = self._vex_assessment_from_vers_ranges(vers_ranges)

        # If no assessment was computed or if the component is vulnerable
        if (assessment is None) or assessment.status.is_vulnerable():
            # Then, if the CVE provides affected sources and if the components provide
            # compiled sources files, use that to generate an assessment that indicates
            # that the components are not affected
            a = self._vex_assessment_from_compiled_sources(cve_entries)
            if a is not None:
                if obsolete_check and (assessment is not None):
                    # Indicates that the previously obtained assessment is obsolete,
                    # since affected sources are not compiled
                    self._add_obsolete_assessment(
                        assessment,
                        cve_entries,
                        a,
                        cve_entries,
                        "{cve_id} assessment {obs_assess}, from {obs_db_name}, "
                        "is obsolete since affected sources are not compiled",
                    )
                    # There is no need to continue to check for obsolete assessments
                    # NOTE(review): nesting inferred from a flattened listing; this
                    # reset is assumed to belong to the branch above. Confirm.
                    obsolete_check = False
                assessment = a

        return assessment, (obsolete_check if assessment is not None else False)

    def _compute_assessment(self) -> None:
        """
        Computes and stores the VEX assessment, recording obsolete assessments.

        A rejected CVE takes precedence. Otherwise the first assessment computed
        while walking the priority groups is kept, and a default "missing
        versions" assessment is used when no group yields one.
        """
        # First check if the CVE is rejected, which is a special case
        assessment = self._vex_assessment_from_rejected_cve()
        if assessment is not None:
            self._assessment = assessment
            return

        # Variables associated with the computation of obsolete assessments
        compute_next_assess: bool = True
        prev_cve_entries: tuple[CveDbEntry, ...] = ()
        prev_assessment: CveVexAssessment | None = None

        # Next, iterate through each database priority (starting from the highest)
        for cve_entries in self._cve_db_entries:
            if not cve_entries:
                continue

            obsolete_check = self._is_obsolete_check_needed(cve_entries)
            if compute_next_assess or obsolete_check:
                a, obsolete_check = self._compute_assessment_from_cve_entries(
                    cve_entries, obsolete_check
                )
                # Keep only the first computed assessment
                if assessment is None:
                    assessment = a

                # Check if we need to compute next assessment
                if obsolete_check:
                    compute_next_assess = True
                elif a is not None:
                    compute_next_assess = False

                # Verify if previously computed assessment is not obsolete
                if (prev_assessment is not None) and (a is not None):
                    if (
                        prev_assessment.status.is_vulnerable()
                        == a.status.is_vulnerable()
                    ):
                        self._add_obsolete_assessment(
                            prev_assessment, prev_cve_entries, a, cve_entries
                        )
                    # NOTE(review): nesting inferred from a flattened listing; the
                    # reset is assumed to happen after every comparison. Confirm.
                    prev_cve_entries = ()
                    prev_assessment = None

                if obsolete_check:
                    prev_cve_entries = cve_entries
                    prev_assessment = a

        # If there is no assessment, provide a default one
        if assessment is None:
            assessment = CveVexMissingVersionsAssessment(
                vex_version="1",
                action_statement="Check package version",
                status_notes="no-version-ranges",
                action_statement_time=self._timestamp,
            )
        self._assessment = assessment

    @property
    def vex_assessment(self) -> CveVexAssessment:
        """
        VEX assessment of the component build, computed on first access.
        """
        if self._assessment is None:
            self._compute_assessment()
        assert self._assessment is not None
        return self._assessment

    @property
    def obsolete_assessments(self) -> list[ObsoleteAssessment]:
        """
        Obsolete assessments recorded while computing the VEX assessment; the
        computation is triggered on first access if it has not run yet.
        """
        if self._assessment is None:
            self._compute_assessment()
        return self._obsolete_assessments