Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/cve_db/annot_openvex.py: 95%
97 statements
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
1# -*- coding: utf-8 -*-
2# SPDX-License-Identifier: GPL-2.0-only
4import json
5import pathlib
6from collections.abc import Generator
7from datetime import datetime
8from typing import Any
10from ..utils import parsing
11from ..vuln.cpe import Cpe23
12from ..vuln.cve import (
13 CveId,
14 CveVexAffectedAssessment,
15 CveVexAssessment,
16 CveVexFixedAssessment,
17 CveVexJustification,
18 CveVexNotAffectedAssessment,
19 CveVexStatus,
20 CveVexUnderInvestigationAssessment,
21)
22from .annot_base import AnnotDbEntry
23from .db_base import CveDatabase, CveDatabaseT
24from .db_git import GitAnnotDatabase
25from .registry import register_cve_db
def cve_vex_justification_from_str(s: str | None) -> CveVexJustification | None:
    """Translate an OpenVEX justification label into its enum member

    :param s: Justification label as found in a VEX statement, or ``None``.
    :return: The matching ``CveVexJustification`` member, or ``None`` when
        ``s`` is empty/``None`` or is not one of the labels listed below.
    """
    known_labels: dict[str, CveVexJustification] = {
        "component_not_present": CveVexJustification.COMPONENT_NOT_PRESENT,
        "vulnerable_code_not_present": CveVexJustification.VULNERABLE_CODE_NOT_PRESENT,
        "inline_mitigations_already_exist":
            CveVexJustification.INLINE_MITIGATIONS_ALREADY_EXIST,
        "vulnerable_code_cannot_be_controlled_by_adversary":
            CveVexJustification.VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY,
        "vulnerable_code_not_in_execute_path":
            CveVexJustification.VULNERABLE_CODE_NOT_IN_EXECUTE_PATH,
    }  # fmt: skip

    # Empty string and None both mean "no justification given".
    if not s:
        return None
    return known_labels.get(s)
class OpenVexAnnotEntry(AnnotDbEntry):
    """Represent one OpenVEX statement"""

    def __init__(
        self,
        parent_db: CveDatabase,
        json_obj: dict[str, Any],
        doc_timestamp: datetime | None = None,
    ) -> None:
        """Wrap the JSON object of one statement

        :param parent_db: Database this entry belongs to (forwarded to the
            base class).
        :param json_obj: The statement as decoded from the VEX document.
        :param doc_timestamp: Timestamp of the enclosing document, used as a
            fallback when the statement carries no timestamp of its own.
        """
        super().__init__(parent_db)
        self._json = json_obj
        self._doc_timestamp = doc_timestamp

    @property
    def identifier(self) -> CveId:
        """CVE identifier built from the statement's ``vulnerability.name``"""
        return CveId(self._json.get("vulnerability", {}).get("name"))

    @property
    def date_modified(self) -> datetime | None:
        """Value of the statement's ``last_updated`` field, parsed"""
        return parsing.datetime_from_iso_format(self._json.get("last_updated"))

    @property
    def description(self) -> str | None:
        """``vulnerability.description`` if it is a string, else ``None``"""
        d = self._json.get("vulnerability", {}).get("description")
        return d if isinstance(d, str) else None

    @property
    def vex_assessment(self) -> CveVexAssessment | None:
        """Build the assessment object matching the statement's ``status``

        :return: A not-affected, affected, fixed or under-investigation
            assessment depending on the status, or ``None`` if the status
            matches none of these four.
        """
        version = self._json.get("version")
        notes = self._json.get("status_notes")
        # NOTE(review): the raw "status" value is handed to CveVexStatus()
        # unchecked; presumably this raises on a missing/unknown status.
        status = CveVexStatus(self._json.get("status"))
        # Statement timestamp, falling back to the document-level one.
        timestamp = (
            parsing.datetime_from_iso_format(self._json.get("timestamp"))
            or self._doc_timestamp
        )

        if status == CveVexStatus.NOT_AFFECTED:
            statement = self._json.get("impact_statement", "")
            justification = self._json.get("justification")

            return CveVexNotAffectedAssessment(
                vex_version=version,
                status_notes=notes,
                impact_statement=statement,
                impact_statement_time=timestamp,
                justification=cve_vex_justification_from_str(justification),
            )

        if status == CveVexStatus.AFFECTED:
            statement = self._json.get("action_statement", "")
            # Parse the dedicated timestamp the same way as the other
            # timestamps so the assessment never receives a raw JSON string
            # in one case and a parsed value in the fallback case.
            statement_time = (
                parsing.datetime_from_iso_format(
                    self._json.get("action_statement_timestamp")
                )
                or timestamp
            )
            return CveVexAffectedAssessment(
                vex_version=version,
                status_notes=notes,
                action_statement=statement,
                action_statement_time=statement_time,
            )

        if status == CveVexStatus.FIXED:
            return CveVexFixedAssessment(vex_version=version, status_notes=notes)

        if status == CveVexStatus.UNDER_INVESTIGATION:
            return CveVexUnderInvestigationAssessment(
                vex_version=version, status_notes=notes
            )

        return None

    def _iterate_cpes(self, pkg: Any) -> Generator[Cpe23, None, None]:
        """Yield the CPE of every product of the statement that has one

        ``pkg`` is not used by this implementation. Products whose
        ``identifiers.cpe23`` yields a falsy result from ``Cpe23.parse``
        are skipped.
        """
        for product in self._json.get("products", []):
            cpe = Cpe23.parse(product.get("identifiers", {}).get("cpe23"))
            if cpe:
                yield cpe
@register_cve_db("openvex-dir")
class OpenVexAnnotDatabase(GitAnnotDatabase):
    """Annotation database backed by OpenVEX JSON files selected by globs"""

    @classmethod
    def create_from_config(cls: type[CveDatabaseT], **kwargs: Any) -> CveDatabaseT:
        """Create the database from configuration keyword arguments

        The ``globs`` option is processed by ``parsing.update_list_param``
        with ``required=True`` before delegating to the parent class.
        """
        parsing.update_list_param(kwargs, "globs", required=True)
        return super().create_from_config(**kwargs)

    def __init__(
        self, path: pathlib.Path, name: str, globs: list[str], **kwargs: Any
    ) -> None:
        """Store the glob patterns and start with no loaded statement

        :param path: Forwarded to the parent class.
        :param name: Forwarded to the parent class.
        :param globs: Glob patterns selecting the VEX files to load.
        """
        super().__init__(path, name, **kwargs)
        self._vex_globs = globs
        # Loaded statements, indexed by their CVE identifier.
        self._statements: dict[CveId, OpenVexAnnotEntry] = {}

    def iterate_file_statements(
        self, path_vex: pathlib.Path
    ) -> Generator[OpenVexAnnotEntry, None, None]:
        """Yield one entry per item of the ``statements`` list of a VEX file

        The document-level ``timestamp`` is parsed once and handed to every
        entry.
        """
        with path_vex.open(encoding="utf-8") as f:
            doc = json.load(f)

        doc_timestamp = parsing.datetime_from_iso_format(doc.get("timestamp"))
        for s in doc.get("statements", []):
            yield OpenVexAnnotEntry(self, s, doc_timestamp)

    def _iterate_vex_files(self) -> Generator[pathlib.Path, None, None]:
        """Iterate over all VEX files managed by this database"""
        for vex_glob in self._vex_globs:
            yield from self._db_dir.glob(vex_glob)

    def _initialize(self) -> None:
        """Reload every statement from the VEX files

        :raises RuntimeError: If two statements refer to the same CVE.
        """
        self._statements.clear()
        for path_json in self._iterate_vex_files():
            for a in self.iterate_file_statements(path_json):
                # Evaluate the property once, and test membership with the
                # same key type (CveId) that is stored in the dictionary.
                cve_id = a.identifier
                if cve_id in self._statements:
                    raise RuntimeError(
                        f"Multiple statements for the same CVE: {cve_id.id}"
                    )
                self._statements[cve_id] = a

    def _get_index_cache_invalidation_data(self) -> bytes:
        """Return the encoded string representation of the glob list"""
        return str(self._vex_globs).encode()

    def get_cve(self, cve_id: CveId) -> OpenVexAnnotEntry | None:
        """Return the loaded statement for ``cve_id``, or ``None``"""
        return self._statements.get(cve_id)

    def iterate_cves(self) -> Generator[OpenVexAnnotEntry, None, None]:
        """Yield every loaded statement"""
        yield from self._statements.values()
@register_cve_db("openvex-file")
class OpenVexFileAnnotDatabase(OpenVexAnnotDatabase):
    """OpenVEX database made of a single file instead of glob patterns"""

    @classmethod
    def create_from_config(cls: type[CveDatabaseT], **kwargs: Any) -> CveDatabaseT:
        """Create the database, overriding any configured ``globs`` with ``[]``"""
        kwargs.update(globs=[])
        return super().create_from_config(**kwargs)

    def __init__(self, path: pathlib.Path, name: str, **kwargs: Any) -> None:
        """Use the parent directory of ``path`` and its file name as only glob"""
        # Any "globs" received from the caller is discarded: the single
        # pattern is always derived from the file name.
        kwargs.pop("globs", None)
        vex_dir = path.parent
        vex_file_name = path.name
        super().__init__(vex_dir, name, globs=[vex_file_name], **kwargs)