Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/cve_db/annot_yocto.py: 79%
112 statements
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
1# -*- coding: utf-8 -*-
2# SPDX-License-Identifier: GPL-2.0-only
4import json
5import pathlib
6from collections.abc import Generator, Iterable
7from datetime import UTC, datetime
8from typing import Any
10from ..sbom.component import CompId
11from ..utils import parsing
12from ..vuln.cpe import Cpe23
13from ..vuln.cve import (
14 CveId,
15 CveVexAffectedAssessment,
16 CveVexAssessment,
17 CveVexFixedAssessment,
18 CveVexJustification,
19 CveVexNotAffectedAssessment,
20)
21from ..vuln.cvss import CvssMetric, CvssVersion
22from ..vuln.version import SemVerRange
23from .annot_base import AnnotDatabase, AnnotDbEntry
24from .db_base import CveDatabase
25from .registry import register_cve_db
28class YoctoAnnotEntry(AnnotDbEntry):
29 """Represent one Yocto statement"""
31 def __init__(
32 self,
33 parent_db: CveDatabase,
34 pkg_obj: dict[str, Any],
35 issue_obj: dict[str, Any],
36 doc_timestamp: datetime,
37 ) -> None:
38 super().__init__(parent_db)
39 self._pkg_obj = pkg_obj
40 self._issue_obj = issue_obj
41 self._timestamp = (
42 parsing.datetime_from_iso_format(self._issue_obj.get("modified"))
43 or doc_timestamp
44 )
46 @property
47 def identifier(self) -> CveId:
48 return CveId(self._issue_obj["id"])
50 @property
51 def date_modified(self) -> datetime | None:
52 return self._timestamp
54 @property
55 def description(self) -> str | None:
56 return self._issue_obj.get("summary")
58 @property
59 def cvss_metrics(self) -> list[CvssMetric]:
60 vect: str | None = self._issue_obj.get("vectorString")
61 if not vect:
62 return []
64 score_str: str | None = self._issue_obj.get("scorev4")
65 if score_str and score_str != "0.0" and vect.startswith("CVSS:4.0/"):
66 score = float(score_str)
67 sev = CvssMetric.compute_severity_from_score(score)
68 return [CvssMetric(CvssVersion.V4_0, score, vect, sev)]
70 score_str = self._issue_obj.get("scorev3")
71 if score_str and score_str != "0.0":
72 score = float(score_str)
73 sev = CvssMetric.compute_severity_from_score(score)
74 if vect.startswith("CVSS:3.1/"):
75 return [CvssMetric(CvssVersion.V3_1, score, vect, sev)]
76 if vect.startswith("CVSS:3.0/"):
77 return [CvssMetric(CvssVersion.V3_0, score, vect, sev)]
79 score_str = self._issue_obj.get("scorev2")
80 if score_str and score_str != "0.0":
81 score = float(score_str)
82 return [CvssMetric(CvssVersion.V2_0, score, vect)]
84 return []
86 def _get_patch_files_msg(self, detail: str | None) -> str | None:
87 if detail != "fix-file-included":
88 return None
90 files = self._issue_obj.get("patch-file")
91 if not files:
92 return None
94 lst_files: list[str] = []
95 if isinstance(files, str):
96 lst_files.append(pathlib.Path(files).name)
97 elif isinstance(files, list):
98 lst_files.extend(pathlib.Path(f).name for f in files)
100 return f"Fixed by: {lst_files}" if lst_files else None
102 @property
103 def vex_assessment(self) -> CveVexAssessment | None:
104 status = self._issue_obj.get("status")
105 detail = self._issue_obj.get("detail")
106 description = self._issue_obj.get("description")
108 if status == "Patched":
109 patches = self._get_patch_files_msg(detail)
110 return CveVexFixedAssessment(
111 status_notes=": ".join(v for v in (detail, description, patches) if v)
112 )
114 if status == "Unpatched":
115 return CveVexAffectedAssessment(
116 status_notes=": ".join(v for v in (detail, description) if v),
117 action_statement="Mitigation action unknown",
118 action_statement_time=self._timestamp,
119 )
121 if status == "Ignored":
122 justification = None
123 if detail == "cpe-incorrect":
124 justification = CveVexJustification.COMPONENT_NOT_PRESENT
125 elif detail == "disputed":
126 justification = CveVexJustification.VULNERABLE_CODE_NOT_PRESENT
127 elif detail in ("not-applicable-config", "not-applicable-platform"):
128 justification = CveVexJustification.VULNERABLE_CODE_NOT_IN_EXECUTE_PATH
130 return CveVexNotAffectedAssessment(
131 status_notes=detail,
132 impact_statement=description or "",
133 impact_statement_time=self._timestamp,
134 justification=justification,
135 )
137 return None
139 def _iterate_cpes(self, pkg: Any) -> Generator[Cpe23, None, None]:
140 for cpe_str in self._pkg_obj.get("cpes", []):
141 cpe = Cpe23.parse(cpe_str)
142 if cpe:
143 yield cpe
145 def get_associated_sem_ver_ranges(
146 self, comp_ids: Iterable[CompId]
147 ) -> Iterable[SemVerRange]:
148 return self._get_associated_sem_ver_ranges(
149 comp_ids, self._pkg_obj.get("version")
150 )
@register_cve_db("yocto-vex-manifest")
class YoctoAnnotDatabase(AnnotDatabase):
    """Annotation database backed by a single Yocto JSON manifest file.

    Registered under the name ``"yocto-vex-manifest"``. Every issue found
    under the manifest's ``package`` -> ``issue`` lists becomes one
    :class:`YoctoAnnotEntry`, indexed by its CVE identifier.
    """

    def __init__(self, path: pathlib.Path, name: str, **kwargs: Any) -> None:
        """Remember the manifest location; the file is not parsed here.

        :param path: path to the manifest file. It is resolved strictly, so
            a missing file raises immediately (``FileNotFoundError``).
        :param name: database name forwarded to the base class.
        :param kwargs: extra options forwarded to the base class.
        """
        super().__init__(name, **kwargs)
        self._manifest_file = path.resolve(strict=True)
        self._statements: dict[CveId, YoctoAnnotEntry] = {}

    def _initialize(self) -> None:
        """(Re)load every statement from the manifest file.

        :raises RuntimeError: if the manifest contains more than one
            statement for the same CVE identifier.
        """
        self._statements.clear()

        with self._manifest_file.open(encoding="utf-8") as f:
            doc = json.load(f)

        # Fallback timestamp for issues without their own "modified" field:
        # the manifest file modification time, expressed in UTC.
        doc_timestamp = datetime.fromtimestamp(
            self._manifest_file.stat().st_mtime, tz=UTC
        )

        for pkg in doc.get("package", []):
            for issue in pkg.get("issue", []):
                entry = YoctoAnnotEntry(self, pkg, issue, doc_timestamp)
                # Build the identifier once and test membership with the very
                # same key type that is stored. The dictionary is keyed by
                # CveId, so probing it with the raw ``.id`` value would not
                # reliably detect an already registered statement.
                cve_id = entry.identifier
                if cve_id in self._statements:
                    raise RuntimeError(
                        f"Multiple statements for the same CVE: {cve_id.id}"
                    )
                self._statements[cve_id] = entry

    def get_cve(self, cve_id: CveId) -> YoctoAnnotEntry | None:
        """Return the statement for ``cve_id``, or ``None`` if there is none."""
        return self._statements.get(cve_id)

    def iterate_cves(self) -> Generator[YoctoAnnotEntry, None, None]:
        """Yield every loaded statement."""
        yield from self._statements.values()