Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/cve_db/annot_simple.py: 95%

76 statements  

« prev     ^ index     » next       coverage.py v7.11.1, created at 2025-11-28 15:37 +0100

1# -*- coding: utf-8 -*- 

2 

3import pathlib 

4from collections import defaultdict 

5from collections.abc import Generator 

6from datetime import datetime 

7from typing import Any 

8 

9import yaml # type: ignore[import-untyped] 

10 

11from ..sbom.component import CompId 

12from ..utils import parsing 

13from ..vuln.cve import ( 

14 CveId, 

15 CveVexAffectedAssessment, 

16 CveVexAssessment, 

17 CveVexNotAffectedAssessment, 

18) 

19from .annot_base import AnnotDbEntry 

20from .db_base import CveDatabase, CveDatabaseT 

21from .db_git import GitAnnotDatabase 

22from .registry import register_cve_db 

23 

24 

class SimpleCveAnnotEntry(AnnotDbEntry):
    """
    Represent one annotation. The file contains the following mandatory fields:
    - comment:
    - cve-product:
    - last-review:
    - versions:
    - vulnerable:

    The CVE identifier is not stored in the file: it is derived from the
    file name (without its extension).
    """

    def __init__(
        self, parent_db: CveDatabase, path_yml: pathlib.Path, annot_info: dict[str, Any]
    ) -> None:
        """
        Build an annotation entry.

        :param parent_db: Database owning this entry, forwarded to the base class.
        :param path_yml: Path of the annotation file; its stem is used as CVE ID.
        :param annot_info: Parsed content of the annotation file. Values are
            not only strings: "vulnerable" may be a bool and "versions" is
            iterated over, hence the ``Any`` value type.
        """
        super().__init__(parent_db)
        self._cve_id = CveId(path_yml.stem)
        self._annot = annot_info

    @property
    def identifier(self) -> CveId:
        """CVE identifier, taken from the annotation file name."""
        return self._cve_id

    @property
    def date_modified(self) -> datetime | None:
        """
        Date of the last review of this annotation.

        The "last-review" field is accepted either as an ISO 8601 string or
        as an already parsed date/datetime object: PyYAML's safe loader
        resolves unquoted values such as ``2025-11-28`` to ``datetime.date``
        (and full timestamps to ``datetime.datetime``), which
        ``datetime.fromisoformat`` would reject with a TypeError.

        :raises KeyError: If the "last-review" field is missing.
        :raises ValueError: If the field is a string that is not ISO formatted.
        """
        # Local import: only needed here, keeps the module imports untouched.
        from datetime import date

        last_review = self._annot["last-review"]
        # datetime must be tested first since it is a subclass of date.
        if isinstance(last_review, datetime):
            return last_review
        if isinstance(last_review, date):
            # A plain date is mapped to midnight, exactly like
            # datetime.fromisoformat() does for a date-only string.
            return datetime(last_review.year, last_review.month, last_review.day)
        return datetime.fromisoformat(last_review)

    @property
    def description(self) -> str | None:
        """This kind of annotation carries no description: always None."""
        return None

    @property
    def vulnerable(self) -> bool | None:
        """
        Tell whether the annotation marks the CVE as vulnerable.

        A boolean value is returned as is. Any other value is considered
        vulnerable unless it is exactly the string "no"; this includes a
        missing "vulnerable" field.
        """
        vuln_val = self._annot.get("vulnerable")
        if isinstance(vuln_val, bool):
            return vuln_val
        return vuln_val != "no"

    @property
    def vex_assessment(self) -> CveVexAssessment | None:
        """
        Build the VEX assessment matching the "vulnerable" state.

        The comment (empty string if absent) is used as action statement when
        vulnerable, and as impact statement otherwise; the statement time is
        the last review date in both cases.
        """
        comment = self._annot.get("comment", "")
        if self.vulnerable:
            return CveVexAffectedAssessment(
                action_statement=comment, action_statement_time=self.date_modified
            )
        return CveVexNotAffectedAssessment(
            impact_statement=comment, impact_statement_time=self.date_modified
        )

    def _get_vers_comp_ids(self, pkg: Any) -> dict[str | None, set[CompId]]:
        """
        Map each annotated version to the component it applies to.

        :param pkg: Not used by this implementation.
        :return: Mapping from version (converted to str) to a set containing
            the component identifier built from the "cve-product" field.
        :raises KeyError: If "cve-product" or "versions" is missing.
        """
        vers_comp_ids: dict[str | None, set[CompId]] = defaultdict(set)
        comp_id = CompId.build_from_vendor_product_str(self._annot["cve-product"])

        for vers in self._annot["versions"]:
            # str(): YAML may have parsed a version such as 1.2 as a number.
            vers_comp_ids[str(vers)].add(comp_id)

        return vers_comp_ids

78 

79 

@register_cve_db("simple-annotations")
class SimpleCveAnnotDatabase(GitAnnotDatabase):
    """
    Annotation database made of YAML files, one file per CVE.

    The files are looked up below the database directory using the
    configured glob patterns, and each of them is wrapped in a
    SimpleCveAnnotEntry indexed by its CVE identifier.
    """

    @classmethod
    def create_from_config(cls: type[CveDatabaseT], **kwargs: Any) -> CveDatabaseT:
        """
        Create the database from its configuration options.

        The "globs" option is processed as a required list parameter before
        the options are handed over to the parent implementation.
        """
        # NOTE(review): presumably normalizes kwargs["globs"] in place and
        # rejects a missing value - confirm against utils.parsing.
        parsing.update_list_param(kwargs, "globs", required=True)
        return super().create_from_config(**kwargs)

    def __init__(
        self, path: pathlib.Path, name: str, globs: list[str], **kwargs: Any
    ) -> None:
        """
        :param path: Forwarded to the parent class.
        :param name: Forwarded to the parent class.
        :param globs: Glob patterns (or directories) selecting annotation files.
        :param kwargs: Extra options forwarded to the parent class.
        """
        super().__init__(path, name, **kwargs)
        self._annot_globs = globs
        # Filled by _initialize().
        self._annotations: dict[CveId, SimpleCveAnnotEntry] = {}

    def _create_glob_pattern(self, annot_glob: str) -> str:
        """
        Turn a configured glob into the pattern actually used for the lookup.

        A value ending with ".yaml" or ".yml" is used unchanged. A value
        ending with "/" or naming an existing directory of the database is
        completed with the "CVE-*-*.yaml" file pattern. Anything else is
        used unchanged.
        """
        targets_yaml_files = annot_glob.endswith((".yaml", ".yml"))
        if not targets_yaml_files:
            if annot_glob.endswith("/"):
                return f"{annot_glob}CVE-*-*.yaml"
            if self._db_dir.joinpath(annot_glob).is_dir():
                return f"{annot_glob}/CVE-*-*.yaml"
        return annot_glob

    def _initialize(self) -> None:
        """
        (Re)load every annotation file matched by the configured globs.

        Previously loaded annotations are dropped first. When several files
        yield the same CVE identifier, the last one loaded wins.
        """
        super()._initialize()
        self._annotations.clear()

        for configured_glob in self._annot_globs:
            pattern = self._create_glob_pattern(configured_glob)
            for annot_path in self._db_dir.glob(pattern):
                with annot_path.open(encoding="utf-8") as annot_file:
                    annot_info = yaml.safe_load(annot_file)
                entry = SimpleCveAnnotEntry(self, annot_path, annot_info)
                self._annotations[entry.identifier] = entry

    def _get_index_cache_invalidation_data(self) -> bytes:
        """Return the encoded textual form of the configured globs."""
        globs_text = str(self._annot_globs)
        return globs_text.encode()

    def get_cve(self, cve_id: CveId) -> SimpleCveAnnotEntry | None:
        """Return the annotation of the given CVE, or None if there is none."""
        return self._annotations.get(cve_id)

    def iterate_cves(self) -> Generator[SimpleCveAnnotEntry, None, None]:
        """Yield every loaded annotation entry."""
        for entry in self._annotations.values():
            yield entry