Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/export/manager.py: 97%

72 statements  

« prev     ^ index     » next       coverage.py v7.11.1, created at 2025-11-28 15:37 +0100

1# -*- coding: utf-8 -*- 

2# SPDX-License-Identifier: GPL-2.0-only 

3 

4import contextlib 

5import logging 

6from collections.abc import Generator, Iterable 

7 

8from ..cve_db.annot_aggregate import AggregateAnnotEntry 

9from ..cve_db.manager import CveDbManager 

10from ..sbom.sbom_base import Sbom 

11from ..vuln.cve import ( 

12 CveVexMissingVersionsAssessment, 

13 CveVexRejectedAssessment, 

14 CveVexStatus, 

15) 

16from .export_base import BaseExport 

17 

18_logger = logging.getLogger(__name__) 

19 

20 

21class ExportManager: 

22 def __init__(self, sbom: Sbom, cve_db_manager: CveDbManager) -> None: 

23 self._sbom = sbom 

24 self._manager = cve_db_manager 

25 self._exports: list[BaseExport] = [] 

26 self._filter_rejected_cve: bool = True 

27 self._filter_status: tuple[CveVexStatus, ...] | None = None 

28 self._filter_no_cvss_score: bool = False 

29 self._filter_cvss_score: float | None = None 

30 self._filter_missing_versions: bool = False 

31 self._add_kernel_modules: bool = False 

32 

33 def cfg_filter_rejected_cve(self, enable: bool) -> None: 

34 """If enable is true, do not export rejected CVE (which is the default)""" 

35 self._filter_rejected_cve = enable 

36 

37 def cfg_filter_status(self, status: Iterable[CveVexStatus]) -> None: 

38 """ 

39 Allow to configure export filter, to only export CVEs with specific VEX status 

40 """ 

41 self._filter_status = tuple(status) 

42 

43 def cfg_filter_no_cvss_score(self, enable: bool) -> None: 

44 """If enable is true, do not export CVE with missing CVSS score""" 

45 self._filter_no_cvss_score = enable 

46 

47 def cfg_filter_cvss_score(self, threshold: float | None) -> None: 

48 """ 

49 Only export CVE with CVSS score greater or equal to the specified threshold 

50 """ 

51 self._filter_cvss_score = threshold 

52 

53 def cfg_filter_missing_versions(self, enable: bool) -> None: 

54 """ 

55 If enable is true, do not export CVE with missing version ranges information 

56 """ 

57 self._filter_missing_versions = enable 

58 

59 def cfg_add_kernel_modules(self, enable: bool) -> None: 

60 """If enable is true, add CVE annotations to all kernel modules""" 

61 self._add_kernel_modules = enable 

62 

63 def _is_cve_filtered(self, annotation: AggregateAnnotEntry) -> bool: 

64 """Allow to check if this CVE needs to be exported""" 

65 assessment = annotation.vex_assessment 

66 

67 if self._filter_rejected_cve and isinstance( 

68 assessment, CveVexRejectedAssessment 

69 ): 

70 return True 

71 

72 if self._filter_status and (assessment.status not in self._filter_status): 

73 return True 

74 

75 if self._filter_missing_versions and isinstance( 

76 assessment, CveVexMissingVersionsAssessment 

77 ): 

78 return True 

79 

80 if self._filter_no_cvss_score and (not annotation.cvss_metrics): 

81 return True 

82 

83 return bool( 

84 self._filter_cvss_score is not None 

85 and not any( 

86 metric.score >= self._filter_cvss_score 

87 for metric in annotation.cvss_metrics 

88 ) 

89 ) 

90 

91 def add_exporter(self, e: BaseExport) -> None: 

92 """ 

93 Add an exporter. 

94 

95 Each exporter should generate an output (file, or something on stdout, ...) 

96 """ 

97 self._exports.append(e) 

98 

99 def process_export(self) -> None: 

100 """ 

101 Process the export, use as input the SBOM to extract the components, and 

102 then for each component get applicable CVE. 

103 """ 

104 

105 main_gens: dict[BaseExport, Generator[None, None, None]] = {} 

106 

107 for e in self._exports: 

108 _logger.info("Starting the export to %s", e.output_path.name) 

109 e.cfg_add_kernel_modules(self._add_kernel_modules) 

110 main_gen = e.start_export() 

111 next(main_gen) 

112 main_gens[e] = main_gen 

113 

114 for comp_build in self._sbom.iterate_component_builds(): 

115 _logger.info("Processing build: %s", comp_build.build_name) 

116 # Initialize generator 

117 gens: list[Generator[None, tuple[bool, AggregateAnnotEntry], None]] = [] 

118 for e in self._exports: 

119 with contextlib.suppress(StopIteration): 

120 gen = e.export_comp_info(comp_build) 

121 next(gen) 

122 gens.append(gen) 

123 

124 # Export each annotation 

125 for annotation in self._manager.get_applicable_cves(comp_build): 

126 is_filtered = self._is_cve_filtered(annotation) 

127 _logger.debug( 

128 " - Exporting %s%s", 

129 annotation.identifier, 

130 "" if is_filtered else " (filtered)", 

131 ) 

132 for gen in gens: 

133 gen.send((is_filtered, annotation)) 

134 

135 # Close generator 

136 for gen in gens: 

137 gen.close() 

138 

139 for e, main_gen in main_gens.items(): 

140 _logger.info("Finalizing the export to %s", e.output_path.name) 

141 with contextlib.suppress(StopIteration): 

142 next(main_gen)