Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/cve_db/db_cvelist.py: 90%
198 statements
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
1# -*- coding: utf-8 -*-
2# SPDX-License-Identifier: GPL-2.0-only
4import json
5import pathlib
6from collections.abc import Generator, Iterable
7from datetime import datetime
8from typing import Any
10from ..sbom.component import CompId
11from ..utils import parsing
12from ..vuln.cpe import parse_cpe_match
13from ..vuln.cve import CveExtReference, CveId
14from ..vuln.cvss import CvssMetric, CvssVersion
15from ..vuln.version import SemVerRange, VersionRange, VersionRangeBuilder
16from .db_base import CveDatabase, CveDbEntry
17from .db_git import GitCveDatabase
18from .registry import register_cve_db
class CveListCveEntry(CveDbEntry):
    """CVE entry backed by one decoded CVE JSON record.

    The record is kept as-is in ``self._json`` and every accessor reads it lazily.
    Information is looked up in the ``cna`` container first, then in each ``adp``
    container, in the order they appear in the record.
    """

    def __init__(self, parent_db: CveDatabase, json_obj: dict[str, Any]) -> None:
        """
        :param parent_db: database this entry belongs to
        :param json_obj: decoded JSON object of the CVE record
        """
        super().__init__(parent_db)
        self._json = json_obj

    @property
    def identifier(self) -> CveId:
        """:return: CVE identifier built from ``cveMetadata.cveId``"""
        return CveId(self._json.get("cveMetadata", {}).get("cveId"))

    def is_annotation(self) -> bool:
        """:return: always ``False`` for entries of this database"""
        return False

    def is_rejected(self) -> bool:
        """:return: ``True`` when ``cveMetadata.state`` is ``"REJECTED"``"""
        state = str(self._json.get("cveMetadata", {}).get("state"))
        return state == "REJECTED"

    @property
    def date_published(self) -> datetime | None:
        """:return: ``cveMetadata.datePublished`` parsed from its ISO representation"""
        return parsing.datetime_from_iso_format(
            self._json.get("cveMetadata", {}).get("datePublished")
        )

    @property
    def date_modified(self) -> datetime | None:
        """:return: ``cveMetadata.dateUpdated`` parsed from its ISO representation"""
        return parsing.datetime_from_iso_format(
            self._json.get("cveMetadata", {}).get("dateUpdated")
        )

    def _iterate_containers(self) -> Generator[dict[str, Any], None, None]:
        """:return: the ``cna`` container (when present) followed by ``adp`` ones"""
        containers_obj = self._json.get("containers", {})
        cna = containers_obj.get("cna")
        if cna is not None:
            yield cna
        yield from containers_obj.get("adp", [])

    @staticmethod
    def _container_description(container: dict[str, Any]) -> str | None:
        """
        Select the description text of a container.

        :param container: one ``cna`` or ``adp`` container
        :return: first non-empty description whose ``lang`` is ``"en"``; otherwise
            the value of the first description if it is a string; otherwise None
        """
        descriptions = container.get("descriptions")
        if not descriptions:
            return None

        for desc in descriptions:
            if desc.get("lang") == "en":
                txt = desc.get("value")
                if txt:
                    return str(txt)

        # No usable "en" description: fall back to the first one listed
        d = descriptions[0].get("value")
        return d if isinstance(d, str) else None

    @staticmethod
    def _container_cna_org(container: dict[str, Any]) -> str | None:
        """:return: CNA organization identifier for this container"""
        org = container.get("providerMetadata", {}).get("orgId")
        return org if isinstance(org, str) else None

    @property
    def description(self) -> str | None:
        """:return: description of the first container providing one, else None"""
        for c in self._iterate_containers():
            desc = self._container_description(c)
            if desc is not None:
                return desc

        return None

    @staticmethod
    def _iterate_cpe_matches(
        container: dict[str, Any],
    ) -> Generator[dict[str, Any], None, None]:
        """
        :param container: one ``cna`` or ``adp`` container
        :return: every ``cpeMatch`` object found under
            ``cpeApplicability[*].nodes[*]``
        """
        for cpe_applicability in container.get("cpeApplicability", []):
            for node in cpe_applicability.get("nodes", []):
                yield from node.get("cpeMatch", [])

    @staticmethod
    def _parse_affected_version(
        json_obj: dict[str, Any], cna_org: str | None
    ) -> VersionRange | None:
        """
        Convert one item of an ``affected[*].versions`` list into a version range.

        :param json_obj: the version object to parse
        :param cna_org: organization identifier forwarded to the range builder
        :return: the git range when ``versionType`` is ``"git"``, the semantic
            version range otherwise; None when ``version`` is missing or when the
            builder reports the range as invalid
        """
        # Tri-state: True (affected), False (unaffected), None (any other status)
        status = json_obj.get("status")
        if status == "affected":
            is_vuln = True
        elif status == "unaffected":
            is_vuln = False
        else:
            is_vuln = None

        vers_type: str | None = json_obj.get("versionType")
        vers_range_builder = VersionRangeBuilder(
            vulnerable=is_vuln, from_cpe=False, from_cna=cna_org
        )

        # NOTE(review): when both bounds are present, "lessThan" is applied last;
        # presumably it then overrides "lessThanOrEqual" - confirm in the builder
        v_end_le = json_obj.get("lessThanOrEqual")
        if v_end_le:
            vers_range_builder.set_end_version(v_end_le, including=True)

        v_end_lt = json_obj.get("lessThan")
        if v_end_lt:
            vers_range_builder.set_end_version(v_end_lt, including=False)

        vers_val = json_obj.get("version")
        if vers_val is None:
            return None

        # With an end bound (or for git), "version" is the inclusive start of a
        # range; otherwise it designates exactly one version
        if v_end_le or v_end_lt or (vers_type == "git"):
            vers_range_builder.set_start_version(vers_val, including=True)
        else:
            vers_range_builder.set_start_version(vers_val, equal=True)

        if vers_range_builder.is_valid():
            if vers_type == "git":
                return vers_range_builder.git_ver_range
            return vers_range_builder.sem_ver_range

        return None

    def _iterate_applicable_comp_ids(self) -> Generator[CompId, None, None]:
        """
        Enumerate the component identifiers this CVE refers to.

        For each container, ``affected`` blocks listing at least one version whose
        status is ``"affected"`` contribute their ``cpes`` and ``packageName``;
        then every ``cpeMatch`` not explicitly marked as non-vulnerable contributes
        its ``criteria``. CPEs that cannot be converted are skipped, and the same
        identifier may be yielded more than once.
        """
        for c in self._iterate_containers():
            # Process affected blocks
            for affected in c.get("affected", []):
                # If there is no vulnerable version, do not check CPEs
                is_vuln = any(
                    version.get("status") == "affected"
                    for version in affected.get("versions", [])
                )
                if not is_vuln:
                    continue

                # Extract CPEs and package/component name
                for cpe in affected.get("cpes", []):
                    comp_id = CompId.build_from_cpe(cpe)
                    if comp_id is not None:
                        yield comp_id

                pkg_name = affected.get("packageName")
                if pkg_name:
                    yield CompId(name=pkg_name)

            # Process cpeMatch blocks ("vulnerable" defaults to True when absent)
            for cpe_match in self._iterate_cpe_matches(c):
                if cpe_match.get("vulnerable", True):
                    comp_id = CompId.build_from_cpe(cpe_match.get("criteria"))
                    if comp_id is not None:
                        yield comp_id

    def get_associated_sem_ver_ranges(
        self, comp_ids: Iterable[CompId]
    ) -> Iterable[SemVerRange]:
        """
        Collect the semantic version ranges applicable to the given components.

        Ranges are gathered in two passes over all containers: first from the
        ``cpeMatch`` objects, then from the ``affected`` blocks.

        :param comp_ids: component identifiers to match against; any iterable is
            accepted, including a one-shot iterator
        :return: list of matching ranges, ``cpeMatch`` ranges first
        """
        # ``comp_ids`` is consulted once per CPE / package name below. A one-shot
        # iterator (e.g. a generator) would be exhausted by the first check and
        # every later check would silently see no component, so snapshot it.
        # Re-iterable collections are used as provided.
        if iter(comp_ids) is comp_ids:
            comp_ids = tuple(comp_ids)

        vers_ranges: list[SemVerRange] = []
        cpe_applicable_comp_ids: set[CompId] = set()

        # First iterate over cpeMatch blocks
        for c in self._iterate_containers():
            cna_org = self._container_cna_org(c)

            for cpe_match_json in self._iterate_cpe_matches(c):
                cpe, vers_range = parse_cpe_match(cpe_match_json, cna_org)
                comp_id = CompId.build_from_cpe(cpe)
                if (comp_id is None) or (vers_range is None):
                    continue

                # Remember every component having a vulnerable range, whether or
                # not it matches the requested components
                if vers_range.vulnerable:
                    cpe_applicable_comp_ids.add(comp_id)
                if isinstance(vers_range, SemVerRange) and comp_id.is_matching_one_of(
                    comp_ids
                ):
                    vers_ranges.append(vers_range)

        # Then, if there is only a unique component identifier specified, we could use
        # this component identifier for affected versions with missing CPE
        accept_version_without_comp_id = (len(cpe_applicable_comp_ids) == 1) and (
            len(vers_ranges) > 0
        )

        # Iterate over affected blocks
        for c in self._iterate_containers():
            cna_org = self._container_cna_org(c)

            for affected in c.get("affected", []):
                # Extract CPEs and package name to check if this block is applicable to
                # specified component identifier.
                # Tri-state: None means "nothing identified the component yet", and
                # is only possible when the fallback above is enabled.
                is_applicable = None if accept_version_without_comp_id else False
                for cpe in affected.get("cpes", []):
                    comp_id = CompId.build_from_cpe(cpe)
                    if comp_id is not None:
                        is_applicable = comp_id.is_matching_one_of(comp_ids)
                        if is_applicable:
                            break

                if not is_applicable:
                    pkg_name = affected.get("packageName")
                    if pkg_name:
                        is_applicable = CompId(name=pkg_name).is_matching_one_of(
                            comp_ids
                        )

                # If this block is not applicable, do not parse it, skip it.
                # A still undetermined block (None) is kept on purpose.
                if is_applicable is False:
                    continue

                # And finally parse each version; git ranges are not returned here
                for version in affected.get("versions", []):
                    vers_range = self._parse_affected_version(version, cna_org)
                    if isinstance(vers_range, SemVerRange):
                        vers_ranges.append(vers_range)

        return vers_ranges

    @property
    def cvss_metrics(self) -> list[CvssMetric]:
        """
        :return: CVSS metrics of all containers, in container order; for each
            ``metrics`` item the V2.0, V3.0, V3.1 and V4.0 objects are probed in
            that order and the ones that cannot be parsed are dropped
        """
        # JSON key of each supported CVSS version, in probing order
        cvss_keys = (
            ("cvssV2_0", CvssVersion.V2_0),
            ("cvssV3_0", CvssVersion.V3_0),
            ("cvssV3_1", CvssVersion.V3_1),
            ("cvssV4_0", CvssVersion.V4_0),
        )

        cvss_metrics: list[CvssMetric] = []
        for c in self._iterate_containers():
            for metric in c.get("metrics", []):
                for key, cvss_ver in cvss_keys:
                    cvss_obj = metric.get(key)
                    if cvss_obj is None:
                        continue
                    cvss_info = CvssMetric.parse_cve_db_metric(
                        cvss_obj, version=cvss_ver
                    )
                    if cvss_info is not None:
                        cvss_metrics.append(cvss_info)

        return cvss_metrics

    @property
    def external_refs(self) -> set[CveExtReference]:
        """:return: one reference per non-empty ``references[*].url`` found"""
        ext_refs: set[CveExtReference] = set()
        for c in self._iterate_containers():
            for ext_ref in c.get("references", []):
                url = ext_ref.get("url")
                if url:
                    ext_refs.add(CveExtReference(url))

        return ext_refs

    @property
    def affected_sources(self) -> set[str]:
        """:return: union of the ``programFiles`` of every ``affected`` block"""
        sources: set[str] = set()
        for c in self._iterate_containers():
            for affected in c.get("affected", []):
                sources.update(affected.get("programFiles", []))
        return sources
@register_cve_db("cve-db-cvelist")
class CveListCveDatabase(GitCveDatabase):
    """CVE database reading the JSON records of a cvelistV5 git checkout.

    Records are looked up under ``<git dir>/cves/<year>/<group>/<CVE id>.json``
    where ``<group>`` is the CVE number with its last three characters replaced
    by ``xxx``.
    """

    GIT_FETCH_URL = "https://github.com/CVEProject/cvelistV5.git"

    def __init__(
        self, path: pathlib.Path, name: str, git_url: str = GIT_FETCH_URL, **kwargs: Any
    ) -> None:
        """
        :param path: forwarded unchanged to the parent class
        :param name: forwarded unchanged to the parent class
        :param git_url: repository URL, defaults to :attr:`GIT_FETCH_URL`
        :param kwargs: extra options forwarded unchanged to the parent class
        """
        super().__init__(path, name, git_url, **kwargs)

    def get_default_priority(self, order: int, from_config: bool) -> int:
        """:return: always 50, both arguments are ignored"""
        return 50

    def get_cve(self, cve_id: CveId) -> CveDbEntry | None:
        """
        Load the record of one CVE.

        :param cve_id: identifier of the CVE to load
        :return: the entry, or None when no file exists for this identifier
        """
        path_json = self._git_dir.joinpath(
            "cves", cve_id.year, cve_id.number[:-3] + "xxx", f"{cve_id.id}.json"
        )
        if not path_json.is_file():
            return None

        with path_json.open(encoding="utf-8") as f:
            return CveListCveEntry(self, json.load(f))

    def iterate_cves(self) -> Generator[CveDbEntry, None, None]:
        """:return: one entry per ``cves/*/*/CVE-*.json`` file of the git directory"""
        for path_json in self._git_dir.glob("cves/*/*/CVE-*.json"):
            # Decode and close the file *before* yielding: the generator may stay
            # suspended (or be abandoned) for an arbitrary time, and must not keep
            # a file handle open meanwhile.
            with path_json.open(encoding="utf-8") as f:
                json_obj = json.load(f)
            yield CveListCveEntry(self, json_obj)