Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/cve_db/annot_base.py: 91%
75 statements
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
1# -*- coding: utf-8 -*-
2# SPDX-License-Identifier: GPL-2.0-only
4import abc
5from collections import defaultdict
6from collections.abc import Generator, Iterable
7from typing import Any
9from sbom_cve_check.vuln.cpe import Cpe23
10from sbom_cve_check.vuln.version import SemVerRange
12from ..sbom.component import CompId
13from ..utils import parsing
14from ..vuln.cve import CveVexAssessment
15from .db_base import CveDatabase, CveDatabaseT, CveDbEntry
18class AnnotDbEntry(CveDbEntry, abc.ABC):
19 """Represent one abstract annotation"""
21 def is_annotation(self) -> bool:
22 return True
24 @property
25 @abc.abstractmethod
26 def vex_assessment(self) -> CveVexAssessment | None:
27 """Provide the CVE VEX assessment"""
29 @property
30 def vulnerable(self) -> bool | None:
31 a = self.vex_assessment
32 if a:
33 return a.status.is_vulnerable()
34 return None
36 def _iterate_cpes(self, pkg: Any) -> Generator[Cpe23, None, None]:
37 """
38 Iterate over all CPEs that are provided by this annotation
39 """
40 raise NotImplementedError
42 def _get_vers_comp_ids(self, pkg: Any) -> dict[str | None, set[CompId]]:
43 """
44 Get all associated component identifiers with version that can be constructed
45 for example from CPEs
46 """
47 vers_comp_ids: dict[str | None, set[CompId]] = defaultdict(set)
49 for cpe in self._iterate_cpes(pkg):
50 comp_id = CompId.build_from_cpe(cpe)
51 if comp_id is not None:
52 vers = cpe.version if isinstance(cpe.version, str) else None
53 vers_comp_ids[vers].add(comp_id)
55 return vers_comp_ids
57 def _get_associated_sem_ver_ranges(
58 self, comp_ids: Iterable[CompId], pkg_version: str | None, pkg: Any = None
59 ) -> set[SemVerRange]:
60 vers_ranges: set[SemVerRange] = set()
61 is_vulnerable = self.vulnerable
62 is_vuln = True if is_vulnerable is None else is_vulnerable
64 add_pkg_version = False
65 for vers, ids in self._get_vers_comp_ids(pkg).items():
66 for comp_id in ids:
67 if comp_id.is_matching_one_of(comp_ids):
68 if vers:
69 vers_ranges.add(
70 SemVerRange.build_single_vers(is_vuln, vers, True)
71 )
72 else:
73 add_pkg_version = True
75 if add_pkg_version and pkg_version:
76 vers_ranges.add(SemVerRange.build_single_vers(is_vuln, pkg_version, False))
78 return vers_ranges
80 def get_associated_sem_ver_ranges(
81 self, comp_ids: Iterable[CompId]
82 ) -> Iterable[SemVerRange]:
83 return self._get_associated_sem_ver_ranges(comp_ids, None)
85 def _iterate_applicable_comp_ids(self) -> Generator[CompId, None, None]:
86 for ids in self._get_vers_comp_ids(None).values():
87 yield from ids
89 def is_version_applicable(
90 self, comp_ids: Iterable[CompId], version: str | None
91 ) -> bool:
92 """
93 Indicates if this CVE annotation is applicable to the specified version:
94 The specified version must strictly match with one of the annotation's versions
95 """
96 if version is None:
97 return True
98 for version_range in self.get_associated_sem_ver_ranges(comp_ids):
99 if version_range.check_strictly_equal(version):
100 return True
101 return False
104class AnnotDatabase(CveDatabase, abc.ABC):
105 # By default, do not check for obsolete assessment, since it can be slow
106 DEFAULT_OBSOLETE_ASSESSMENT_CHECK = False
108 @classmethod
109 def create_from_config(cls: type[CveDatabaseT], **kwargs: object) -> CveDatabaseT:
110 parsing.update_boolean_param(kwargs, "obsolete_assessment_check")
111 return super().create_from_config(**kwargs)
113 def __init__(
114 self, name: str, obsolete_assessment_check: bool | None = None
115 ) -> None:
116 super().__init__(name)
117 if obsolete_assessment_check is None:
118 obsolete_assessment_check = self.DEFAULT_OBSOLETE_ASSESSMENT_CHECK
119 self.enable_obsolete_assessment_check = obsolete_assessment_check
121 @property
122 def has_annotations(self) -> bool:
123 return True
125 @property
126 def obsolete_assessment_check_enabled(self) -> bool:
127 return self.enable_obsolete_assessment_check