Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/export/manager.py: 97%
72 statements
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
1# -*- coding: utf-8 -*-
2# SPDX-License-Identifier: GPL-2.0-only
4import contextlib
5import logging
6from collections.abc import Generator, Iterable
8from ..cve_db.annot_aggregate import AggregateAnnotEntry
9from ..cve_db.manager import CveDbManager
10from ..sbom.sbom_base import Sbom
11from ..vuln.cve import (
12 CveVexMissingVersionsAssessment,
13 CveVexRejectedAssessment,
14 CveVexStatus,
15)
16from .export_base import BaseExport
18_logger = logging.getLogger(__name__)
21class ExportManager:
22 def __init__(self, sbom: Sbom, cve_db_manager: CveDbManager) -> None:
23 self._sbom = sbom
24 self._manager = cve_db_manager
25 self._exports: list[BaseExport] = []
26 self._filter_rejected_cve: bool = True
27 self._filter_status: tuple[CveVexStatus, ...] | None = None
28 self._filter_no_cvss_score: bool = False
29 self._filter_cvss_score: float | None = None
30 self._filter_missing_versions: bool = False
31 self._add_kernel_modules: bool = False
33 def cfg_filter_rejected_cve(self, enable: bool) -> None:
34 """If enable is true, do not export rejected CVE (which is the default)"""
35 self._filter_rejected_cve = enable
37 def cfg_filter_status(self, status: Iterable[CveVexStatus]) -> None:
38 """
39 Allow to configure export filter, to only export CVEs with specific VEX status
40 """
41 self._filter_status = tuple(status)
43 def cfg_filter_no_cvss_score(self, enable: bool) -> None:
44 """If enable is true, do not export CVE with missing CVSS score"""
45 self._filter_no_cvss_score = enable
47 def cfg_filter_cvss_score(self, threshold: float | None) -> None:
48 """
49 Only export CVE with CVSS score greater or equal to the specified threshold
50 """
51 self._filter_cvss_score = threshold
53 def cfg_filter_missing_versions(self, enable: bool) -> None:
54 """
55 If enable is true, do not export CVE with missing version ranges information
56 """
57 self._filter_missing_versions = enable
59 def cfg_add_kernel_modules(self, enable: bool) -> None:
60 """If enable is true, add CVE annotations to all kernel modules"""
61 self._add_kernel_modules = enable
63 def _is_cve_filtered(self, annotation: AggregateAnnotEntry) -> bool:
64 """Allow to check if this CVE needs to be exported"""
65 assessment = annotation.vex_assessment
67 if self._filter_rejected_cve and isinstance(
68 assessment, CveVexRejectedAssessment
69 ):
70 return True
72 if self._filter_status and (assessment.status not in self._filter_status):
73 return True
75 if self._filter_missing_versions and isinstance(
76 assessment, CveVexMissingVersionsAssessment
77 ):
78 return True
80 if self._filter_no_cvss_score and (not annotation.cvss_metrics):
81 return True
83 return bool(
84 self._filter_cvss_score is not None
85 and not any(
86 metric.score >= self._filter_cvss_score
87 for metric in annotation.cvss_metrics
88 )
89 )
91 def add_exporter(self, e: BaseExport) -> None:
92 """
93 Add an exporter.
95 Each exporter should generate an output (file, or something on stdout, ...)
96 """
97 self._exports.append(e)
99 def process_export(self) -> None:
100 """
101 Process the export, use as input the SBOM to extract the components, and
102 then for each component get applicable CVE.
103 """
105 main_gens: dict[BaseExport, Generator[None, None, None]] = {}
107 for e in self._exports:
108 _logger.info("Starting the export to %s", e.output_path.name)
109 e.cfg_add_kernel_modules(self._add_kernel_modules)
110 main_gen = e.start_export()
111 next(main_gen)
112 main_gens[e] = main_gen
114 for comp_build in self._sbom.iterate_component_builds():
115 _logger.info("Processing build: %s", comp_build.build_name)
116 # Initialize generator
117 gens: list[Generator[None, tuple[bool, AggregateAnnotEntry], None]] = []
118 for e in self._exports:
119 with contextlib.suppress(StopIteration):
120 gen = e.export_comp_info(comp_build)
121 next(gen)
122 gens.append(gen)
124 # Export each annotation
125 for annotation in self._manager.get_applicable_cves(comp_build):
126 is_filtered = self._is_cve_filtered(annotation)
127 _logger.debug(
128 " - Exporting %s%s",
129 annotation.identifier,
130 "" if is_filtered else " (filtered)",
131 )
132 for gen in gens:
133 gen.send((is_filtered, annotation))
135 # Close generator
136 for gen in gens:
137 gen.close()
139 for e, main_gen in main_gens.items():
140 _logger.info("Finalizing the export to %s", e.output_path.name)
141 with contextlib.suppress(StopIteration):
142 next(main_gen)