Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/export/export_csv.py: 97%

73 statements  

« prev     ^ index     » next       coverage.py v7.11.1, created at 2025-11-28 15:37 +0100

1# -*- coding: utf-8 -*- 

2# SPDX-License-Identifier: GPL-2.0-only 

3 

4import csv 

5import pathlib 

6from _csv import Writer 

7from collections.abc import Generator 

8from enum import IntEnum 

9 

10from ..cve_db.annot_aggregate import AggregateAnnotEntry 

11from ..sbom.component import CompBuild 

12from ..sbom.sbom_base import Sbom 

13from ..vuln.cve import CveVexAffectedAssessment, CveVexNotAffectedAssessment 

14from ..vuln.cvss import CvssMetric, CvssVersion 

15from .export_base import BaseExport 

16from .registry import register_export 

17 

18_CsvRow = list[str | float] 

19 

20 

21class _Column(IntEnum): 

22 """ 

23 Map colum names to an integer, to be used for storing the corresponding 

24 information in the generated CSV's rows. 

25 Necessary because CSVs are organized as rows of 0-indexed fields, this 

26 integer will be the column's index in each row. 

27 """ 

28 

29 BUILD_NAME = 0 

30 PKG_NAMES = 1 

31 VERSION = 2 

32 PRODUCT_NAMES = 3 

33 CVE_ID = 4 

34 SCOREV2 = 5 

35 SCOREV3 = 6 

36 SCOREV4 = 7 

37 STATUS = 8 

38 STATEMENT = 9 

39 NOTES = 10 

40 OBSOLETES = 11 

41 NB_COLS = 12 

42 

43 

def _group_cvss_metrics_by_col(metric: CvssMetric) -> _Column | None:
    """
    Return the CSV score column matching the CVSS version of ``metric``.

    CVSS 3.0 and 3.1 share the same column. ``None`` is returned when the
    version of the metric has no dedicated column.
    """
    version = metric.cvss_ver
    columns_by_versions = (
        ((CvssVersion.V2_0,), _Column.SCOREV2),
        ((CvssVersion.V3_0, CvssVersion.V3_1), _Column.SCOREV3),
        ((CvssVersion.V4_0,), _Column.SCOREV4),
    )
    for versions, column in columns_by_versions:
        if version in versions:
            return column
    return None

52 

53 

def _gen_cve_info(row_common: _CsvRow, annotation: AggregateAnnotEntry) -> _CsvRow:
    """
    Build the CSV row describing one CVE annotation.

    ``row_common`` is left untouched: the CVE specific columns are filled in
    a copy of it, and that copy is returned.
    """
    cve_row = list(row_common)
    cve_row[_Column.CVE_ID] = annotation.identifier.id

    # One score per CVSS column: the first metric of each group is reported.
    grouped = annotation.group_cvss_metrics(key=_group_cvss_metrics_by_col)
    for column, column_metrics in grouped.items():
        if column is None:
            # Metrics whose CVSS version has no column in the CSV.
            continue
        cve_row[column] = column_metrics[0].score

    vex = annotation.vex_assessment
    cve_row[_Column.STATUS] = str(vex.status.value)

    # The statement attribute depends on the kind of assessment; for any
    # other kind the column keeps the value inherited from row_common.
    if isinstance(vex, CveVexAffectedAssessment):
        cve_row[_Column.STATEMENT] = vex.action_statement
    elif isinstance(vex, CveVexNotAffectedAssessment):
        cve_row[_Column.STATEMENT] = vex.impact_statement

    cve_row[_Column.NOTES] = vex.status_notes or ""
    cve_row[_Column.OBSOLETES] = ";\n".join(map(str, annotation.obsolete_assessments))
    return cve_row

75 

76 

@register_export("csv")
class CsvExport(BaseExport):
    """Exporter writing the CVE information as one CSV row per CVE."""

    def __init__(self, sbom: Sbom, out_path: pathlib.Path) -> None:
        """Initialize the exporter; the CSV writer is created by start_export."""
        super().__init__(sbom, out_path)
        self._csv_writer: Writer | None = None

    def start_export(self) -> Generator[None, None, None]:
        """
        Open the output file, write the header row, then yield once.

        The output file stays open while the generator is suspended on its
        ``yield``; it is closed when the generator is resumed or closed.
        """
        # Same order as the _Column indexes.
        header = [
            "build",
            "package names",
            "version",
            "vendor-product",
            "cve-id",
            "scorev2",
            "scorev3",
            "scorev4",
            "status",
            "statement",
            "notes",
            "obsolete assessments",
        ]
        with self._out_path.open("w", newline="", encoding="utf-8") as out_file:
            self._csv_writer = csv.writer(out_file, quoting=csv.QUOTE_MINIMAL)
            self._csv_writer.writerow(header)
            yield

    def export_comp_info(
        self, comp_build: CompBuild
    ) -> Generator[None, tuple[bool, AggregateAnnotEntry], None]:
        """
        Write the CSV rows of the annotations sent for ``comp_build``.

        The generator receives ``(is_filtered, annotation)`` tuples and writes
        one row for each annotation which is not filtered. When the build has
        no identifiers, the generator ends without ever yielding.
        """
        assert self._csv_writer is not None

        if not comp_build.identifiers:
            return

        # Columns shared by every row of this build, computed only once.
        build_name = comp_build.build_name or ""
        pkg_names = "; ".join(
            [comp.name for comp in self._filter_components(comp_build)]
        )
        version = comp_build.version
        product_names = "; ".join(
            [comp_id.to_str() for comp_id in comp_build.identifiers]
        )

        base_row: _CsvRow = [""] * _Column.NB_COLS
        base_row[_Column.BUILD_NAME] = build_name
        base_row[_Column.PKG_NAMES] = pkg_names
        base_row[_Column.VERSION] = version
        base_row[_Column.PRODUCT_NAMES] = product_names

        while True:
            is_filtered, annotation = yield
            if is_filtered:
                continue
            self._csv_writer.writerow(_gen_cve_info(base_row, annotation))