Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/cli.py: 85%

230 statements  

« prev     ^ index     » next       coverage.py v7.11.1, created at 2025-11-28 15:37 +0100

1# -*- coding: utf-8 -*- 

2# PYTHON_ARGCOMPLETE_OK 

3# SPDX-License-Identifier: GPL-2.0-only 

4 

5import argparse 

6import logging 

7import os 

8import pathlib 

9import tomllib 

10from collections import defaultdict 

11from collections.abc import Sequence 

12from typing import Any, no_type_check 

13 

14from . import __version__ 

15from .cve_db.annot_base import AnnotDatabase 

16from .cve_db.annot_yocto import YoctoAnnotDatabase 

17from .cve_db.db_base import CveDatabase 

18from .cve_db.manager import CveDbManager 

19from .cve_db.registry import DbTypeRegistry 

20from .export.manager import ExportManager 

21from .export.registry import ExportTypeRegistry 

22from .sbom.registry import SbomTypeRegistry 

23from .utils.plugin import import_external_plugin 

24from .vuln.cve import CveVexStatus 

25 

26 

27def load_plugins( 

28 cfgs_plugins: list[pathlib.Path], args_plugins: list[pathlib.Path] 

29) -> None: 

30 search_path = {p.resolve() for p in args_plugins} 

31 search_path.update(cfgs_plugins) 

32 plugins_env = os.environ.get("SBOM_CVE_CHECK_PLUGINS") 

33 if plugins_env: 

34 search_path.update(pathlib.Path(p).resolve() for p in plugins_env.split(":")) 

35 

36 import_external_plugin(search_path) 

37 

38 

39def resolve_config_path( 

40 path_to_resolve: str, path_start_file: str | pathlib.Path 

41) -> pathlib.Path: 

42 path_to_resolve = os.path.expandvars(path_to_resolve) 

43 ret_path = pathlib.Path(path_to_resolve).expanduser() 

44 if ret_path.is_absolute(): 

45 return ret_path.resolve() 

46 return pathlib.Path(path_start_file).parent.joinpath(path_to_resolve).resolve() 

47 

48 

49def create_database( 

50 from_config: bool, 

51 cfg_idx: int, 

52 db_type: str | type[CveDatabase], 

53 path: pathlib.Path, 

54 section_name: str | None = None, 

55 **kwargs: Any, 

56) -> tuple[CveDatabase, int]: 

57 prio_str: str | None = kwargs.pop("priority", None) 

58 name = kwargs.pop("name", None) 

59 if name is None: 

60 name = section_name 

61 if name is None: 

62 name = path.name 

63 

64 if isinstance(db_type, str): 

65 db = DbTypeRegistry().create_from_config( 

66 db_type, name=name, path=path, **kwargs 

67 ) 

68 else: 

69 db = db_type.create_from_config(name=name, path=path, **kwargs) 

70 

71 if prio_str is None: 

72 priority = db.get_default_priority(from_config=from_config, order=cfg_idx) 

73 else: 

74 priority = int(prio_str) 

75 

76 return db, priority 

77 

78 

79def get_config_file_paths(args: argparse.Namespace) -> list[pathlib.Path]: 

80 # Get the list of toml configuration files to read 

81 cfg_files: list[pathlib.Path] = [] 

82 if not args.ignore_default_config: 

83 cfg_files.append(resolve_config_path("db_default.toml", __file__)) 

84 if args.config: 

85 cfg_files.extend(args.config) 

86 return cfg_files 

87 

88 

89def parse_early_config(cfg_files: list[pathlib.Path]) -> argparse.Namespace: 

90 cfg = argparse.Namespace() 

91 

92 plugin_paths: list[pathlib.Path] = [] 

93 cfg.plugins = plugin_paths 

94 

95 # Read configuration file and extract the plugin paths 

96 for cfg_file_path in cfg_files: 

97 with cfg_file_path.open("rb") as f: 

98 toml_data = tomllib.load(f) 

99 

100 paths = toml_data.get("plugins", []) 

101 if isinstance(paths, str): 

102 paths = [paths] 

103 plugin_paths.extend(resolve_config_path(p, cfg_file_path) for p in paths) 

104 

105 return cfg 

106 

107 

108def parse_config(cve_db_manager: CveDbManager, cfg_files: list[pathlib.Path]) -> None: 

109 # Read configuration files, and merge overrides 

110 databases_values: dict[str, dict[str, Any]] = defaultdict(dict) 

111 for cfg_file_path in cfg_files: 

112 with cfg_file_path.open("rb") as f: 

113 toml_data = tomllib.load(f) 

114 

115 db_sections: dict[str, dict[str, Any]] = toml_data.get("databases", {}) 

116 for section_name, db_values in db_sections.items(): 

117 # Resolve relative database path 

118 path_str: str | None = db_values.pop("path", None) 

119 if path_str: 

120 db_values["path"] = resolve_config_path(path_str, cfg_file_path) 

121 

122 # Merge configurations 

123 databases_values[section_name].update(db_values) 

124 

125 # Create each declared databases 

126 for idx, (section_name, db_values) in enumerate(databases_values.items()): 

127 type_name: str | None = db_values.pop("type", None) 

128 if type_name is None: 

129 raise ValueError(f"[{section_name}] database type is missing") 

130 if type_name not in DbTypeRegistry().type_names: 

131 raise ValueError(f"[{section_name}] invalid database type: {type_name}") 

132 

133 path: pathlib.Path | None = db_values.pop("path", None) 

134 if not path: 

135 raise ValueError(f"[{section_name}] database path is missing") 

136 

137 db, prio = create_database( 

138 True, idx, type_name, path, section_name=section_name, **db_values 

139 ) 

140 cve_db_manager.add_db(db, prio) 

141 

142 

143def create_databases_from_args( 

144 cve_db_manager: CveDbManager, 

145 db_args: list[tuple[str, pathlib.Path, dict[str, str]]], 

146) -> None: 

147 for idx, db_info in enumerate(db_args): 

148 db, prio = create_database( 

149 False, idx + 50, db_info[0], db_info[1], section_name=None, **db_info[2] 

150 ) 

151 cve_db_manager.add_db(db, prio) 

152 

153 

154def handle_autocomplete(parser: argparse.ArgumentParser) -> None: 

155 try: 

156 import argcomplete # noqa: PLC0415 

157 

158 argcomplete.autocomplete(parser) 

159 except ImportError: 

160 pass 

161 

162 

163class DbConfigAction(argparse.Action): 

164 # noinspection PyShadowingBuiltins 

165 def __init__( 

166 self, 

167 option_strings: Sequence[str], 

168 dest: str, 

169 required: bool = False, 

170 help: str | None = None, # noqa: A002 

171 ) -> None: 

172 super().__init__( 

173 option_strings=option_strings, 

174 dest=dest, 

175 nargs="*", 

176 required=required, 

177 help=help, 

178 metavar="TYPE PATH KEY=VALUE,", 

179 ) 

180 self._init_autocomplete() 

181 

182 @no_type_check 

183 def _init_autocomplete(self) -> None: 

184 try: 

185 from argcomplete.completers import FilesCompleter # noqa: PLC0415 

186 except ImportError: 

187 return 

188 

189 class DbConfigActionCompleter(FilesCompleter): 

190 # noinspection PyNoneFunctionAssignment 

191 def __call__(self, prefix: str, **kwargs: Any) -> dict[str, str]: 

192 # noinspection PyTypeChecker 

193 lst: list[str] = super().__call__(prefix, **kwargs) 

194 d = dict.fromkeys(lst, "File path") 

195 d.update((n, "Database type") for n in DbTypeRegistry().type_names) 

196 return d 

197 

198 self.completer = DbConfigActionCompleter() 

199 

200 def __call__( 

201 self, 

202 parser: argparse.ArgumentParser, 

203 namespace: argparse.Namespace, 

204 values: str | Sequence[Any] | None, 

205 option_string: str | None = None, 

206 ) -> None: 

207 if not isinstance(values, Sequence) or len(values) <= 1: 

208 raise argparse.ArgumentError(self, "Unexpected number of arguments") 

209 

210 type_name = values[0] 

211 if type_name not in DbTypeRegistry().type_names: 

212 raise argparse.ArgumentError(self, f"Invalid database type: {type_name}") 

213 

214 path = pathlib.Path(values[1]).resolve() 

215 cfg_values = {} 

216 try: 

217 for params in values[2:]: 

218 k, v = params.split("=", 1) 

219 cfg_values[k.strip()] = v.strip() 

220 except ValueError as err: 

221 raise argparse.ArgumentError(self, str(err)) from err 

222 

223 items = getattr(namespace, self.dest, None) 

224 if not items: 

225 items = [] 

226 items.append((type_name, path, cfg_values)) 

227 setattr(namespace, self.dest, items) 

228 

229 

230def process_early_config(parser: argparse.ArgumentParser) -> None: 

231 try: 

232 args, _ = parser.parse_known_args() 

233 except (argparse.ArgumentError, argparse.ArgumentTypeError): 

234 # The main parser should catch again the error, so just ignore the error here 

235 return 

236 

237 # Set logging level 

238 levels = (logging.WARNING, logging.INFO, logging.DEBUG) 

239 logging.basicConfig(level=levels[min(len(levels) - 1, args.verbose)]) 

240 

241 # Parse configuration a first time to extract early config values 

242 cfg = parse_early_config(get_config_file_paths(args)) 

243 

244 # Inject environment variable with default value 

245 env_name_db_dir = "SBOM_CVE_CHECK_DATABASES_DIR" 

246 if args.databases_dir: 

247 os.environ[env_name_db_dir] = args.databases_dir.resolve().as_posix() 

248 elif env_name_db_dir not in os.environ: 

249 cache_env = os.environ.get("XDG_CACHE_HOME", "") 

250 if not cache_env.strip(): 

251 cache_path = pathlib.Path("~/.cache").expanduser() 

252 else: 

253 cache_path = pathlib.Path(cache_env) 

254 os.environ[env_name_db_dir] = cache_path.joinpath( 

255 "sbom_cve_check", "databases" 

256 ).as_posix() 

257 

258 # Load plugins from environment variable, from configuration files, 

259 # and from arguments 

260 load_plugins(cfg.plugins, args.plugins or []) 

261 

262 

263def main() -> None: 

264 early_p = argparse.ArgumentParser( 

265 add_help=False, exit_on_error=False, fromfile_prefix_chars="@" 

266 ) 

267 early_p.add_argument( 

268 "--verbose", 

269 "-v", 

270 action="count", 

271 default=0, 

272 help="Increase logging level, can be specified multiple times", 

273 ) 

274 early_p.add_argument( 

275 "--plugins", 

276 metavar="PATH", 

277 type=pathlib.Path, 

278 action="append", 

279 help="Path to plugin module or plugin directory", 

280 ) 

281 early_p.add_argument( 

282 "--ignore-default-config", 

283 action="store_true", 

284 help="Do not load default database configuration", 

285 ) 

286 early_p.add_argument( 

287 "--config", 

288 metavar="PATH", 

289 type=pathlib.Path, 

290 action="append", 

291 help="Path to one TOML configuration file", 

292 ) 

293 early_p.add_argument( 

294 "--databases-dir", 

295 metavar="PATH", 

296 type=pathlib.Path, 

297 help="Path to the directory containing the databases to be downloaded. " 

298 "Override SBOM_CVE_CHECK_DATABASES_DIR", 

299 ) 

300 

301 parser = argparse.ArgumentParser(parents=[early_p], fromfile_prefix_chars="@") 

302 process_early_config(early_p) 

303 

304 parser.add_argument( 

305 "--version", "-V", action="version", version=f"%(prog)s {__version__}" 

306 ) 

307 

308 sbom_p = parser.add_argument_group("Sbom") 

309 sbom_p.add_argument( 

310 "--sbom-type", 

311 "--sbom-format", 

312 "-F", 

313 choices=SbomTypeRegistry().type_names, 

314 help="Specify the format of the SBOM input file", 

315 ) 

316 sbom_p.add_argument( 

317 "--sbom-path", 

318 "--sbom", 

319 "-S", 

320 metavar="PATH", 

321 required=True, 

322 type=pathlib.Path, 

323 help="Path to the SBOM input file", 

324 ) 

325 sbom_p.add_argument( 

326 "--ignore-sbom-annotations", 

327 action="store_true", 

328 help="If set, do not load annotations from the SBOM file", 

329 ) 

330 sbom_p.add_argument( 

331 "--sbom-annotation-priority", 

332 metavar="PRIO", 

333 type=int, 

334 help="Priority to use for the annotations read from SBOM input file", 

335 ) 

336 sbom_p.add_argument( 

337 "--sbom-obsolete-assessment-check", 

338 action=argparse.BooleanOptionalAction, 

339 help="Enable or disable checks of obsolete assessments specified in SBOM", 

340 ) 

341 

342 conf_p = parser.add_argument_group("Database configuration") 

343 conf_p.add_argument( 

344 "--add-db", 

345 action=DbConfigAction, 

346 help="Allow to add CVE or annotation database from command line", 

347 ) 

348 conf_p.add_argument( 

349 "--yocto-vex-manifest", 

350 metavar="PATH", 

351 type=pathlib.Path, 

352 action="append", 

353 help="Shortcut to specify a Yocto Vex manifest, with default configuration", 

354 ) 

355 conf_p.add_argument( 

356 "--check-obsolete-assessment-by-default", 

357 action="store_true", 

358 help="If set, by default, check for obsolete assessment", 

359 ) 

360 

361 exp_p = parser.add_argument_group("Export file") 

362 exp_p.add_argument( 

363 "--export-type", 

364 "--export-format", 

365 "-f", 

366 required=True, 

367 action="append", 

368 choices=ExportTypeRegistry().type_names, 

369 help="Export format of generated file", 

370 ) 

371 exp_p.add_argument( 

372 "--export-path", 

373 "--output", 

374 "-o", 

375 metavar="PATH", 

376 required=True, 

377 action="append", 

378 type=pathlib.Path, 

379 help="Path to the exported file", 

380 ) 

381 

382 filter_p = parser.add_argument_group("Export configuration") 

383 filter_p.add_argument( 

384 "--export-filter-vex-status", 

385 choices=[s.value for s in CveVexStatus if s.value is not None], 

386 nargs="+", 

387 help="Only add, in exported file, CVEs with the following VEX status", 

388 ) 

389 filter_p.add_argument( 

390 "--export-filter-vulnerable", 

391 action="store_true", 

392 help="Only add, in exported file, CVEs which are considered vulnerable", 

393 ) 

394 filter_p.add_argument( 

395 "--export-filter-cve-without-cvss-score", 

396 action="store_true", 

397 help="If set, do not add CVE without CVSS score, in exported file", 

398 ) 

399 filter_p.add_argument( 

400 "--export-filter-cve-min-cvss-score", 

401 type=float, 

402 help="Only add, in exported file, CVEs with a minimum CVSS score", 

403 ) 

404 filter_p.add_argument( 

405 "--export-filter-cve-without-versions", 

406 action="store_true", 

407 help="If set, do not add CVE without versions, in exported file", 

408 ) 

409 filter_p.add_argument( 

410 "--export-add-kernel-modules", 

411 action="store_true", 

412 help="If set, add CVE information to kernel modules, in exported file", 

413 ) 

414 filter_p.add_argument( 

415 "--export-list-rejected-cve", 

416 action="store_true", 

417 help="If set, list CVEs that are rejected, in exported file", 

418 ) 

419 

420 handle_autocomplete(parser) 

421 args = parser.parse_args() 

422 

423 # First get default configuration values for databases 

424 AnnotDatabase.DEFAULT_OBSOLETE_ASSESSMENT_CHECK = ( 

425 args.check_obsolete_assessment_by_default 

426 ) 

427 CveDatabase.INDEX_REJECTED_CVE = args.export_list_rejected_cve 

428 

429 # Check that export arguments are OK 

430 if len(args.export_type) != len(args.export_path): 

431 parser.error("Format and path must be specified for each exported file") 

432 

433 # Instantiate CVE database manager to register all databases 

434 cve_db_manager = CveDbManager() 

435 

436 # Get CVE and annotation databases 

437 try: 

438 if args.add_db: 

439 create_databases_from_args(cve_db_manager, args.add_db) 

440 parse_config(cve_db_manager, get_config_file_paths(args)) 

441 

442 for idx, m in enumerate(args.yocto_vex_manifest or []): 

443 path: pathlib.Path = m 

444 db, prio = create_database(False, idx + 1, YoctoAnnotDatabase, path) 

445 cve_db_manager.add_db(db, prio) 

446 

447 except (ValueError, TypeError, FileNotFoundError) as err: 

448 parser.error(str(err)) 

449 

450 # Get input SBOM 

451 sbom_path: pathlib.Path = args.sbom_path 

452 try: 

453 sbom = SbomTypeRegistry().create(args.sbom_type, sbom_path.resolve(strict=True)) 

454 except (FileNotFoundError, ValueError) as err: 

455 parser.error(str(err)) 

456 

457 # Create annotation database from SBOM 

458 if not args.ignore_sbom_annotations: 

459 sbom_annot_opts = {} 

460 if args.sbom_obsolete_assessment_check: 

461 sbom_annot_opts["obsolete_assessment_check"] = True 

462 elif args.sbom_obsolete_assessment_check is False: 

463 sbom_annot_opts["obsolete_assessment_check"] = False 

464 sbom_annot_db = sbom.create_annot_database( 

465 name=sbom_path.name, **sbom_annot_opts 

466 ) 

467 if sbom_annot_db is not None: 

468 sbom_annot_prio: int | None = args.sbom_annotation_priority 

469 if sbom_annot_prio is None: 

470 sbom_annot_prio = sbom_annot_db.get_default_priority( 

471 order=0, from_config=False 

472 ) 

473 cve_db_manager.add_db(sbom_annot_db, sbom_annot_prio) 

474 

475 # Initialize and Index databases 

476 cve_db_manager.create_index() 

477 

478 # Initialize and configure export 

479 export_mng = ExportManager(sbom, cve_db_manager) 

480 

481 for exp_type, exp_path in zip(args.export_type, args.export_path, strict=True): 

482 export_mng.add_exporter(ExportTypeRegistry().create(exp_type, sbom, exp_path)) 

483 

484 if args.export_filter_vulnerable: 

485 export_mng.cfg_filter_status( 

486 [CveVexStatus.UNDER_INVESTIGATION, CveVexStatus.AFFECTED] 

487 ) 

488 

489 if args.export_filter_vex_status: 

490 export_mng.cfg_filter_status( 

491 [CveVexStatus(n) for n in args.export_filter_vex_status] 

492 ) 

493 

494 export_mng.cfg_filter_rejected_cve(not args.export_list_rejected_cve) 

495 export_mng.cfg_filter_no_cvss_score(args.export_filter_cve_without_cvss_score) 

496 export_mng.cfg_filter_cvss_score(args.export_filter_cve_min_cvss_score) 

497 export_mng.cfg_filter_missing_versions(args.export_filter_cve_without_versions) 

498 export_mng.cfg_add_kernel_modules(args.export_add_kernel_modules) 

499 

500 # And finally generate the export 

501 export_mng.process_export()