Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/cli.py: 85%
230 statements
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
1# -*- coding: utf-8 -*-
2# PYTHON_ARGCOMPLETE_OK
3# SPDX-License-Identifier: GPL-2.0-only
5import argparse
6import logging
7import os
8import pathlib
9import tomllib
10from collections import defaultdict
11from collections.abc import Sequence
12from typing import Any, no_type_check
14from . import __version__
15from .cve_db.annot_base import AnnotDatabase
16from .cve_db.annot_yocto import YoctoAnnotDatabase
17from .cve_db.db_base import CveDatabase
18from .cve_db.manager import CveDbManager
19from .cve_db.registry import DbTypeRegistry
20from .export.manager import ExportManager
21from .export.registry import ExportTypeRegistry
22from .sbom.registry import SbomTypeRegistry
23from .utils.plugin import import_external_plugin
24from .vuln.cve import CveVexStatus
def load_plugins(
    cfgs_plugins: list[pathlib.Path], args_plugins: list[pathlib.Path]
) -> None:
    """Import external plugins from every configured location.

    The search path is the union of:

    * ``args_plugins``: paths given on the command line (resolved here),
    * ``cfgs_plugins``: paths read from configuration files (added as is),
    * the ``:``-separated entries of the ``SBOM_CVE_CHECK_PLUGINS``
      environment variable (resolved here).

    The resulting set is handed to ``import_external_plugin``.
    """
    search_path = {p.resolve() for p in args_plugins}
    search_path.update(cfgs_plugins)

    plugins_env = os.environ.get("SBOM_CVE_CHECK_PLUGINS")
    if plugins_env:
        # Skip empty entries (leading/trailing or doubled ":"): an empty path
        # resolves to the current working directory, which would silently be
        # added to the plugin search path.
        search_path.update(
            pathlib.Path(p).resolve() for p in plugins_env.split(":") if p
        )

    import_external_plugin(search_path)
39def resolve_config_path(
40 path_to_resolve: str, path_start_file: str | pathlib.Path
41) -> pathlib.Path:
42 path_to_resolve = os.path.expandvars(path_to_resolve)
43 ret_path = pathlib.Path(path_to_resolve).expanduser()
44 if ret_path.is_absolute():
45 return ret_path.resolve()
46 return pathlib.Path(path_start_file).parent.joinpath(path_to_resolve).resolve()
def create_database(
    from_config: bool,
    cfg_idx: int,
    db_type: str | type[CveDatabase],
    path: pathlib.Path,
    section_name: str | None = None,
    **kwargs: Any,
) -> tuple[CveDatabase, int]:
    """Instantiate one database and determine its priority.

    ``db_type`` is either a type name, created through ``DbTypeRegistry``, or
    a database class, whose ``create_from_config`` is called directly. The
    ``priority`` and ``name`` entries are removed from ``kwargs``; everything
    else is forwarded to the factory.

    Returns the database together with its priority: the explicit
    ``priority`` option converted with ``int``, or the database's default
    priority for ``from_config`` / ``cfg_idx`` when no option was given.
    """
    explicit_prio: str | None = kwargs.pop("priority", None)

    # Name precedence: explicit "name" option, then section name, then file name
    db_name = kwargs.pop("name", None)
    if db_name is None:
        db_name = section_name if section_name is not None else path.name

    if not isinstance(db_type, str):
        database = db_type.create_from_config(name=db_name, path=path, **kwargs)
    else:
        database = DbTypeRegistry().create_from_config(
            db_type, name=db_name, path=path, **kwargs
        )

    if explicit_prio is not None:
        return database, int(explicit_prio)
    return database, database.get_default_priority(
        from_config=from_config, order=cfg_idx
    )
def get_config_file_paths(args: argparse.Namespace) -> list[pathlib.Path]:
    """Return the TOML configuration files to read, in reading order.

    The bundled ``db_default.toml`` (located next to this module) comes first
    unless ``args.ignore_default_config`` is set, followed by the files given
    in ``args.config``.
    """
    default_cfg = (
        []
        if args.ignore_default_config
        else [resolve_config_path("db_default.toml", __file__)]
    )
    return [*default_cfg, *(args.config or [])]
89def parse_early_config(cfg_files: list[pathlib.Path]) -> argparse.Namespace:
90 cfg = argparse.Namespace()
92 plugin_paths: list[pathlib.Path] = []
93 cfg.plugins = plugin_paths
95 # Read configuration file and extract the plugin paths
96 for cfg_file_path in cfg_files:
97 with cfg_file_path.open("rb") as f:
98 toml_data = tomllib.load(f)
100 paths = toml_data.get("plugins", [])
101 if isinstance(paths, str):
102 paths = [paths]
103 plugin_paths.extend(resolve_config_path(p, cfg_file_path) for p in paths)
105 return cfg
def parse_config(cve_db_manager: CveDbManager, cfg_files: list[pathlib.Path]) -> None:
    """Create the databases declared in the configuration files.

    The ``databases`` tables of all files are merged per section name, later
    files overriding values of earlier ones. Each merged section must provide
    a registered ``type`` and a ``path``; the created database is added to
    ``cve_db_manager`` with its priority.

    Raises:
        ValueError: if a section has no type, an unknown type, or no path.
    """
    merged: dict[str, dict[str, Any]] = defaultdict(dict)

    for toml_path in cfg_files:
        with toml_path.open("rb") as stream:
            content = tomllib.load(stream)

        sections: dict[str, dict[str, Any]] = content.get("databases", {})
        for section, options in sections.items():
            # Database paths are relative to the file that declares them
            raw_path: str | None = options.pop("path", None)
            if raw_path:
                options["path"] = resolve_config_path(raw_path, toml_path)

            # Later files override the values of earlier ones
            merged[section].update(options)

    for order, (section, options) in enumerate(merged.items()):
        db_type: str | None = options.pop("type", None)
        if db_type is None:
            raise ValueError(f"[{section}] database type is missing")
        if db_type not in DbTypeRegistry().type_names:
            raise ValueError(f"[{section}] invalid database type: {db_type}")

        db_path: pathlib.Path | None = options.pop("path", None)
        if not db_path:
            raise ValueError(f"[{section}] database path is missing")

        cve_db_manager.add_db(
            *create_database(
                True, order, db_type, db_path, section_name=section, **options
            )
        )
def create_databases_from_args(
    cve_db_manager: CveDbManager,
    db_args: list[tuple[str, pathlib.Path, dict[str, str]]],
) -> None:
    """Create the databases given on the command line and register them.

    Each entry of ``db_args`` is a ``(type name, path, options)`` tuple. The
    order index passed to ``create_database`` starts at 50.
    """
    for order, (type_name, db_path, options) in enumerate(db_args, start=50):
        database, priority = create_database(
            False, order, type_name, db_path, section_name=None, **options
        )
        cve_db_manager.add_db(database, priority)
def handle_autocomplete(parser: argparse.ArgumentParser) -> None:
    """Enable shell completion for ``parser`` when argcomplete is available.

    argcomplete is optional: an ``ImportError`` is silently ignored.
    """
    try:
        import argcomplete as _argcomplete  # noqa: PLC0415

        _argcomplete.autocomplete(parser)
    except ImportError:
        # Optional dependency not installed: nothing to set up
        return
163class DbConfigAction(argparse.Action):
164 # noinspection PyShadowingBuiltins
165 def __init__(
166 self,
167 option_strings: Sequence[str],
168 dest: str,
169 required: bool = False,
170 help: str | None = None, # noqa: A002
171 ) -> None:
172 super().__init__(
173 option_strings=option_strings,
174 dest=dest,
175 nargs="*",
176 required=required,
177 help=help,
178 metavar="TYPE PATH KEY=VALUE,",
179 )
180 self._init_autocomplete()
182 @no_type_check
183 def _init_autocomplete(self) -> None:
184 try:
185 from argcomplete.completers import FilesCompleter # noqa: PLC0415
186 except ImportError:
187 return
189 class DbConfigActionCompleter(FilesCompleter):
190 # noinspection PyNoneFunctionAssignment
191 def __call__(self, prefix: str, **kwargs: Any) -> dict[str, str]:
192 # noinspection PyTypeChecker
193 lst: list[str] = super().__call__(prefix, **kwargs)
194 d = dict.fromkeys(lst, "File path")
195 d.update((n, "Database type") for n in DbTypeRegistry().type_names)
196 return d
198 self.completer = DbConfigActionCompleter()
200 def __call__(
201 self,
202 parser: argparse.ArgumentParser,
203 namespace: argparse.Namespace,
204 values: str | Sequence[Any] | None,
205 option_string: str | None = None,
206 ) -> None:
207 if not isinstance(values, Sequence) or len(values) <= 1:
208 raise argparse.ArgumentError(self, "Unexpected number of arguments")
210 type_name = values[0]
211 if type_name not in DbTypeRegistry().type_names:
212 raise argparse.ArgumentError(self, f"Invalid database type: {type_name}")
214 path = pathlib.Path(values[1]).resolve()
215 cfg_values = {}
216 try:
217 for params in values[2:]:
218 k, v = params.split("=", 1)
219 cfg_values[k.strip()] = v.strip()
220 except ValueError as err:
221 raise argparse.ArgumentError(self, str(err)) from err
223 items = getattr(namespace, self.dest, None)
224 if not items:
225 items = []
226 items.append((type_name, path, cfg_values))
227 setattr(namespace, self.dest, items)
def process_early_config(parser: argparse.ArgumentParser) -> None:
    """Apply the settings that must be in place before the main parsing.

    Using only the arguments known to ``parser``, this sets the logging
    level, exports the ``SBOM_CVE_CHECK_DATABASES_DIR`` environment variable
    (when not already defined) and loads the plugins declared in the
    environment, in the configuration files and on the command line.

    A configuration file that cannot be read or parsed is reported through
    ``parser.error``, which terminates the program.
    """
    try:
        args, _ = parser.parse_known_args()
    except (argparse.ArgumentError, argparse.ArgumentTypeError):
        # The main parser should catch again the error, so just ignore the error here
        return

    # Set logging level: each -v raises verbosity, capped at DEBUG
    levels = (logging.WARNING, logging.INFO, logging.DEBUG)
    logging.basicConfig(level=levels[min(len(levels) - 1, args.verbose)])

    # Parse configuration a first time to extract early config values.
    # Report unreadable or invalid files as a usage error rather than letting
    # the exception escape as a traceback (tomllib.TOMLDecodeError is a
    # ValueError).
    try:
        cfg = parse_early_config(get_config_file_paths(args))
    except (OSError, ValueError, TypeError) as err:
        parser.error(str(err))

    # Inject environment variable with default value
    env_name_db_dir = "SBOM_CVE_CHECK_DATABASES_DIR"
    if args.databases_dir:
        os.environ[env_name_db_dir] = args.databases_dir.resolve().as_posix()
    elif env_name_db_dir not in os.environ:
        # Default to $XDG_CACHE_HOME, or ~/.cache when it is unset or blank
        cache_env = os.environ.get("XDG_CACHE_HOME", "")
        if not cache_env.strip():
            cache_path = pathlib.Path("~/.cache").expanduser()
        else:
            cache_path = pathlib.Path(cache_env)
        os.environ[env_name_db_dir] = cache_path.joinpath(
            "sbom_cve_check", "databases"
        ).as_posix()

    # Load plugins from environment variable, from configuration files,
    # and from arguments
    load_plugins(cfg.plugins, args.plugins or [])
def _build_early_parser() -> argparse.ArgumentParser:
    """Build the parser for the options needed before plugins are loaded."""
    early_p = argparse.ArgumentParser(
        add_help=False, exit_on_error=False, fromfile_prefix_chars="@"
    )
    early_p.add_argument(
        "--verbose",
        "-v",
        action="count",
        default=0,
        help="Increase logging level, can be specified multiple times",
    )
    early_p.add_argument(
        "--plugins",
        metavar="PATH",
        type=pathlib.Path,
        action="append",
        help="Path to plugin module or plugin directory",
    )
    early_p.add_argument(
        "--ignore-default-config",
        action="store_true",
        help="Do not load default database configuration",
    )
    early_p.add_argument(
        "--config",
        metavar="PATH",
        type=pathlib.Path,
        action="append",
        help="Path to one TOML configuration file",
    )
    early_p.add_argument(
        "--databases-dir",
        metavar="PATH",
        type=pathlib.Path,
        help="Path to the directory containing the databases to be downloaded. "
        "Override SBOM_CVE_CHECK_DATABASES_DIR",
    )
    return early_p


def _add_sbom_arguments(parser: argparse.ArgumentParser) -> None:
    """Declare the options describing the SBOM input file."""
    sbom_p = parser.add_argument_group("Sbom")
    sbom_p.add_argument(
        "--sbom-type",
        "--sbom-format",
        "-F",
        choices=SbomTypeRegistry().type_names,
        help="Specify the format of the SBOM input file",
    )
    sbom_p.add_argument(
        "--sbom-path",
        "--sbom",
        "-S",
        metavar="PATH",
        required=True,
        type=pathlib.Path,
        help="Path to the SBOM input file",
    )
    sbom_p.add_argument(
        "--ignore-sbom-annotations",
        action="store_true",
        help="If set, do not load annotations from the SBOM file",
    )
    sbom_p.add_argument(
        "--sbom-annotation-priority",
        metavar="PRIO",
        type=int,
        help="Priority to use for the annotations read from SBOM input file",
    )
    sbom_p.add_argument(
        "--sbom-obsolete-assessment-check",
        action=argparse.BooleanOptionalAction,
        help="Enable or disable checks of obsolete assessments specified in SBOM",
    )


def _add_database_arguments(parser: argparse.ArgumentParser) -> None:
    """Declare the options configuring CVE and annotation databases."""
    conf_p = parser.add_argument_group("Database configuration")
    conf_p.add_argument(
        "--add-db",
        action=DbConfigAction,
        help="Allow to add CVE or annotation database from command line",
    )
    conf_p.add_argument(
        "--yocto-vex-manifest",
        metavar="PATH",
        type=pathlib.Path,
        action="append",
        help="Shortcut to specify a Yocto Vex manifest, with default configuration",
    )
    conf_p.add_argument(
        "--check-obsolete-assessment-by-default",
        action="store_true",
        help="If set, by default, check for obsolete assessment",
    )


def _add_export_arguments(parser: argparse.ArgumentParser) -> None:
    """Declare the options selecting the exported files and their filters."""
    exp_p = parser.add_argument_group("Export file")
    exp_p.add_argument(
        "--export-type",
        "--export-format",
        "-f",
        required=True,
        action="append",
        choices=ExportTypeRegistry().type_names,
        help="Export format of generated file",
    )
    exp_p.add_argument(
        "--export-path",
        "--output",
        "-o",
        metavar="PATH",
        required=True,
        action="append",
        type=pathlib.Path,
        help="Path to the exported file",
    )

    filter_p = parser.add_argument_group("Export configuration")
    filter_p.add_argument(
        "--export-filter-vex-status",
        choices=[s.value for s in CveVexStatus if s.value is not None],
        nargs="+",
        help="Only add, in exported file, CVEs with the following VEX status",
    )
    filter_p.add_argument(
        "--export-filter-vulnerable",
        action="store_true",
        help="Only add, in exported file, CVEs which are considered vulnerable",
    )
    filter_p.add_argument(
        "--export-filter-cve-without-cvss-score",
        action="store_true",
        help="If set, do not add CVE without CVSS score, in exported file",
    )
    filter_p.add_argument(
        "--export-filter-cve-min-cvss-score",
        type=float,
        help="Only add, in exported file, CVEs with a minimum CVSS score",
    )
    filter_p.add_argument(
        "--export-filter-cve-without-versions",
        action="store_true",
        help="If set, do not add CVE without versions, in exported file",
    )
    filter_p.add_argument(
        "--export-add-kernel-modules",
        action="store_true",
        help="If set, add CVE information to kernel modules, in exported file",
    )
    filter_p.add_argument(
        "--export-list-rejected-cve",
        action="store_true",
        help="If set, list CVEs that are rejected, in exported file",
    )


def _register_databases(
    parser: argparse.ArgumentParser,
    cve_db_manager: CveDbManager,
    args: argparse.Namespace,
) -> None:
    """Register databases from --add-db, config files and Yocto VEX manifests."""
    try:
        if args.add_db:
            create_databases_from_args(cve_db_manager, args.add_db)
        parse_config(cve_db_manager, get_config_file_paths(args))

        for idx, m in enumerate(args.yocto_vex_manifest or []):
            path: pathlib.Path = m
            db, prio = create_database(False, idx + 1, YoctoAnnotDatabase, path)
            cve_db_manager.add_db(db, prio)

    except (ValueError, TypeError, FileNotFoundError) as err:
        parser.error(str(err))


def _load_sbom(parser: argparse.ArgumentParser, args: argparse.Namespace) -> Any:
    """Create the SBOM object from --sbom-path, exiting on a usage error."""
    sbom_path: pathlib.Path = args.sbom_path
    try:
        # strict=True: a missing input file raises FileNotFoundError
        return SbomTypeRegistry().create(
            args.sbom_type, sbom_path.resolve(strict=True)
        )
    except (FileNotFoundError, ValueError) as err:
        parser.error(str(err))


def _register_sbom_annotations(
    cve_db_manager: CveDbManager, sbom: Any, args: argparse.Namespace
) -> None:
    """Register the annotation database built from the SBOM, if there is one."""
    sbom_annot_opts: dict[str, bool] = {}
    # The option stays None when neither the flag nor its --no- form is given;
    # in that case no explicit value is forwarded.
    if args.sbom_obsolete_assessment_check is not None:
        sbom_annot_opts["obsolete_assessment_check"] = (
            args.sbom_obsolete_assessment_check
        )

    sbom_annot_db = sbom.create_annot_database(
        name=args.sbom_path.name, **sbom_annot_opts
    )
    if sbom_annot_db is None:
        return

    sbom_annot_prio: int | None = args.sbom_annotation_priority
    if sbom_annot_prio is None:
        sbom_annot_prio = sbom_annot_db.get_default_priority(
            order=0, from_config=False
        )
    cve_db_manager.add_db(sbom_annot_db, sbom_annot_prio)


def _configure_export(export_mng: ExportManager, args: argparse.Namespace) -> None:
    """Apply the export filter and content options to the export manager."""
    if args.export_filter_vulnerable:
        export_mng.cfg_filter_status(
            [CveVexStatus.UNDER_INVESTIGATION, CveVexStatus.AFFECTED]
        )

    if args.export_filter_vex_status:
        export_mng.cfg_filter_status(
            [CveVexStatus(n) for n in args.export_filter_vex_status]
        )

    export_mng.cfg_filter_rejected_cve(not args.export_list_rejected_cve)
    export_mng.cfg_filter_no_cvss_score(args.export_filter_cve_without_cvss_score)
    export_mng.cfg_filter_cvss_score(args.export_filter_cve_min_cvss_score)
    export_mng.cfg_filter_missing_versions(args.export_filter_cve_without_versions)
    export_mng.cfg_add_kernel_modules(args.export_add_kernel_modules)


def main() -> None:
    """Command-line entry point.

    Parses the arguments, registers the CVE and annotation databases, loads
    the SBOM, indexes the databases and generates every requested export.
    Usage errors are reported through ``parser.error``.
    """
    early_p = _build_early_parser()
    parser = argparse.ArgumentParser(parents=[early_p], fromfile_prefix_chars="@")

    # Runs before the remaining options are declared; presumably so that
    # plugin-provided types are registered before the registries are queried
    # for the `choices` below -- TODO confirm.
    process_early_config(early_p)

    parser.add_argument(
        "--version", "-V", action="version", version=f"%(prog)s {__version__}"
    )
    _add_sbom_arguments(parser)
    _add_database_arguments(parser)
    _add_export_arguments(parser)

    handle_autocomplete(parser)
    args = parser.parse_args()

    # First get default configuration values for databases
    AnnotDatabase.DEFAULT_OBSOLETE_ASSESSMENT_CHECK = (
        args.check_obsolete_assessment_by_default
    )
    CveDatabase.INDEX_REJECTED_CVE = args.export_list_rejected_cve

    # Check that export arguments are OK
    if len(args.export_type) != len(args.export_path):
        parser.error("Format and path must be specified for each exported file")

    # Instantiate CVE database manager and register all databases
    cve_db_manager = CveDbManager()
    _register_databases(parser, cve_db_manager, args)

    # Get input SBOM, and the annotation database it may provide
    sbom = _load_sbom(parser, args)
    if not args.ignore_sbom_annotations:
        _register_sbom_annotations(cve_db_manager, sbom, args)

    # Initialize and Index databases
    cve_db_manager.create_index()

    # Initialize and configure export
    export_mng = ExportManager(sbom, cve_db_manager)
    for exp_type, exp_path in zip(args.export_type, args.export_path, strict=True):
        export_mng.add_exporter(ExportTypeRegistry().create(exp_type, sbom, exp_path))
    _configure_export(export_mng, args)

    # And finally generate the export
    export_mng.process_export()