Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/cve_db/manager.py: 90%
99 statements
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
1# -*- coding: utf-8 -*-
2# SPDX-License-Identifier: GPL-2.0-only
4import dataclasses
5import logging
6import subprocess
7import threading
8from collections import defaultdict
9from collections.abc import Generator
11from ..sbom.component import CompBuild, CompId, comp_ids_is_matching_one_of
12from ..vuln.cve import CveId
13from .annot_aggregate import AggregateAnnotEntry
14from .annot_base import AnnotDbEntry
15from .db_base import CveDatabase, CveDbEntry
17_logger = logging.getLogger(__name__)
20@dataclasses.dataclass
21class _ThreadDbInitData:
22 db: CveDatabase
23 exec: BaseException | None = None
26def _init_and_index_database(
27 data: _ThreadDbInitData,
28 lock: threading.Lock,
29 index_comp_cve: dict[str, set[str]],
30 uniq_cve_ids: dict[str, str],
31) -> None:
32 try:
33 _logger.info("Initializing and indexing the %s database", data.db.name)
35 # First initialize the database
36 data.db.initialize()
38 # Create database index
39 index_db = data.db.create_index()
41 _logger.info("%s database indexing is complete", data.db.name)
42 except BaseException as e: # noqa: BLE001
43 data.exec = e
44 return
46 # Merge indexes
47 with lock:
48 for comp_name, cve_ids in index_db.items():
49 for cve_id in cve_ids:
50 # Allow to reduce memory at the end of indexing:
51 # Prevent duplication of CVE-ID strings
52 index_comp_cve[comp_name].add(uniq_cve_ids.setdefault(cve_id, cve_id))
55class CveDbManager:
56 def __init__(self) -> None:
57 self._databases: dict[int, list[CveDatabase]] = {}
58 self._index_comp_cve: dict[str, tuple[str, ...]] = {}
60 def add_db(self, db: CveDatabase, priority: int) -> None:
61 # Register the database
62 lst_db = self._databases.setdefault(priority, [])
63 if lst_db:
64 prev_name = lst_db[0].name
65 if db.has_annotations:
66 raise RuntimeError(
67 f"Annotation database {db.name} has same priority than {prev_name}"
68 )
69 if lst_db[0].has_annotations:
70 raise RuntimeError(
71 f"Annotation database {prev_name} has same priority "
72 f"than CVE db {db.name}"
73 )
75 lst_db.append(db)
77 # Clear index since no longer up-to-date
78 self._index_comp_cve.clear()
80 def create_index(self) -> None:
81 threads: dict[threading.Thread, _ThreadDbInitData] = {}
82 lock = threading.Lock()
83 index_comp_cve: dict[str, set[str]] = defaultdict(set)
84 uniq_cve_ids: dict[str, str] = {}
85 self._index_comp_cve.clear()
87 # First sort the databases by priority (highest to lowest)
88 self._databases = dict(
89 sorted(self._databases.items(), key=lambda d: d[0], reverse=True)
90 )
92 # Initialize and Index the various databases in "parallel"
93 for dbs in self._databases.values():
94 for db in dbs:
95 data = _ThreadDbInitData(db=db)
96 t = threading.Thread(
97 target=_init_and_index_database,
98 args=(data, lock, index_comp_cve, uniq_cve_ids),
99 )
100 threads[t] = data
101 t.start()
103 for t, data in threads.items():
104 t.join()
105 if data.exec:
106 msg = f"{data.db.name} database init failed"
107 if isinstance(data.exec, subprocess.CalledProcessError):
108 msg += "\n" + data.exec.stdout + "---\n" + data.exec.stderr
109 raise RuntimeError(msg) from data.exec
111 # Free non longer needed memory as soon as possible
112 del threads
113 del uniq_cve_ids
115 # And finally build the final index: Replace set by list to reduce memory usage
116 for comp_name, cve_ids in index_comp_cve.items():
117 self._index_comp_cve[comp_name] = tuple(cve_ids)
119 def get_applicable_cves(
120 self, comp_build: CompBuild
121 ) -> Generator[AggregateAnnotEntry, None, None]:
122 """
123 :return: The CVEs that is applicable to this component with associated
124 generated annotation
125 """
126 assert self._index_comp_cve
128 # First get a unique list of product name:
129 # Group component identifiers by product name
130 comp_ids_by_name: dict[str, set[CompId]] = defaultdict(set)
131 for comp_id in comp_build.identifiers:
132 comp_ids_by_name[comp_id.name].add(comp_id)
134 # Query the index for associated CVE for each product name in order to build
135 # a set of potential CVE IDs
136 potential_cve_ids: set[str] = set()
137 for comp_name in comp_ids_by_name:
138 for cve_id_str in self._index_comp_cve.get(comp_name, ()):
139 potential_cve_ids.add(cve_id_str)
141 # For each potential CVE ID (sorted to have something reproducible)...
142 for cve_id_str in sorted(potential_cve_ids):
143 cve_id = CveId(cve_id_str)
145 # Iterate over each database grouped by priority...
146 cve_entries_by_prio: dict[int, list[CveDbEntry]] = defaultdict(list)
147 cve_loosely_match_ids: dict[str, set[CompId]] = defaultdict(set)
149 for prio, dbs in self._databases.items():
150 for db in dbs:
151 cve_entry = db.get_cve(cve_id)
152 if cve_entry is None:
153 continue
155 # Build associated list of CVE entries (grouped by priority) but do
156 # not add entry if not applicable: For example if this is an
157 # annotation but the version does not match
158 if not isinstance(
159 cve_entry, AnnotDbEntry
160 ) or cve_entry.is_version_applicable(
161 comp_build.identifiers, comp_build.version
162 ):
163 cve_entries_by_prio[prio].append(cve_entry)
165 # Build a set of component identifiers, that loosely match,
166 # grouped by product name
167 for comp_name in comp_ids_by_name:
168 cve_loosely_match_ids[comp_name].update(
169 cve_entry.get_loosely_applicable_comp_ids(comp_name)
170 )
172 if not cve_entries_by_prio:
173 continue
175 # For each component identifier (specified as input) try to find if it is
176 # really associated with the CVE
177 is_applicable = False
178 for comp_name, comp_ids in comp_ids_by_name.items():
179 if comp_ids_is_matching_one_of(
180 comp_ids, CompId.filter_best_ids(cve_loosely_match_ids[comp_name])
181 ):
182 is_applicable = True
183 break
185 if not is_applicable:
186 continue
188 # This CVE is applicable to the component identified by component
189 # identifiers specified as input
190 yield AggregateAnnotEntry(
191 comp_build,
192 tuple(tuple(lst) for lst in cve_entries_by_prio.values()),
193 )