Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/cve_db/db_base.py: 98%
99 statements
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
1# -*- coding: utf-8 -*-
2# SPDX-License-Identifier: GPL-2.0-only
4import abc
5import datetime
6from collections import defaultdict
7from collections.abc import Callable, Generator, Iterable
8from typing import Any, Optional, Self, TypeVar, final
10from ..sbom.component import CompId
11from ..vuln.cve import CveExtReference, CveId
12from ..vuln.cvss import CvssMetric, GroupByT, group_cvss_metrics
13from ..vuln.version import SemVerRange
class CveDbEntry(abc.ABC):
    """
    Abstract base class of one record served by a CVE database.

    Concrete subclasses must implement ``identifier``, ``date_modified``,
    ``description``, ``get_associated_sem_ver_ranges`` and
    ``_iterate_applicable_comp_ids``. Every other member defined here returns a
    neutral default (``False``, ``None`` or an empty collection).
    """

    def __init__(self, parent_db: Optional["CveDatabase"]) -> None:
        # Owning database; None when the entry is not attached to any database.
        self._parent_db = parent_db

    @property
    def database(self) -> "CveDatabase":
        """
        The database this entry belongs to.

        :raises RuntimeError: If the entry was created without a parent database
        """
        owner = self._parent_db
        if owner is not None:
            return owner
        raise RuntimeError("This entry has no associated database")

    @property
    @abc.abstractmethod
    def identifier(self) -> CveId:
        """The unique identifier of the CVE"""

    def is_annotation(self) -> bool:
        """
        Tell whether this entry is an annotation (True) or a CVE database entry
        (False). This base implementation always answers False.
        """
        return False

    def is_rejected(self) -> bool | None:
        """
        Tell whether the CVE entry is in the rejected state.
        This base implementation returns None, which means "unknown".
        """
        return None

    @property
    def date_published(self) -> datetime.datetime | None:
        """
        The time at which the vulnerability was published.
        This base implementation returns None.
        """
        return None

    @property
    @abc.abstractmethod
    def date_modified(self) -> datetime.datetime | None:
        """The time at which the vulnerability assessment was modified"""

    @property
    @abc.abstractmethod
    def description(self) -> str | None:
        """The CVE description, in English whenever possible"""

    @abc.abstractmethod
    def get_associated_sem_ver_ranges(
        self, comp_ids: Iterable[CompId]
    ) -> Iterable[SemVerRange]:
        """
        List every version range associated with the given component identifiers.
        Together, the identifiers designate a single package/product.

        :param comp_ids: Component identifiers of one package/product
        :return: The associated version ranges
        """

    @property
    def cvss_metrics(self) -> list[CvssMetric]:
        """The CVSS metrics associated with this CVE (empty list by default)"""
        return []

    def group_cvss_metrics(
        self, key: Callable[[CvssMetric], GroupByT]
    ) -> dict[GroupByT, tuple[CvssMetric, ...]]:
        """
        Delegate to the module-level ``group_cvss_metrics`` helper, handing it this
        entry's ``cvss_metrics`` together with ``key``.

        :param key: Callable computing the grouping key of one metric
        :return: Whatever the helper returns: a mapping from key to metrics
        """
        metrics = self.cvss_metrics
        return group_cvss_metrics(metrics, key)

    @property
    def external_refs(self) -> set[CveExtReference]:
        """
        External references (URLs) associated with this CVE (empty set by default)
        """
        return set()

    @property
    def affected_sources(self) -> set[str]:
        """The set of affected source code files (empty set by default)"""
        return set()

    @abc.abstractmethod
    def _iterate_applicable_comp_ids(self) -> Generator[CompId, None, None]:
        """
        Yield every "applicable" component identifier, built for example from CPEs:
         - For a CVE database, this lists only the vulnerable CPEs.
         - For an annotation, this lists everything (vulnerable or not-vulnerable
           identifiers).
        """

    def get_loosely_applicable_comp_ids(self, name: str) -> set[CompId]:
        """
        Collect the applicable component identifiers that loosely match, i.e. whose
        name is equal to the given component name (nothing else is compared).

        :param name: Component name to look for
        :return: The matching component identifiers
        """
        return {
            candidate
            for candidate in self._iterate_applicable_comp_ids()
            if candidate.name == name
        }

    def get_applicable_comp_names(self) -> set[str]:
        """
        Collect the component names of the "applicable" packages that can be
        extracted from the referenced CPEs.
        The result is used to index CVEs by product name (ignoring vendor part,
        ...): the goal is to reduce the number of CVEs that need to be checked for
        a particular component (package/product).

        :return: The names that are strings; any other name value is skipped
        """
        return {
            candidate.name
            for candidate in self._iterate_applicable_comp_ids()
            if isinstance(candidate.name, str)
        }
# Type variable whose upper bound is CveDatabase. The bound is written as a
# string (forward reference) because the class is only defined further down.
# NOTE(review): not referenced anywhere else in this listing; presumably
# imported by other modules - confirm before removing.
CveDatabaseT = TypeVar("CveDatabaseT", bound="CveDatabase")
class CveDatabase(abc.ABC):
    """
    Abstract base class of a source of CVE entries.

    Concrete subclasses must implement ``_initialize``, ``get_cve`` and
    ``iterate_cves``.
    """

    # Rejected CVEs are not indexed since they are not exported (by default).
    # Moreover their database entry is empty, at least for the CVEList and NVD
    # databases.
    INDEX_REJECTED_CVE = False

    @classmethod
    def create_from_config(cls, **kwargs: Any) -> Self:
        """Build an instance by forwarding the keyword arguments to the constructor"""
        return cls(**kwargs)

    def __init__(self, name: str) -> None:
        # Set once initialize() has run the subclass hook.
        self.__initialized = False
        self._name: str = name

    @property
    def name(self) -> str:
        """:return: A description or name of the database"""
        return self._name

    def get_default_priority(self, order: int, from_config: bool) -> int:
        """
        :param order: Declaration order, expected within [0, 100)
        :param from_config: Selects the 200 base when True, the 100 base otherwise
        :return: The default priority, derived from where the database was declared
          (command line or config file) and from the declaration order (the last
          one gets a higher priority)
        """
        assert 0 <= order < 100
        base = 200 if from_config else 100
        return base + order

    @final
    def initialize(self) -> None:
        """Run ``_initialize`` unless a previous call already completed it"""
        if self.__initialized:
            return
        self._initialize()
        # Only reached when the hook returned without raising.
        self.__initialized = True

    @abc.abstractmethod
    def _initialize(self) -> None:
        """Perform the actual initialization of this database"""

    @property
    def has_annotations(self) -> bool:
        """
        Tell whether this database manages a pool of annotations (True) or CVE
        database entries (False). This base implementation answers False.
        """
        return False

    @property
    def obsolete_assessment_check_enabled(self) -> bool:
        """
        Tell whether the entries of this database should be checked for
        obsolescence, i.e. whether a CVE entry of a lower priority provides the
        same kind of VEX assessment.
        This doesn't make much sense for a CVE database, but it makes a lot of
        sense for annotations. This base implementation answers False.
        """
        return False

    @abc.abstractmethod
    def get_cve(self, cve_id: CveId) -> CveDbEntry | None:
        """
        Look up the entry representing the given CVE identifier.

        :return: The entry, or None if it is not present in the database
        """

    @abc.abstractmethod
    def iterate_cves(self) -> Generator[CveDbEntry, None, None]:
        """Yield every CVE entry present in the database"""

    def create_index(self) -> dict[str, set[str]]:
        """
        Build the database index: a mapping from component/product name to the set
        of CVE identifiers applicable to that name. The purpose of this index is to
        speed up CVE look-up.

        Entries whose ``is_rejected()`` is truthy are left out unless
        ``INDEX_REJECTED_CVE`` is set; an unknown state (None) is indexed.
        """
        by_name: dict[str, set[str]] = defaultdict(set)
        for entry in self.iterate_cves():
            # is_rejected() is only consulted when rejected CVEs are not indexed.
            if not self.INDEX_REJECTED_CVE and entry.is_rejected():
                continue
            for product in entry.get_applicable_comp_names():
                by_name[product].add(entry.identifier.id)
        return by_name