Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/cve_db/db_nvd.py: 96%
101 statements
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
1# -*- coding: utf-8 -*-
2# SPDX-License-Identifier: GPL-2.0-only
4import json
5import pathlib
6from collections.abc import Generator, Iterable
7from datetime import datetime
8from typing import Any
10from ..sbom.component import CompId
11from ..utils import parsing
12from ..vuln.cpe import parse_cpe_match
13from ..vuln.cve import CveExtReference, CveId
14from ..vuln.cvss import CvssMetric, CvssVersion
15from ..vuln.version import SemVerRange
16from .db_base import CveDatabase, CveDbEntry
17from .db_git import GitCveDatabase
18from .registry import register_cve_db
class NvdCveEntry(CveDbEntry):
    """CVE database entry backed by a single NVD-style JSON record.

    The raw JSON object is kept as-is and every accessor reads the field it
    needs on demand; nothing is pre-parsed or cached.
    """

    def __init__(self, parent_db: CveDatabase, json_obj: dict[str, Any]) -> None:
        """Store the decoded JSON record and register the owning database.

        :param parent_db: database this entry was loaded from.
        :param json_obj: decoded JSON object describing one CVE.
        """
        super().__init__(parent_db)
        self._json = json_obj

    @property
    def identifier(self) -> CveId:
        """CVE identifier built from the mandatory ``"id"`` field.

        :raises KeyError: if the record has no ``"id"`` key.
        """
        return CveId(self._json["id"])

    def is_annotation(self) -> bool:
        """Always return ``False`` for this entry type."""
        return False

    def is_rejected(self) -> bool:
        """Return ``True`` when ``"vulnStatus"`` is exactly ``"Rejected"``."""
        return self._json.get("vulnStatus") == "Rejected"

    @property
    def date_published(self) -> datetime | None:
        """Value of the ``"published"`` field parsed from ISO format."""
        return parsing.datetime_from_iso_format(self._json.get("published"))

    @property
    def date_modified(self) -> datetime | None:
        """Value of the ``"lastModified"`` field parsed from ISO format."""
        return parsing.datetime_from_iso_format(self._json.get("lastModified"))

    @property
    def description(self) -> str | None:
        """Return the textual description of the CVE, or ``None``.

        The first non-empty description whose ``"lang"`` is ``"en"`` wins.
        Otherwise the value of the first description is returned, but only
        when it is a string.
        """
        descriptions = self._json.get("descriptions")
        if not descriptions:
            return None

        for desc in descriptions:
            if desc.get("lang") == "en":
                txt = desc.get("value")
                if txt:
                    return str(txt)

        # No usable English text: fall back to the first description.
        fallback = descriptions[0].get("value")
        return fallback if isinstance(fallback, str) else None

    def _iterate_cpe_matches(self) -> Generator[dict[str, Any], None, None]:
        """Yield every ``"cpeMatch"`` object of every configuration node."""
        for cpe_applicability in self._json.get("configurations", []):
            for node in cpe_applicability.get("nodes", []):
                yield from node.get("cpeMatch", [])

    def _iterate_applicable_comp_ids(self) -> Generator[CompId, None, None]:
        """Yield a component identifier for each vulnerable CPE match.

        A match without a ``"vulnerable"`` key is treated as vulnerable.
        Matches for which ``CompId.build_from_cpe`` returns ``None`` are
        skipped.
        """
        for cpe_match in self._iterate_cpe_matches():
            if cpe_match.get("vulnerable", True):
                comp_id = CompId.build_from_cpe(cpe_match.get("criteria"))
                if comp_id is not None:
                    yield comp_id

    def get_associated_sem_ver_ranges(
        self, comp_ids: Iterable[CompId]
    ) -> Iterable[SemVerRange]:
        """Collect the version ranges that apply to the given components.

        :param comp_ids: component identifiers to match against.
        :return: list of ``SemVerRange`` objects, in CPE-match order, whose
            CPE resolves to a component matching one of ``comp_ids``.
        """
        vers_ranges: list[SemVerRange] = []

        # Get CNA organization identifier
        cna_org = self._json.get("sourceIdentifier")

        for cpe_match_json in self._iterate_cpe_matches():
            cpe, vers_range = parse_cpe_match(cpe_match_json, cna_org)
            comp_id = CompId.build_from_cpe(cpe)
            # Keep only results that are actual SemVerRange instances and
            # whose component is one of the requested ones.
            if (
                (comp_id is not None)
                and isinstance(vers_range, SemVerRange)
                and comp_id.is_matching_one_of(comp_ids)
            ):
                vers_ranges.append(vers_range)

        return vers_ranges

    @property
    def cvss_metrics(self) -> list[CvssMetric]:
        """Return all parsable CVSS metrics of the record.

        Metrics are gathered in the order V2, V3.0, V3.1, V4.0; objects for
        which ``CvssMetric.parse_cve_db_metric`` returns ``None`` are dropped.
        """

        def add_cvss_to_set(
            m: list[CvssMetric], cvss_objs: list[dict[str, Any]], cvss_ver: CvssVersion
        ) -> None:
            # Despite its name, this helper appends to the list ``m``.
            for cvss_obj in cvss_objs:
                cvss_info = CvssMetric.parse_cve_db_metric(
                    cvss_obj.get("cvssData"),
                    source=cvss_obj.get("source"),
                    version=cvss_ver,
                )
                if cvss_info is not None:
                    m.append(cvss_info)

        cvss_metrics: list[CvssMetric] = []
        metric = self._json.get("metrics", {})
        add_cvss_to_set(cvss_metrics, metric.get("cvssMetricV2", []), CvssVersion.V2_0)
        add_cvss_to_set(cvss_metrics, metric.get("cvssMetricV30", []), CvssVersion.V3_0)
        add_cvss_to_set(cvss_metrics, metric.get("cvssMetricV31", []), CvssVersion.V3_1)
        add_cvss_to_set(cvss_metrics, metric.get("cvssMetricV40", []), CvssVersion.V4_0)

        return cvss_metrics

    @property
    def external_refs(self) -> set[CveExtReference]:
        """Return one ``CveExtReference`` per reference with a non-empty URL."""
        ext_refs: set[CveExtReference] = set()

        for ext_ref in self._json.get("references", []):
            url = ext_ref.get("url")
            if url:
                ext_refs.add(CveExtReference(url))

        return ext_refs
@register_cve_db("cve-db-nvd-fkie")
class NvdFkieCveDatabase(GitCveDatabase):
    """Git-backed CVE database registered under ``"cve-db-nvd-fkie"``.

    Entries are read from per-CVE JSON files stored below the Git directory
    as ``CVE-<year>/<id with last two characters replaced by xx>/<id>.json``.
    """

    # Default repository used when the caller does not supply ``git_url``.
    GIT_FETCH_URL = "https://github.com/fkie-cad/nvd-json-data-feeds.git"

    def __init__(
        self, path: pathlib.Path, name: str, git_url: str = GIT_FETCH_URL, **kwargs: Any
    ) -> None:
        """Forward all arguments to ``GitCveDatabase``.

        :param path: forwarded unchanged to the base class.
        :param name: forwarded unchanged to the base class.
        :param git_url: repository URL; defaults to ``GIT_FETCH_URL``.
        :param kwargs: extra keyword arguments forwarded to the base class.
        """
        super().__init__(path, name, git_url, **kwargs)

    def get_default_priority(self, order: int, from_config: bool) -> int:
        """Return the fixed priority ``50``; both arguments are ignored."""
        return 50

    def get_cve(self, cve_id: CveId) -> CveDbEntry | None:
        """Load the entry for ``cve_id`` from its JSON file.

        :param cve_id: identifier of the CVE to look up.
        :return: an ``NvdCveEntry`` if the expected file exists, else ``None``.
        """
        # Files are grouped by year, then by the identifier with its last two
        # characters replaced by "xx".
        path_json = self._git_dir.joinpath(
            f"CVE-{cve_id.year}", cve_id.id[:-2] + "xx", f"{cve_id.id}.json"
        )
        if path_json.is_file():
            with path_json.open(encoding="utf-8") as f:
                return NvdCveEntry(self, json.load(f))
        return None

    def iterate_cves(self) -> Generator[CveDbEntry, None, None]:
        """Yield an ``NvdCveEntry`` for every CVE JSON file in the Git directory.

        Files are visited in the order produced by ``Path.glob``; each one is
        opened and decoded only when the generator reaches it.
        """
        for path_json in self._git_dir.glob("CVE-*/CVE-*xx/CVE-*.json"):
            with path_json.open(encoding="utf-8") as f:
                yield NvdCveEntry(self, json.load(f))