Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/cve_db/manager.py: 90%

99 statements  

« prev     ^ index     » next       coverage.py v7.11.1, created at 2025-11-28 15:37 +0100

1# -*- coding: utf-8 -*- 

2# SPDX-License-Identifier: GPL-2.0-only 

3 

4import dataclasses 

5import logging 

6import subprocess 

7import threading 

8from collections import defaultdict 

9from collections.abc import Generator 

10 

11from ..sbom.component import CompBuild, CompId, comp_ids_is_matching_one_of 

12from ..vuln.cve import CveId 

13from .annot_aggregate import AggregateAnnotEntry 

14from .annot_base import AnnotDbEntry 

15from .db_base import CveDatabase, CveDbEntry 

16 

17_logger = logging.getLogger(__name__) 

18 

19 

20@dataclasses.dataclass 

21class _ThreadDbInitData: 

22 db: CveDatabase 

23 exec: BaseException | None = None 

24 

25 

26def _init_and_index_database( 

27 data: _ThreadDbInitData, 

28 lock: threading.Lock, 

29 index_comp_cve: dict[str, set[str]], 

30 uniq_cve_ids: dict[str, str], 

31) -> None: 

32 try: 

33 _logger.info("Initializing and indexing the %s database", data.db.name) 

34 

35 # First initialize the database 

36 data.db.initialize() 

37 

38 # Create database index 

39 index_db = data.db.create_index() 

40 

41 _logger.info("%s database indexing is complete", data.db.name) 

42 except BaseException as e: # noqa: BLE001 

43 data.exec = e 

44 return 

45 

46 # Merge indexes 

47 with lock: 

48 for comp_name, cve_ids in index_db.items(): 

49 for cve_id in cve_ids: 

50 # Allow to reduce memory at the end of indexing: 

51 # Prevent duplication of CVE-ID strings 

52 index_comp_cve[comp_name].add(uniq_cve_ids.setdefault(cve_id, cve_id)) 

53 

54 

55class CveDbManager: 

56 def __init__(self) -> None: 

57 self._databases: dict[int, list[CveDatabase]] = {} 

58 self._index_comp_cve: dict[str, tuple[str, ...]] = {} 

59 

60 def add_db(self, db: CveDatabase, priority: int) -> None: 

61 # Register the database 

62 lst_db = self._databases.setdefault(priority, []) 

63 if lst_db: 

64 prev_name = lst_db[0].name 

65 if db.has_annotations: 

66 raise RuntimeError( 

67 f"Annotation database {db.name} has same priority than {prev_name}" 

68 ) 

69 if lst_db[0].has_annotations: 

70 raise RuntimeError( 

71 f"Annotation database {prev_name} has same priority " 

72 f"than CVE db {db.name}" 

73 ) 

74 

75 lst_db.append(db) 

76 

77 # Clear index since no longer up-to-date 

78 self._index_comp_cve.clear() 

79 

80 def create_index(self) -> None: 

81 threads: dict[threading.Thread, _ThreadDbInitData] = {} 

82 lock = threading.Lock() 

83 index_comp_cve: dict[str, set[str]] = defaultdict(set) 

84 uniq_cve_ids: dict[str, str] = {} 

85 self._index_comp_cve.clear() 

86 

87 # First sort the databases by priority (highest to lowest) 

88 self._databases = dict( 

89 sorted(self._databases.items(), key=lambda d: d[0], reverse=True) 

90 ) 

91 

92 # Initialize and Index the various databases in "parallel" 

93 for dbs in self._databases.values(): 

94 for db in dbs: 

95 data = _ThreadDbInitData(db=db) 

96 t = threading.Thread( 

97 target=_init_and_index_database, 

98 args=(data, lock, index_comp_cve, uniq_cve_ids), 

99 ) 

100 threads[t] = data 

101 t.start() 

102 

103 for t, data in threads.items(): 

104 t.join() 

105 if data.exec: 

106 msg = f"{data.db.name} database init failed" 

107 if isinstance(data.exec, subprocess.CalledProcessError): 

108 msg += "\n" + data.exec.stdout + "---\n" + data.exec.stderr 

109 raise RuntimeError(msg) from data.exec 

110 

111 # Free non longer needed memory as soon as possible 

112 del threads 

113 del uniq_cve_ids 

114 

115 # And finally build the final index: Replace set by list to reduce memory usage 

116 for comp_name, cve_ids in index_comp_cve.items(): 

117 self._index_comp_cve[comp_name] = tuple(cve_ids) 

118 

119 def get_applicable_cves( 

120 self, comp_build: CompBuild 

121 ) -> Generator[AggregateAnnotEntry, None, None]: 

122 """ 

123 :return: The CVEs that is applicable to this component with associated 

124 generated annotation 

125 """ 

126 assert self._index_comp_cve 

127 

128 # First get a unique list of product name: 

129 # Group component identifiers by product name 

130 comp_ids_by_name: dict[str, set[CompId]] = defaultdict(set) 

131 for comp_id in comp_build.identifiers: 

132 comp_ids_by_name[comp_id.name].add(comp_id) 

133 

134 # Query the index for associated CVE for each product name in order to build 

135 # a set of potential CVE IDs 

136 potential_cve_ids: set[str] = set() 

137 for comp_name in comp_ids_by_name: 

138 for cve_id_str in self._index_comp_cve.get(comp_name, ()): 

139 potential_cve_ids.add(cve_id_str) 

140 

141 # For each potential CVE ID (sorted to have something reproducible)... 

142 for cve_id_str in sorted(potential_cve_ids): 

143 cve_id = CveId(cve_id_str) 

144 

145 # Iterate over each database grouped by priority... 

146 cve_entries_by_prio: dict[int, list[CveDbEntry]] = defaultdict(list) 

147 cve_loosely_match_ids: dict[str, set[CompId]] = defaultdict(set) 

148 

149 for prio, dbs in self._databases.items(): 

150 for db in dbs: 

151 cve_entry = db.get_cve(cve_id) 

152 if cve_entry is None: 

153 continue 

154 

155 # Build associated list of CVE entries (grouped by priority) but do 

156 # not add entry if not applicable: For example if this is an 

157 # annotation but the version does not match 

158 if not isinstance( 

159 cve_entry, AnnotDbEntry 

160 ) or cve_entry.is_version_applicable( 

161 comp_build.identifiers, comp_build.version 

162 ): 

163 cve_entries_by_prio[prio].append(cve_entry) 

164 

165 # Build a set of component identifiers, that loosely match, 

166 # grouped by product name 

167 for comp_name in comp_ids_by_name: 

168 cve_loosely_match_ids[comp_name].update( 

169 cve_entry.get_loosely_applicable_comp_ids(comp_name) 

170 ) 

171 

172 if not cve_entries_by_prio: 

173 continue 

174 

175 # For each component identifier (specified as input) try to find if it is 

176 # really associated with the CVE 

177 is_applicable = False 

178 for comp_name, comp_ids in comp_ids_by_name.items(): 

179 if comp_ids_is_matching_one_of( 

180 comp_ids, CompId.filter_best_ids(cve_loosely_match_ids[comp_name]) 

181 ): 

182 is_applicable = True 

183 break 

184 

185 if not is_applicable: 

186 continue 

187 

188 # This CVE is applicable to the component identified by component 

189 # identifiers specified as input 

190 yield AggregateAnnotEntry( 

191 comp_build, 

192 tuple(tuple(lst) for lst in cve_entries_by_prio.values()), 

193 )