Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/cve_db/db_base.py: 98%

99 statements  

« prev     ^ index     » next       coverage.py v7.11.1, created at 2025-11-28 15:37 +0100

1# -*- coding: utf-8 -*- 

2# SPDX-License-Identifier: GPL-2.0-only 

3 

4import abc 

5import datetime 

6from collections import defaultdict 

7from collections.abc import Callable, Generator, Iterable 

8from typing import Any, Optional, Self, TypeVar, final 

9 

10from ..sbom.component import CompId 

11from ..vuln.cve import CveExtReference, CveId 

12from ..vuln.cvss import CvssMetric, GroupByT, group_cvss_metrics 

13from ..vuln.version import SemVerRange 

14 

15 

class CveDbEntry(abc.ABC):
    """
    Abstract base for one record keyed by a CVE identifier.

    A record may keep a reference to the database that holds it. Subclasses have
    to implement ``identifier``, ``date_modified``, ``description``,
    ``get_associated_sem_ver_ranges`` and ``_iterate_applicable_comp_ids``; all
    other members have a neutral default implementation here.
    """

    def __init__(self, parent_db: Optional["CveDatabase"]) -> None:
        # May be None; in that case the ``database`` property raises.
        self._parent_db = parent_db

    @property
    def database(self) -> "CveDatabase":
        """
        The database this entry is attached to.
        :raises RuntimeError: If no parent database was given at construction
        """
        owner = self._parent_db
        if owner is not None:
            return owner
        raise RuntimeError("This entry has no associated database")

    @property
    @abc.abstractmethod
    def identifier(self) -> CveId:
        """Unique identifier of the CVE described by this entry"""

    def is_annotation(self) -> bool:
        """
        Tell whether this entry is an annotation (True) or a CVE database entry
        (False). The base implementation answers False.
        """
        return False

    def is_rejected(self) -> bool | None:
        """
        Tell whether the CVE entry is in rejected state, None meaning unknown.
        The base implementation answers None.
        """
        return None

    @property
    def date_published(self) -> datetime.datetime | None:
        """
        Time at which the vulnerability was published.
        The base implementation provides no date (None).
        """
        return None

    @property
    @abc.abstractmethod
    def date_modified(self) -> datetime.datetime | None:
        """Time at which the vulnerability assessment was modified"""

    @property
    @abc.abstractmethod
    def description(self) -> str | None:
        """Description of the CVE, in English when possible"""

    @abc.abstractmethod
    def get_associated_sem_ver_ranges(
        self, comp_ids: Iterable[CompId]
    ) -> Iterable[SemVerRange]:
        """
        List every version range associated with the given component identifiers.
        :param comp_ids: Identifiers which together identify one package/product
        :return: The associated version ranges
        """

    @property
    def cvss_metrics(self) -> list[CvssMetric]:
        """
        CVSS metrics associated with this CVE.
        The base implementation provides an empty list.
        """
        return list()

    def group_cvss_metrics(
        self, key: Callable[[CvssMetric], GroupByT]
    ) -> dict[GroupByT, tuple[CvssMetric, ...]]:
        """
        Group the CVSS metrics of this entry with the given key function.
        :param key: Callable computing the grouping key of a metric
        :return: Result of the module-level ``group_cvss_metrics`` helper
        """
        # Inside a method, this name resolves to the imported helper function and
        # not to this method (class scope is not searched here).
        metrics = self.cvss_metrics
        return group_cvss_metrics(metrics, key)

    @property
    def external_refs(self) -> set[CveExtReference]:
        """
        External references (URLs) associated with this CVE.
        The base implementation provides an empty set.
        """
        return set()

    @property
    def affected_sources(self) -> set[str]:
        """
        Affected source code files.
        The base implementation provides an empty set.
        """
        return set()

    @abc.abstractmethod
    def _iterate_applicable_comp_ids(self) -> Generator[CompId, None, None]:
        """
        Yield every "applicable" component identifier, which can be built for
        example from CPEs:
          - A CVE database entry yields only the vulnerable CPEs.
          - An annotation yields everything (vulnerable and not-vulnerable
            identifiers).
        """

    def get_loosely_applicable_comp_ids(self, name: str) -> set[CompId]:
        """
        Collect the applicable component identifiers that loosely match, the
        component name being the only criterion.
        :param name: Component name to look for
        :return: Applicable identifiers whose name equals ``name``
        """
        return {
            candidate
            for candidate in self._iterate_applicable_comp_ids()
            if candidate.name == name
        }

    def get_applicable_comp_names(self) -> set[str]:
        """
        Collect the component names of the "applicable" packages, as extracted
        from the referenced CPEs.
        The result is used to index CVEs by product name (the vendor part, ... is
        ignored), in order to reduce the number of CVEs that have to be checked
        for a given component (package/product).
        :return: Names of the applicable identifiers; non-string names are skipped
        """
        return {
            comp_id.name
            for comp_id in self._iterate_applicable_comp_ids()
            if isinstance(comp_id.name, str)
        }

120 

121 

# Type variable restricted to CveDatabase and its subclasses. It is not referenced
# in the visible part of this file; presumably used by other modules - TODO confirm.
122CveDatabaseT = TypeVar("CveDatabaseT", bound="CveDatabase")

123 

124 

class CveDatabase(abc.ABC):
    """
    Abstract base for a named source of CVE entries.

    Subclasses have to implement ``_initialize``, ``get_cve`` and
    ``iterate_cves``. Initialization is performed at most once, through the final
    method ``initialize``.
    """

    # Rejected CVEs are left out of the index since they are not exported (by
    # default). Besides, their database entry is empty, at least for the CVEList
    # and NVD databases.
    INDEX_REJECTED_CVE = False

    @classmethod
    def create_from_config(cls, **kwargs: Any) -> Self:
        """Build an instance by forwarding the keyword arguments to the constructor"""
        return cls(**kwargs)

    def __init__(self, name: str) -> None:
        self._name: str = name
        # Set to True by ``initialize`` once ``_initialize`` has returned.
        self.__initialized = False

    @property
    def name(self) -> str:
        """:return: A description or name of the database"""
        return self._name

    def get_default_priority(self, order: int, from_config: bool) -> int:
        """
        Compute the default priority of this database.
        :param order: Declaration order (the last declared has a higher priority),
          expected in the range [0, 100)
        :param from_config: True if the database was declared in the config file,
          False if it was declared on the command line
        :return: ``order`` shifted by 200 (config file) or by 100 (command line)
        """
        assert 0 <= order < 100
        base_priority = 200 if from_config else 100
        return order + base_priority

    @final
    def initialize(self) -> None:
        """Initialize this database, unless this was already done"""
        if self.__initialized:
            return
        self._initialize()
        # Only reached when _initialize() did not raise.
        self.__initialized = True

    @abc.abstractmethod
    def _initialize(self) -> None:
        """Perform the actual initialization of this database"""

    @property
    def has_annotations(self) -> bool:
        """
        Tell whether this database manages a pool of annotations (True) or CVE
        database entries (False). The base implementation answers False.
        """
        return False

    @property
    def obsolete_assessment_check_enabled(self) -> bool:
        """
        Tell whether the entries of this database should be checked for
        obsolescence, that is: a CVE entry of a lower priority provides the same
        kind of VEX assessment.
        Such a check doesn't make much sense for a CVE database, but it makes a
        lot of sense for annotations. The base implementation answers False.
        """
        return False

    @abc.abstractmethod
    def get_cve(self, cve_id: CveId) -> CveDbEntry | None:
        """
        Look up the CVE object representing the given CVE identifier.
        :return: The entry, or None if it is not present in the database
        """

    @abc.abstractmethod
    def iterate_cves(self) -> Generator[CveDbEntry, None, None]:
        """Yield every CVE present in the database"""

    def create_index(self) -> dict[str, set[str]]:
        """
        Build the database index, which maps a component/product name to the set
        of CVE identifiers applicable to it. This index exists to speed up CVE
        look-up.
        Rejected entries are skipped unless ``INDEX_REJECTED_CVE`` is set.
        :return: The index (a ``defaultdict`` whose default value is an empty set)
        """
        cve_ids_by_product: dict[str, set[str]] = defaultdict(set)
        for entry in self.iterate_cves():
            # An "unknown" rejection state (None) is indexed like a valid entry.
            if not self.INDEX_REJECTED_CVE and entry.is_rejected():
                continue
            for product in entry.get_applicable_comp_names():
                cve_ids_by_product[product].add(entry.identifier.id)
        return cve_ids_by_product

203 return index_comp_cve