Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/cve_db/db_nvd.py: 96%

101 statements  

« prev     ^ index     » next       coverage.py v7.11.1, created at 2025-11-28 15:37 +0100

1# -*- coding: utf-8 -*- 

2# SPDX-License-Identifier: GPL-2.0-only 

3 

4import json 

5import pathlib 

6from collections.abc import Generator, Iterable 

7from datetime import datetime 

8from typing import Any 

9 

10from ..sbom.component import CompId 

11from ..utils import parsing 

12from ..vuln.cpe import parse_cpe_match 

13from ..vuln.cve import CveExtReference, CveId 

14from ..vuln.cvss import CvssMetric, CvssVersion 

15from ..vuln.version import SemVerRange 

16from .db_base import CveDatabase, CveDbEntry 

17from .db_git import GitCveDatabase 

18from .registry import register_cve_db 

19 

20 

class NvdCveEntry(CveDbEntry):
    """CVE database entry backed by one decoded JSON record.

    Every accessor reads directly from the JSON dictionary handed to the
    constructor each time it is called; nothing is cached by this class.
    """

    def __init__(self, parent_db: CveDatabase, json_obj: dict[str, Any]) -> None:
        """Keep a reference to the JSON record.

        :param parent_db: owning database, forwarded to the base class.
        :param json_obj: decoded JSON object describing the CVE.
        """
        super().__init__(parent_db)
        self._json = json_obj

    @property
    def identifier(self) -> CveId:
        """CVE identifier built from the ``"id"`` key (``KeyError`` if absent)."""
        raw_id = self._json["id"]
        return CveId(raw_id)

    def is_annotation(self) -> bool:
        """Always return ``False`` for this entry type."""
        return False

    def is_rejected(self) -> bool:
        """Return ``True`` when ``"vulnStatus"`` equals ``"Rejected"``."""
        status = self._json.get("vulnStatus")
        return status == "Rejected"

    @property
    def date_published(self) -> datetime | None:
        """Value of ``"published"`` run through ``parsing.datetime_from_iso_format``."""
        raw_date = self._json.get("published")
        return parsing.datetime_from_iso_format(raw_date)

    @property
    def date_modified(self) -> datetime | None:
        """Value of ``"lastModified"`` run through ``parsing.datetime_from_iso_format``."""
        raw_date = self._json.get("lastModified")
        return parsing.datetime_from_iso_format(raw_date)

    @property
    def description(self) -> str | None:
        """Return a description text, preferring English.

        The first entry of ``"descriptions"`` whose ``"lang"`` is ``"en"`` and
        whose ``"value"`` is truthy wins. Otherwise the ``"value"`` of the
        very first entry is returned, provided it is a string. ``None`` is
        returned when there is no usable description.
        """
        entries = self._json.get("descriptions")
        if not entries:
            return None

        for entry in entries:
            if entry.get("lang") == "en" and (text := entry.get("value")):
                return str(text)

        # No usable English text: fall back to whatever comes first.
        fallback = entries[0].get("value")
        if isinstance(fallback, str):
            return fallback
        return None

    def _iterate_cpe_matches(self) -> Generator[dict[str, Any], None, None]:
        """Yield every item of ``"cpeMatch"`` found under ``configurations[*].nodes[*]``."""
        configurations = self._json.get("configurations", [])
        for configuration in configurations:
            nodes = configuration.get("nodes", [])
            for node in nodes:
                yield from node.get("cpeMatch", [])

    def _iterate_applicable_comp_ids(self) -> Generator[CompId, None, None]:
        """Yield a ``CompId`` for each CPE match not flagged as non-vulnerable.

        A match without a ``"vulnerable"`` key is treated as vulnerable.
        Matches for which ``CompId.build_from_cpe`` returns ``None`` are skipped.
        """
        for match in self._iterate_cpe_matches():
            if not match.get("vulnerable", True):
                continue
            candidate = CompId.build_from_cpe(match.get("criteria"))
            if candidate is not None:
                yield candidate

    def get_associated_sem_ver_ranges(
        self, comp_ids: Iterable[CompId]
    ) -> Iterable[SemVerRange]:
        """Collect the version ranges of CPE matches that target ``comp_ids``.

        :param comp_ids: component identifiers to match against.
        :return: list of ``SemVerRange`` objects, in CPE match order, for the
            matches whose component identifier matches one of ``comp_ids``.
        """
        # "sourceIdentifier" is handed to parse_cpe_match as the CNA organization.
        org_id = self._json.get("sourceIdentifier")

        selected: list[SemVerRange] = []
        for match_json in self._iterate_cpe_matches():
            cpe, candidate_range = parse_cpe_match(match_json, org_id)
            comp_id = CompId.build_from_cpe(cpe)
            if comp_id is None:
                continue
            if not isinstance(candidate_range, SemVerRange):
                continue
            if comp_id.is_matching_one_of(comp_ids):
                selected.append(candidate_range)

        return selected

    @property
    def cvss_metrics(self) -> list[CvssMetric]:
        """Return the parsed CVSS metrics, ordered V2, V3.0, V3.1, then V4.0.

        Objects that ``CvssMetric.parse_cve_db_metric`` turns into ``None``
        are left out.
        """
        metrics_json = self._json.get("metrics", {})
        keys_and_versions = (
            ("cvssMetricV2", CvssVersion.V2_0),
            ("cvssMetricV30", CvssVersion.V3_0),
            ("cvssMetricV31", CvssVersion.V3_1),
            ("cvssMetricV40", CvssVersion.V4_0),
        )

        collected: list[CvssMetric] = []
        for json_key, cvss_version in keys_and_versions:
            for raw_metric in metrics_json.get(json_key, []):
                parsed = CvssMetric.parse_cve_db_metric(
                    raw_metric.get("cvssData"),
                    source=raw_metric.get("source"),
                    version=cvss_version,
                )
                if parsed is not None:
                    collected.append(parsed)

        return collected

    @property
    def external_refs(self) -> set[CveExtReference]:
        """Return one ``CveExtReference`` per truthy ``"url"`` under ``"references"``."""
        urls = (ref.get("url") for ref in self._json.get("references", []))
        return {CveExtReference(url) for url in urls if url}

124 

125 

@register_cve_db("cve-db-nvd-fkie")
class NvdFkieCveDatabase(GitCveDatabase):
    """Git-backed CVE database registered under ``"cve-db-nvd-fkie"``.

    Each CVE is read from its own JSON file below ``self._git_dir``
    (presumably set up by ``GitCveDatabase`` -- confirm in the base class).
    """

    # Default repository URL used when the caller does not provide one.
    GIT_FETCH_URL = "https://github.com/fkie-cad/nvd-json-data-feeds.git"

    def __init__(
        self, path: pathlib.Path, name: str, git_url: str = GIT_FETCH_URL, **kwargs: Any
    ) -> None:
        """Forward all arguments to ``GitCveDatabase``.

        :param path: forwarded unchanged to the base class.
        :param name: forwarded unchanged to the base class.
        :param git_url: repository URL; defaults to :attr:`GIT_FETCH_URL`.
        :param kwargs: extra keyword arguments forwarded to the base class.
        """
        super().__init__(path, name, git_url, **kwargs)

    def get_default_priority(self, order: int, from_config: bool) -> int:
        """Return the fixed priority ``50``; both arguments are ignored."""
        return 50

    def _load_entry(self, path_json: pathlib.Path) -> CveDbEntry:
        """Decode one JSON file and wrap it in an ``NvdCveEntry``.

        The file is closed before the entry is built, so no file handle
        outlives this call.

        :param path_json: path of the JSON file to read (UTF-8).
        :return: entry wrapping the decoded JSON object.
        """
        with path_json.open(encoding="utf-8") as f:
            json_obj = json.load(f)
        return NvdCveEntry(self, json_obj)

    def get_cve(self, cve_id: CveId) -> CveDbEntry | None:
        """Look up a single CVE by identifier.

        The file is expected at
        ``CVE-<year>/<id with its last two characters replaced by "xx">/<id>.json``
        relative to ``self._git_dir``.

        :param cve_id: identifier of the CVE to load.
        :return: the entry, or ``None`` when no such regular file exists.
        """
        path_json = self._git_dir.joinpath(
            f"CVE-{cve_id.year}", cve_id.id[:-2] + "xx", f"{cve_id.id}.json"
        )
        if not path_json.is_file():
            return None
        return self._load_entry(path_json)

    def iterate_cves(self) -> Generator[CveDbEntry, None, None]:
        """Yield an entry for every file matching ``CVE-*/CVE-*xx/CVE-*.json``.

        Each file is fully read and closed before its entry is yielded, so a
        suspended or abandoned generator never keeps a file handle open.
        """
        for path_json in self._git_dir.glob("CVE-*/CVE-*xx/CVE-*.json"):
            yield self._load_entry(path_json)