Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/cve_db/annot_simple.py: 95%
76 statements
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
1# -*- coding: utf-8 -*-
3import pathlib
4from collections import defaultdict
5from collections.abc import Generator
6from datetime import datetime
7from typing import Any
9import yaml # type: ignore[import-untyped]
11from ..sbom.component import CompId
12from ..utils import parsing
13from ..vuln.cve import (
14 CveId,
15 CveVexAffectedAssessment,
16 CveVexAssessment,
17 CveVexNotAffectedAssessment,
18)
19from .annot_base import AnnotDbEntry
20from .db_base import CveDatabase, CveDatabaseT
21from .db_git import GitAnnotDatabase
22from .registry import register_cve_db
class SimpleCveAnnotEntry(AnnotDbEntry):
    """
    Represent one annotation. The file contains the following mandatory fields:
     - comment:
     - cve-product:
     - last-review:
     - versions:
     - vulnerable:

    The CVE identifier is taken from the stem of the annotation file name.
    """

    def __init__(
        self, parent_db: CveDatabase, path_yml: pathlib.Path, annot_info: dict[str, Any]
    ) -> None:
        """
        :param parent_db: database this entry belongs to.
        :param path_yml: path of the annotation file; its stem is the CVE id.
        :param annot_info: parsed content of the annotation file. Values are
            not only strings: this class reads a boolean (``vulnerable``) and
            an iterable (``versions``) from it.
        """
        super().__init__(parent_db)
        self._cve_id = CveId(path_yml.stem)
        self._annot = annot_info

    @property
    def identifier(self) -> CveId:
        """CVE identifier built from the annotation file name."""
        return self._cve_id

    @property
    def date_modified(self) -> datetime | None:
        """
        Date of the last review of this annotation.

        A YAML loader may already have converted an unquoted ``last-review``
        value into a ``date`` or ``datetime`` object, so those are accepted in
        addition to ISO 8601 strings.

        :raises KeyError: if the ``last-review`` field is missing.
        """
        # Local import: only ``datetime`` is imported at module level.
        from datetime import date

        last_review = self._annot["last-review"]
        # ``datetime`` is a subclass of ``date``: test it first.
        if isinstance(last_review, datetime):
            return last_review
        if isinstance(last_review, date):
            return datetime(last_review.year, last_review.month, last_review.day)
        return datetime.fromisoformat(last_review)

    @property
    def description(self) -> str | None:
        """Always ``None``."""
        return None

    @property
    def vulnerable(self) -> bool | None:
        """
        Vulnerability status read from the ``vulnerable`` field.

        A boolean is returned unchanged. For any other value (including a
        missing field) the result is ``True`` unless the value is the exact
        string ``"no"``.
        """
        vuln_val = self._annot.get("vulnerable")
        return vuln_val if isinstance(vuln_val, bool) else (vuln_val != "no")

    @property
    def vex_assessment(self) -> CveVexAssessment | None:
        """
        Build the VEX assessment from the ``comment`` and review date.

        The comment (empty string when absent) becomes the action statement of
        an "affected" assessment when the entry is vulnerable, otherwise the
        impact statement of a "not affected" assessment.
        """
        comment = self._annot.get("comment", "")
        if self.vulnerable:
            return CveVexAffectedAssessment(
                action_statement=comment, action_statement_time=self.date_modified
            )
        return CveVexNotAffectedAssessment(
            impact_statement=comment, impact_statement_time=self.date_modified
        )

    def _get_vers_comp_ids(self, pkg: Any) -> dict[str | None, set[CompId]]:
        """
        Map every listed version (as a string) to the annotated component.

        :param pkg: not used by this implementation.
        :raises KeyError: if ``cve-product`` or ``versions`` is missing.
        """
        vers_comp_ids: dict[str | None, set[CompId]] = defaultdict(set)
        comp_id = CompId.build_from_vendor_product_str(self._annot["cve-product"])

        versions = self._annot["versions"]
        # A scalar (e.g. ``versions: 1.2.3``) is a single version: iterating a
        # string would yield its characters, and a number is not iterable.
        if isinstance(versions, (str, int, float)):
            versions = [versions]

        for vers in versions:
            vers_comp_ids[str(vers)].add(comp_id)

        return vers_comp_ids
@register_cve_db("simple-annotations")
class SimpleCveAnnotDatabase(GitAnnotDatabase):
    """
    Annotation database made of one YAML file per CVE.

    The files are selected by the configured ``globs``, resolved relative to
    the database directory, and loaded into memory by ``_initialize()``.
    """

    @classmethod
    def create_from_config(cls: type[CveDatabaseT], **kwargs: Any) -> CveDatabaseT:
        """
        Create the database from configuration keyword arguments.

        The ``globs`` option is required; it is processed in place by
        ``parsing.update_list_param`` (presumably normalized to a list,
        to be confirmed against that helper) before delegating to the parent.
        """
        parsing.update_list_param(kwargs, "globs", required=True)
        return super().create_from_config(**kwargs)

    def __init__(
        self, path: pathlib.Path, name: str, globs: list[str], **kwargs: Any
    ) -> None:
        """
        :param path: forwarded to the parent class.
        :param name: forwarded to the parent class.
        :param globs: glob patterns, or directories, locating annotation files.
        :param kwargs: extra options forwarded to the parent class.
        """
        super().__init__(path, name, **kwargs)
        self._annotations: dict[CveId, SimpleCveAnnotEntry] = {}
        self._annot_globs = globs

    def _create_glob_pattern(self, annot_glob: str) -> str:
        """
        Turn one configured entry into a glob pattern.

        An entry ending in ``.yaml`` / ``.yml`` is used as is. An entry ending
        with ``/`` or naming an existing directory of the database gets
        ``CVE-*-*.yaml`` appended. Anything else is returned unchanged.
        """
        if annot_glob.endswith((".yaml", ".yml")):
            return annot_glob
        if annot_glob.endswith("/"):
            return f"{annot_glob}CVE-*-*.yaml"
        if self._db_dir.joinpath(annot_glob).is_dir():
            return f"{annot_glob}/CVE-*-*.yaml"
        return annot_glob

    def _initialize(self) -> None:
        """
        Reload every annotation file matched by the configured globs.

        Globs are processed in configuration order; an entry loaded later
        replaces an earlier one with the same CVE identifier.

        :raises ValueError: if a matched file does not contain a YAML mapping.
        """
        super()._initialize()
        self._annotations.clear()

        for annot_glob in self._annot_globs:
            glob_pattern = self._create_glob_pattern(annot_glob)
            # Sorted so that, when two files of one glob share a CVE id, the
            # one that wins does not depend on filesystem enumeration order.
            for path_yml in sorted(self._db_dir.glob(glob_pattern)):
                with path_yml.open(encoding="utf-8") as f:
                    annot_info = yaml.safe_load(f)
                # An empty file loads as None, and a list or scalar is equally
                # unusable: fail here with the file name rather than later
                # with an opaque AttributeError inside the entry.
                if not isinstance(annot_info, dict):
                    raise ValueError(
                        f"{path_yml}: expected a YAML mapping, "
                        f"got {type(annot_info).__name__}"
                    )
                a = SimpleCveAnnotEntry(self, path_yml, annot_info)
                self._annotations[a.identifier] = a

    def _get_index_cache_invalidation_data(self) -> bytes:
        """Return the encoded string form of the configured globs."""
        return str(self._annot_globs).encode()

    def get_cve(self, cve_id: CveId) -> SimpleCveAnnotEntry | None:
        """Return the annotation for ``cve_id``, or ``None`` if there is none."""
        return self._annotations.get(cve_id)

    def iterate_cves(self) -> Generator[SimpleCveAnnotEntry, None, None]:
        """Yield every loaded annotation entry."""
        yield from self._annotations.values()