Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/sbom/lib_spdx3.py: 88%

550 statements  

« prev     ^ index     » next       coverage.py v7.11.1, created at 2025-11-28 15:37 +0100

1# -*- coding: utf-8 -*- 

2# SPDX-License-Identifier: GPL-2.0-only 

3 

4import hashlib 

5import pathlib 

6import re 

7import uuid 

8from collections import defaultdict 

9from collections.abc import Generator, Iterable 

10from datetime import UTC, datetime 

11from typing import Any, TypeVar, Union, cast 

12 

13from spdx_python_model.bindings import v3_0_1 as spdx30 # type: ignore[import-untyped] 

14 

15from ..utils.reproducible import ReproducibleDateTime 

16from ..vuln.cpe import Cpe23 

17from ..vuln.cve import ( 

18 CveInfo, 

19 CveVexAffectedAssessment, 

20 CveVexAssessment, 

21 CveVexFixedAssessment, 

22 CveVexJustification, 

23 CveVexNotAffectedAssessment, 

24 CveVexUnderInvestigationAssessment, 

25) 

26from ..vuln.cvss import CvssMetric, CvssSeverity, CvssVersion 

27 

28SPDX_VERSION = "3.0.1" 

29 

30ListLikeSHACLObject = Union[ # noqa: UP007 

31 set[spdx30.SHACLObject], list[spdx30.SHACLObject], tuple[spdx30.SHACLObject] 

32] 

33 

34_RelationshipT = TypeVar("_RelationshipT", bound=spdx30.Relationship) 

35 

36 

37def spdxid_hash(*items: Any) -> str: 

38 h = hashlib.md5() 

39 for i in items: 

40 if isinstance(i, spdx30.Element): 

41 # noinspection PyProtectedMember 

42 h.update(i._id.encode("utf-8")) 

43 else: 

44 h.update(repr(i).encode("utf-8")) 

45 return h.hexdigest() 

46 

47 

48def to_list( 

49 lst: ListLikeSHACLObject, 

50) -> list[spdx30.SHACLObject] | tuple[spdx30.SHACLObject]: 

51 if isinstance(lst, set): 

52 lst = sorted(lst) 

53 

54 if not isinstance(lst, (list, tuple)): 

55 raise TypeError(f"Must be a list or tuple. Got {(type(lst))}") 

56 

57 return lst 

58 

59 

60def cvss_severity_from_spdx3_type(severity: str) -> CvssSeverity: 

61 for name, url in spdx30.security_CvssSeverityType.NAMED_INDIVIDUALS.items(): 

62 if severity == url: 

63 return CvssSeverity(name) 

64 raise ValueError(f"Unexpected CVSS spdx 3 severity: {severity}") 

65 

66 

67def cve_vex_justification_from_str(s: str) -> CveVexJustification | None: 

68 map_justification: dict[str, CveVexJustification] = { 

69 "componentNotPresent": CveVexJustification.COMPONENT_NOT_PRESENT, 

70 "vulnerableCodeNotPresent": CveVexJustification.VULNERABLE_CODE_NOT_PRESENT, 

71 "inlineMitigationsAlreadyExist": 

72 CveVexJustification.INLINE_MITIGATIONS_ALREADY_EXIST, 

73 "vulnerableCodeCannotBeControlledByAdversary": 

74 CveVexJustification.VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY, 

75 "vulnerableCodeNotInExecutePath": 

76 CveVexJustification.VULNERABLE_CODE_NOT_IN_EXECUTE_PATH, 

77 } # fmt: skip 

78 

79 return map_justification.get(s) 

80 

81 

82def cve_vex_justification_to_str(s: CveVexJustification) -> str: 

83 map_justification: dict[CveVexJustification, str] = { 

84 CveVexJustification.COMPONENT_NOT_PRESENT: "componentNotPresent", 

85 CveVexJustification.VULNERABLE_CODE_NOT_PRESENT: "vulnerableCodeNotPresent", 

86 CveVexJustification.INLINE_MITIGATIONS_ALREADY_EXIST: 

87 "inlineMitigationsAlreadyExist", 

88 CveVexJustification.VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY: 

89 "vulnerableCodeCannotBeControlledByAdversary", 

90 CveVexJustification.VULNERABLE_CODE_NOT_IN_EXECUTE_PATH: 

91 "vulnerableCodeNotInExecutePath", 

92 } # fmt: skip 

93 

94 return map_justification[s] 

95 

96 

97def cve_vex_assessment_from_vuln_relationship( 

98 r: spdx30.security_VulnAssessmentRelationship, 

99) -> CveVexAssessment | None: 

100 if not isinstance(r, spdx30.security_VexVulnAssessmentRelationship): 

101 return None 

102 

103 vex_version = r.security_vexVersion 

104 status_notes = r.security_statusNotes 

105 

106 if isinstance(r, spdx30.security_VexFixedVulnAssessmentRelationship): 

107 return CveVexFixedAssessment(vex_version=vex_version, status_notes=status_notes) 

108 

109 if isinstance(r, spdx30.security_VexAffectedVulnAssessmentRelationship): 

110 return CveVexAffectedAssessment( 

111 vex_version=vex_version, 

112 status_notes=status_notes, 

113 action_statement=r.security_actionStatement, 

114 action_statement_time=r.security_actionStatementTime, 

115 ) 

116 

117 if isinstance(r, spdx30.security_VexNotAffectedVulnAssessmentRelationship): 

118 return CveVexNotAffectedAssessment( 

119 vex_version=vex_version, 

120 status_notes=status_notes, 

121 impact_statement=r.security_impactStatement, 

122 impact_statement_time=r.security_impactStatementTime, 

123 justification=cve_vex_justification_from_str(r.security_justificationType), 

124 ) 

125 

126 if isinstance(r, spdx30.security_VexUnderInvestigationVulnAssessmentRelationship): 

127 return CveVexUnderInvestigationAssessment( 

128 vex_version=vex_version, status_notes=status_notes 

129 ) 

130 

131 return None 

132 

133 

134def cvss_metric_from_vuln_relationship( 

135 r: spdx30.security_VulnAssessmentRelationship, 

136) -> CvssMetric | None: 

137 cvss_ver = None 

138 

139 if isinstance(r, spdx30.security_CvssV2VulnAssessmentRelationship): 

140 cvss_ver = CvssVersion.V2_0 

141 elif isinstance(r, spdx30.security_CvssV3VulnAssessmentRelationship): 

142 if r.security_vectorString.startswith("CVSS:3.0/"): 

143 cvss_ver = CvssVersion.V3_0 

144 elif r.security_vectorString.startswith("CVSS:3.1/"): 

145 cvss_ver = CvssVersion.V3_1 

146 elif isinstance( 

147 r, spdx30.security_CvssV4VulnAssessmentRelationship 

148 ) and r.security_vectorString.startswith("CVSS:4.0/"): 

149 cvss_ver = CvssVersion.V4_0 

150 

151 if cvss_ver is not None: 

152 severity = ( 

153 cvss_severity_from_spdx3_type(r.security_severity) 

154 if cvss_ver != CvssVersion.V2_0 

155 else None 

156 ) 

157 return CvssMetric( 

158 cvss_ver=cvss_ver, 

159 score=r.security_score, 

160 vector_str=r.security_vectorString, 

161 severity=severity, 

162 ) 

163 return None 

164 

165 

166def vuln_get_cve_id(vuln: spdx30.security_Vulnerability) -> str | None: 

167 for ext_id in vuln.externalIdentifier: 

168 if ext_id.externalIdentifierType == spdx30.ExternalIdentifierType.cve: 

169 cve_id = ext_id.identifier 

170 if isinstance(cve_id, str): 

171 return cve_id 

172 return None 

173 

174 

175def _creation_info_cmp_key(obj: spdx30.CreationInfo) -> tuple[Any, ...]: 

176 return ( 

177 obj.created or datetime.fromtimestamp(0, UTC), 

178 tuple(obj.createdBy), 

179 tuple(obj.createdUsing), 

180 obj.comment or "", 

181 ) 

182 

183 

184def __creation_info_lt(self: spdx30.CreationInfo, other: object) -> bool: 

185 if not isinstance(other, spdx30.CreationInfo): 

186 return bool(super(spdx30.CreationInfo, self).__lt__(other)) 

187 return _creation_info_cmp_key(self) < _creation_info_cmp_key(other) 

188 

189 

190# Hack to have repeatability in SPDX generation 

191spdx30.CreationInfo.__lt__ = __creation_info_lt 

192 

193 

194class ObjectSet(spdx30.SHACLObjectSet): # type: ignore[misc] 

195 SPDX_UUID_NAMESPACE = "sbom-cve-check" 

196 SPDX_NAMESPACE_PREFIX = "http://spdx.org/spdxdocs" 

197 

198 def __init__(self) -> None: 

199 super().__init__() 

200 

201 # Unique spdx document and sbom object contained into this graph 

202 self._doc: spdx30.SpdxDocument | None = None 

203 self._sbom: spdx30.software_Sbom | None = None 

204 

205 # Map (cache) between 'from' Relationship and all associated Relationships 

206 self._map_relation: dict[spdx30.SHACLObject, set[spdx30.Relationship]] = ( 

207 defaultdict(set) 

208 ) 

209 # Map (cache) between 'to' Relationship and all associated Relationships 

210 self._map_relation_rev: dict[spdx30.SHACLObject, set[spdx30.Relationship]] = ( 

211 defaultdict(set) 

212 ) 

213 # Map (cache) between CVE identifier and associated vulnerabilities 

214 self._map_cve_vulns: dict[str, set[spdx30.security_Vulnerability]] = ( 

215 defaultdict(set) 

216 ) 

217 # Map (cache) between a package and associated vulnerabilities and relationships 

218 self._map_pkg_vulns: dict[ 

219 spdx30.software_Package, 

220 dict[ 

221 spdx30.security_Vulnerability, 

222 set[spdx30.security_VulnAssessmentRelationship], 

223 ], 

224 ] = defaultdict(dict) 

225 

226 def __add_assess_to_map_pkg_vulns( 

227 self, p: spdx30.software_Package, rel: spdx30.Relationship 

228 ) -> None: 

229 if rel.relationshipType == spdx30.RelationshipType.hasAssociatedVulnerability: 

230 for vuln in rel.to: 

231 assessment_rels = self._map_pkg_vulns[p].setdefault(vuln, set()) 

232 for assessment in self._map_relation.get(vuln, []): 

233 if ( 

234 isinstance( 

235 assessment, 

236 spdx30.security_VulnAssessmentRelationship, 

237 ) 

238 and p in assessment.to 

239 ): 

240 assessment_rels.add(assessment) 

241 

242 def _update_map_cache(self) -> None: 

243 self._map_relation.clear() 

244 self._map_relation_rev.clear() 

245 self._map_cve_vulns.clear() 

246 

247 for r in self.foreach_type(spdx30.Relationship): 

248 self._map_relation[r.from_].add(r) 

249 for o in r.to: 

250 self._map_relation_rev[o].add(r) 

251 

252 for v in self.foreach_type(spdx30.security_Vulnerability): 

253 cve_id = vuln_get_cve_id(v) 

254 if cve_id: 

255 self._map_cve_vulns[cve_id].add(v) 

256 

257 for p in self.iterate_install_packages(): 

258 rels = self._map_relation.get(p) 

259 if not rels: 

260 continue 

261 

262 for rel in rels: 

263 self.__add_assess_to_map_pkg_vulns(p, rel) 

264 

265 def _init_parsed_document(self) -> None: 

266 for d in self.foreach_type(spdx30.SpdxDocument): 

267 self._doc = d 

268 break 

269 

270 if not self._doc: 

271 raise ValueError("Graph is missing an SpdxDocument node") 

272 

273 self._sbom = None 

274 if len(self._doc.rootElement) == 1: 

275 sbom = self._doc.rootElement[0] 

276 if isinstance(sbom, spdx30.software_Sbom): 

277 self._sbom = sbom 

278 

279 if not self._sbom: 

280 raise ValueError("SpdxDocument does not contain an Sbom node") 

281 

282 self._update_map_cache() 

283 

284 @classmethod 

285 def parse_jsonld(cls, path_spdx: str | pathlib.Path) -> "ObjectSet": 

286 object_set = cls() 

287 with pathlib.Path(path_spdx).open() as f: 

288 d = spdx30.JSONLDDeserializer() 

289 d.read(f, object_set) 

290 object_set._init_parsed_document() 

291 return object_set 

292 

293 def _cleanup_creation_info(self) -> None: 

294 refed_ci: dict[tuple[Any, ...], spdx30.CreationInfo] = {} 

295 insta_ci: set[spdx30.CreationInfo] = set() 

296 

297 for o in self.objects: 

298 if isinstance(o, spdx30.CreationInfo): 

299 insta_ci.add(o) 

300 elif isinstance(o, spdx30.Element): 

301 ci: spdx30.CreationInfo = o.creationInfo 

302 if ci: 

303 key = _creation_info_cmp_key(ci) 

304 similar_ci = refed_ci.get(key) 

305 if similar_ci is not None: 

306 o.creationInfo = similar_ci 

307 else: 

308 refed_ci[key] = ci 

309 

310 insta_ci -= set(refed_ci.values()) 

311 self.objects -= insta_ci 

312 

313 def write_to_jsonld(self, path_spdx: str | pathlib.Path) -> None: 

314 self._cleanup_creation_info() 

315 with pathlib.Path(path_spdx).open("wb") as f: 

316 s = spdx30.JSONLDInlineSerializer() 

317 s.write(self, f) 

318 

319 def add(self, obj: spdx30.SHACLObject) -> spdx30.SHACLObject: 

320 if isinstance(obj, spdx30.Relationship): 

321 self._map_relation[obj.from_].add(obj) 

322 for o in obj.to: 

323 self._map_relation_rev[o].add(obj) 

324 

325 if isinstance(obj, spdx30.security_Vulnerability): 

326 cve_id = vuln_get_cve_id(obj) 

327 if cve_id: 

328 self._map_cve_vulns[cve_id].add(obj) 

329 

330 return super().add(obj) 

331 

332 @staticmethod 

333 def _remove_from_relation_map( 

334 rel_map: dict[spdx30.SHACLObject, set[spdx30.Relationship]], 

335 key: spdx30.SHACLObject, 

336 val: spdx30.Relationship, 

337 ) -> None: 

338 rels = rel_map.get(key) 

339 if rels: 

340 rels.discard(val) 

341 if rels: 

342 del rel_map[key] 

343 

344 def _remove(self, obj: spdx30.SHACLObject) -> None: 

345 if not isinstance(obj, spdx30.Relationship): 

346 raise TypeError(f"Removing this kind of object is not supported: {obj}") 

347 

348 self._remove_from_relation_map(self._map_relation, obj.from_, obj) 

349 for o in obj.to: 

350 self._remove_from_relation_map(self._map_relation_rev, o, obj) 

351 

352 self.objects.discard(obj) 

353 

354 def remove(self, obj: spdx30.SHACLObject) -> None: 

355 self._remove(obj) 

356 self.create_index() 

357 

358 def iterate_builds(self) -> Generator[spdx30.build_Build, None, None]: 

359 yield from self.foreach_type(spdx30.build_Build) 

360 

361 def iterate_install_packages( 

362 self, 

363 ) -> Generator[spdx30.software_Package, None, None]: 

364 return ( 

365 p 

366 for p in self.foreach_type(spdx30.software_Package) 

367 if p.software_primaryPurpose == spdx30.software_SoftwarePurpose.install 

368 ) 

369 

370 def find_package_by_cpe( 

371 self, cpe: str 

372 ) -> Generator[spdx30.software_Package, None, None]: 

373 for p in self.foreach_type(spdx30.software_Package): 

374 for e in p.externalIdentifier: 

375 if e.identifier == cpe: 

376 yield p 

377 

378 def find_package_by_name( 

379 self, name: str 

380 ) -> Generator[spdx30.software_Package, None, None]: 

381 for p in self.foreach_type(spdx30.software_Package): 

382 if p.name == name: 

383 yield p 

384 

385 @staticmethod 

386 def list_cpe23(element: spdx30.Element) -> set[Cpe23]: 

387 cpes = set() 

388 for e in element.externalIdentifier: 

389 if e.externalIdentifierType == spdx30.ExternalIdentifierType.cpe23: 

390 cpe = Cpe23.parse(e.identifier) 

391 if cpe is not None: 

392 cpes.add(cpe) 

393 return cpes 

394 

395 def get_package_vulns( 

396 self, pkg: spdx30.software_Package 

397 ) -> dict[ 

398 spdx30.security_Vulnerability, set[spdx30.security_VulnAssessmentRelationship] 

399 ]: 

400 """Get the list of Vulnerability associated with this package""" 

401 return self._map_pkg_vulns.get(pkg, {}) 

402 

403 def get_package_vuln( 

404 self, pkg: spdx30.software_Package, cve_id: str 

405 ) -> tuple[ 

406 spdx30.security_Vulnerability | None, 

407 set[spdx30.security_VulnAssessmentRelationship], 

408 ]: 

409 """Get the Vulnerability, associated with this package, with this CVE id""" 

410 for vuln, rels in self._map_pkg_vulns.get(pkg, {}).items(): 

411 if vuln_get_cve_id(vuln) == cve_id: 

412 return vuln, rels 

413 return None, set() 

414 

415 def get_all_vulns( 

416 self, 

417 ) -> dict[ 

418 spdx30.security_Vulnerability, 

419 dict[spdx30.software_Package, set[spdx30.security_VulnAssessmentRelationship]], 

420 ]: 

421 """ 

422 Get the list of all Vulnerabilities, with associated packages and assessments 

423 """ 

424 vulns: dict[ 

425 spdx30.security_Vulnerability, 

426 dict[ 

427 spdx30.software_Package, set[spdx30.security_VulnAssessmentRelationship] 

428 ], 

429 ] = {} 

430 for pkg, vulns_rels in self._map_pkg_vulns.items(): 

431 for vuln, rels in vulns_rels.items(): 

432 pkgs_rels = vulns.setdefault(vuln, {}) 

433 pkgs_rels.setdefault(pkg, set()).update(rels) 

434 

435 return vulns 

436 

437 def get_package_vuln_assessment_rels( 

438 self, pkg: spdx30.software_Package, vuln: spdx30.security_Vulnerability 

439 ) -> set[spdx30.security_VulnAssessmentRelationship] | None: 

440 """ 

441 Get the vulnerability assessment relationships associated with a package and a 

442 vulnerability. 

443 :return: None if there is no relationship between this package and this 

444 vulnerability 

445 """ 

446 vulns_rels = self._map_pkg_vulns.get(pkg) 

447 if vulns_rels: 

448 return vulns_rels.get(vuln) 

449 return None 

450 

451 def get_build_recipe_packages( 

452 self, build_recipe: spdx30.build_Build 

453 ) -> set[spdx30.software_Package]: 

454 pkgs: set[spdx30.software_Package] = set() 

455 for rel in self._map_relation.get(build_recipe, []): 

456 if ( 

457 isinstance(rel, spdx30.LifecycleScopedRelationship) 

458 and rel.relationshipType == spdx30.RelationshipType.hasOutput 

459 ): 

460 for pkg in rel.to: 

461 if ( 

462 isinstance(pkg, spdx30.software_Package) 

463 and pkg.software_primaryPurpose 

464 == spdx30.software_SoftwarePurpose.install 

465 ): 

466 pkgs.add(pkg) 

467 return pkgs 

468 

469 def get_build_recipe_sources( 

470 self, build_recipe: spdx30.build_Build 

471 ) -> set[spdx30.software_SoftwareArtifact]: 

472 pkg_sources: set[spdx30.software_SoftwareArtifact] = set() 

473 for rel in self._map_relation.get(build_recipe, []): 

474 if ( 

475 isinstance(rel, spdx30.LifecycleScopedRelationship) 

476 and rel.relationshipType == spdx30.RelationshipType.hasInput 

477 ): 

478 for pkg_src_o in rel.to: 

479 if ( 

480 isinstance(pkg_src_o, spdx30.software_SoftwareArtifact) 

481 and pkg_src_o.software_primaryPurpose 

482 == spdx30.software_SoftwarePurpose.source 

483 ): 

484 pkg_sources.add(pkg_src_o) 

485 return pkg_sources 

486 

487 @classmethod 

488 def _get_namespace(cls, pn: str) -> str: 

489 namespace_uuid = uuid.uuid5(uuid.NAMESPACE_DNS, cls.SPDX_UUID_NAMESPACE) 

490 return f"{cls.SPDX_NAMESPACE_PREFIX}/{uuid.uuid5(namespace_uuid, pn)!s}" 

491 

492 @classmethod 

493 def new_spdxid(cls, *suffix: str, pn: str) -> str: 

494 items = [cls._get_namespace(pn)] 

495 items.extend(re.sub(r"[^a-zA-Z0-9_-]", "_", s) for s in suffix) 

496 return "/".join(items) 

497 

498 @classmethod 

499 def new_agent( 

500 cls, 

501 *, 

502 pn: str, 

503 agent_type: str, 

504 name: str, 

505 comment: str | None = None, 

506 creation_info: spdx30.CreationInfo | None = None, 

507 ) -> spdx30.Agent: 

508 spdxid = cls.new_spdxid("agent", name, pn=pn) 

509 

510 if agent_type == "person": 

511 agent = spdx30.Person(_id=spdxid, name=name) 

512 elif agent_type == "software": 

513 agent = spdx30.SoftwareAgent(_id=spdxid, name=name) 

514 elif agent_type == "organization": 

515 agent = spdx30.Organization(_id=spdxid, name=name) 

516 elif not agent_type or agent_type == "agent": 

517 agent = spdx30.Agent(_id=spdxid, name=name) 

518 else: 

519 raise ValueError(f"Unknown agent type: {agent_type}") 

520 

521 if creation_info is not None: 

522 agent.creationInfo = creation_info 

523 if comment: 

524 agent.comment = comment 

525 

526 return agent 

527 

528 @classmethod 

529 def new_tool( 

530 cls, *, pn: str, name: str, creation_info: spdx30.CreationInfo = None 

531 ) -> spdx30.Tool: 

532 tool = spdx30.Tool(_id=cls.new_spdxid("tool", name, pn=pn), name=name) 

533 if creation_info is not None: 

534 tool.creationInfo = creation_info 

535 return tool 

536 

537 @staticmethod 

538 def new_creation_info( 

539 *, tools: list[spdx30.Tool], agents: list[spdx30.Agent] 

540 ) -> spdx30.CreationInfo: 

541 creation_info = spdx30.CreationInfo() 

542 

543 for tool in tools: 

544 if not tool.creationInfo: 

545 tool.creationInfo = creation_info 

546 

547 for agent in agents: 

548 if not agent.creationInfo: 

549 agent.creationInfo = creation_info 

550 

551 creation_info.created = ReproducibleDateTime().now 

552 creation_info.specVersion = SPDX_VERSION 

553 creation_info.createdBy = agents 

554 creation_info.createdUsing = tools 

555 

556 return creation_info 

557 

558 @classmethod 

559 def new_objset(cls, name: str) -> "ObjectSet": 

560 object_set = cls() 

561 

562 document = spdx30.SpdxDocument( 

563 _id=object_set.new_spdxid("document", pn=name), 

564 name=name, 

565 ) 

566 

567 document.profileConformance = [ 

568 spdx30.ProfileIdentifierType.build, 

569 spdx30.ProfileIdentifierType.core, 

570 spdx30.ProfileIdentifierType.security, 

571 spdx30.ProfileIdentifierType.simpleLicensing, 

572 spdx30.ProfileIdentifierType.software, 

573 ] 

574 

575 object_set._doc = document 

576 object_set.add(document) 

577 

578 return object_set 

579 

580 def set_doc_creation_info(self, creation_info: spdx30.CreationInfo) -> None: 

581 for tool in creation_info.createdUsing: 

582 self.add(tool) 

583 

584 for agent in creation_info.createdBy: 

585 self.add(agent) 

586 

587 self.add(creation_info) 

588 assert self._doc is not None 

589 self._doc.creationInfo = creation_info 

590 

591 @property 

592 def doc_creation_info(self) -> spdx30.CreationInfo: 

593 assert self._doc is not None 

594 return self._doc.creationInfo 

595 

596 def add_sbom_as_doc_root_element(self, name: str) -> None: 

597 assert self._sbom is None 

598 self._sbom = spdx30.software_Sbom( 

599 _id=self.new_spdxid("sbom", pn=name), 

600 name=name, 

601 creationInfo=self.doc_creation_info, 

602 software_sbomType=[spdx30.software_SbomType.build], 

603 ) 

604 

605 self.add(self._sbom) 

606 assert self._doc is not None 

607 self._doc.rootElement = [self._sbom] 

608 

609 def add_sbom_root_element(self, e: spdx30.Element) -> None: 

610 assert self._sbom is not None 

611 self._sbom.rootElement.append(e) 

612 

613 def _new_relationship( 

614 self, 

615 cls: type[_RelationshipT], 

616 from_: ListLikeSHACLObject, 

617 typ: str, 

618 to: ListLikeSHACLObject, 

619 *, 

620 pn: str, 

621 spdxid_name: str = "relationship", 

622 **props: object, 

623 ) -> list[_RelationshipT]: 

624 from_ = to_list(from_) 

625 to = to_list(to) 

626 

627 if not from_: 

628 return [] 

629 

630 if not to: 

631 to = [spdx30.IndividualElement.NoneElement] 

632 

633 ret = [] 

634 

635 for f in from_: 

636 hash_args: list[object] = [typ, f] 

637 for _, p in sorted(props.items(), key=lambda t: t[0]): 

638 hash_args.append(p) 

639 hash_args.extend(to) 

640 

641 relationship = self.add( 

642 cls( 

643 _id=self.new_spdxid(spdxid_name, spdxid_hash(*hash_args), pn=pn), 

644 creationInfo=self.doc_creation_info, 

645 from_=f, 

646 relationshipType=typ, 

647 to=to, 

648 **props, 

649 ) 

650 ) 

651 ret.append(relationship) 

652 

653 return ret 

654 

655 def new_relationship( 

656 self, from_: ListLikeSHACLObject, typ: str, to: ListLikeSHACLObject, *, pn: str 

657 ) -> list[spdx30.Relationship]: 

658 return self._new_relationship(spdx30.Relationship, from_, typ, to, pn=pn) 

659 

660 def new_scoped_relationship( 

661 self, 

662 from_: ListLikeSHACLObject, 

663 typ: str, # RelationshipType 

664 scope: str, # LifecycleScopeType 

665 to: ListLikeSHACLObject, 

666 *, 

667 pn: str, 

668 ) -> list[spdx30.LifecycleScopedRelationship]: 

669 return self._new_relationship( 

670 spdx30.LifecycleScopedRelationship, from_, typ, to, scope=scope, pn=pn 

671 ) 

672 

673 def update_cve_vuln( 

674 self, vuln: spdx30.security_Vulnerability, cve_info: CveInfo 

675 ) -> None: 

676 updated = False 

677 

678 # Update description 

679 if cve_info.description and (vuln.description != cve_info.description): 

680 vuln.description = cve_info.description 

681 updated = True 

682 

683 # Add new external references 

684 map_ext_refs = {} 

685 for ext_ref in cve_info.references: 

686 map_ext_refs[ext_ref.url] = ext_ref 

687 

688 for ext_ref in vuln.externalRef: 

689 for locator in ext_ref.locator: 

690 map_ext_refs.pop(locator, None) 

691 

692 for ext_ref in map_ext_refs.values(): 

693 props = ( 

694 {} 

695 if ext_ref.ref_type is None 

696 else {"externalRefType": ext_ref.ref_type} 

697 ) 

698 vuln.externalRef.append(spdx30.ExternalRef(locator=[ext_ref.url], **props)) 

699 updated = True 

700 

701 # Update last modification date time 

702 if cve_info.date_modified is not None: 

703 date_modified = cve_info.date_modified.astimezone(UTC) 

704 if date_modified != vuln.security_modifiedTime: 

705 vuln.security_modifiedTime = date_modified 

706 updated = True 

707 

708 # Update published date time 

709 if cve_info.date_published is not None: 

710 date_published = cve_info.date_published.astimezone(UTC) 

711 if date_published != vuln.security_publishedTime: 

712 vuln.security_publishedTime = date_published 

713 updated = True 

714 

715 if updated: 

716 vuln.creationInfo = self.doc_creation_info 

717 

718 def new_cve_vuln( 

719 self, cve_info: CveInfo, *, pn: str 

720 ) -> spdx30.security_Vulnerability: 

721 cve_id = cve_info.cve_id.id 

722 vuln = spdx30.security_Vulnerability( 

723 _id=self.new_spdxid("vulnerability", cve_id, pn=pn) 

724 ) 

725 vuln.creationInfo = self.doc_creation_info 

726 if cve_info.description is not None: 

727 vuln.description = cve_info.description 

728 

729 if cve_info.date_modified is not None: 

730 vuln.security_modifiedTime = cve_info.date_modified.astimezone(UTC) 

731 

732 if cve_info.date_published is not None: 

733 vuln.security_publishedTime = cve_info.date_published.astimezone(UTC) 

734 

735 vuln.externalIdentifier.append( 

736 spdx30.ExternalIdentifier( 

737 externalIdentifierType=spdx30.ExternalIdentifierType.cve, 

738 identifier=cve_id, 

739 identifierLocator=[ 

740 f"https://cveawg.mitre.org/api/cve/{cve_id}", 

741 f"https://www.cve.org/CVERecord?id={cve_id}", 

742 ], 

743 ) 

744 ) 

745 

746 for ext_ref in cve_info.references: 

747 props = ( 

748 {} 

749 if ext_ref.ref_type is None 

750 else {"externalRefType": ext_ref.ref_type} 

751 ) 

752 vuln.externalRef.append(spdx30.ExternalRef(locator=[ext_ref.url], **props)) 

753 

754 return vuln 

755 

756 @staticmethod 

757 def _props_for_vex_assessment(assessment: CveVexAssessment) -> dict[str, Any]: 

758 props: dict[str, Any] = {} 

759 

760 if assessment.vex_version is not None: 

761 props["security_vexVersion"] = assessment.vex_version 

762 if assessment.status_notes is not None: 

763 props["security_statusNotes"] = assessment.status_notes 

764 

765 if isinstance(assessment, CveVexAffectedAssessment): 

766 props["security_actionStatement"] = assessment.action_statement 

767 if assessment.action_statement_time is not None: 

768 props["security_actionStatementTime"] = ( 

769 assessment.action_statement_time.astimezone(tz=UTC) 

770 ) 

771 

772 elif isinstance(assessment, CveVexNotAffectedAssessment): 

773 props["security_impactStatement"] = assessment.impact_statement 

774 if assessment.impact_statement_time is not None: 

775 props["security_impactStatementTime"] = ( 

776 assessment.impact_statement_time.astimezone(tz=UTC) 

777 ) 

778 if assessment.justification is not None: 

779 props["security_justificationType"] = ( 

780 spdx30.security_VexJustificationType.NAMED_INDIVIDUALS[ 

781 cve_vex_justification_to_str(assessment.justification) 

782 ] 

783 ) 

784 

785 return props 

786 

787 def update_vex_assessment_relationship( 

788 self, 

789 rel: spdx30.security_VexVulnAssessmentRelationship, 

790 assessment: CveVexAssessment, 

791 ) -> None: 

792 props = self._props_for_vex_assessment(assessment) 

793 if props: 

794 updated = False 

795 for key, val in props.items(): 

796 if getattr(rel, key) != val: 

797 setattr(rel, key, val) 

798 updated = True 

799 

800 if updated: 

801 rel.creationInfo = self.doc_creation_info 

802 

803 def new_vex_assessment_relationship( 

804 self, 

805 from_: ListLikeSHACLObject, 

806 to: ListLikeSHACLObject, 

807 assessment: CveVexAssessment, 

808 *, 

809 pn: str, 

810 ) -> list[spdx30.security_VexVulnAssessmentRelationship]: 

811 if isinstance(assessment, CveVexFixedAssessment): 

812 cls = spdx30.security_VexFixedVulnAssessmentRelationship 

813 rel_typ = spdx30.RelationshipType.fixedIn 

814 spdxid_name = "vex-fixed" 

815 

816 elif isinstance(assessment, CveVexAffectedAssessment): 

817 cls = spdx30.security_VexAffectedVulnAssessmentRelationship 

818 rel_typ = spdx30.RelationshipType.affects 

819 spdxid_name = "vex-affected" 

820 

821 elif isinstance(assessment, CveVexNotAffectedAssessment): 

822 cls = spdx30.security_VexNotAffectedVulnAssessmentRelationship 

823 rel_typ = spdx30.RelationshipType.doesNotAffect 

824 spdxid_name = "vex-not-affected" 

825 

826 elif isinstance(assessment, CveVexUnderInvestigationAssessment): 

827 cls = spdx30.security_VexUnderInvestigationVulnAssessmentRelationship 

828 rel_typ = spdx30.RelationshipType.underInvestigationFor 

829 spdxid_name = "vex-under-invest" 

830 

831 else: 

832 raise TypeError(f"Unexpected assessment type: {assessment.status}") 

833 

834 props = self._props_for_vex_assessment(assessment) 

835 

836 return self._new_relationship( 

837 cls, from_, rel_typ, to, spdxid_name=spdxid_name, **props, pn=pn 

838 ) 

839 

840 def new_cvss_vuln_assessment_relationship( 

841 self, 

842 from_: ListLikeSHACLObject, 

843 to: ListLikeSHACLObject, 

844 cvss_metric: CvssMetric, 

845 *, 

846 pn: str, 

847 ) -> list[spdx30.security_VulnAssessmentRelationship]: 

848 if cvss_metric.cvss_ver == CvssVersion.V2_0: 

849 cls = spdx30.security_CvssV2VulnAssessmentRelationship 

850 elif cvss_metric.cvss_ver in (CvssVersion.V3_0, CvssVersion.V3_1): 

851 cls = spdx30.security_CvssV3VulnAssessmentRelationship 

852 elif cvss_metric.cvss_ver == CvssVersion.V4_0: 

853 cls = spdx30.security_CvssV4VulnAssessmentRelationship 

854 else: 

855 raise ValueError(f"Unexpected CVSS version: {cvss_metric.cvss_ver}") 

856 

857 props = {} 

858 if cvss_metric.cvss_ver != CvssVersion.V2_0: 

859 assert cvss_metric.severity is not None 

860 props["security_severity"] = ( 

861 spdx30.security_CvssSeverityType.NAMED_INDIVIDUALS[ 

862 cvss_metric.severity.value 

863 ] 

864 ) 

865 

866 if cvss_metric.source is not None: 

867 props["comment"] = cvss_metric.source 

868 

869 return self._new_relationship( 

870 cls, 

871 from_, 

872 spdx30.RelationshipType.hasAssessmentFor, 

873 to, 

874 spdxid_name=f"cvss-{cvss_metric.cvss_ver.name}", 

875 security_score=cvss_metric.score, 

876 security_vectorString=cvss_metric.vector_str, 

877 **props, 

878 pn=pn, 

879 ) 

880 

881 def _create_vex_vuln_assessment_relationship( 

882 self, 

883 package: spdx30.software_Package, 

884 vuln: spdx30.security_Vulnerability, 

885 assessment: CveVexAssessment, 

886 rels: set[spdx30.security_VulnAssessmentRelationship], 

887 ) -> None: 

888 vex_rel: spdx30.security_VexVulnAssessmentRelationship | None = None 

889 

890 # Check current Vex Vulnerability Assessment Relationship 

891 for r in list(rels): 

892 a = cve_vex_assessment_from_vuln_relationship(r) 

893 if a is None: 

894 continue 

895 if (a.status == assessment.status) and (vex_rel is None): 

896 # Ok, only need to update properties 

897 vex_rel = cast("spdx30.security_VexVulnAssessmentRelationship", r) 

898 self.update_vex_assessment_relationship(vex_rel, assessment) 

899 else: 

900 # The status changed, or there is a duplicated relationship, 

901 # need to destroy this old relationship 

902 self.remove(r) 

903 rels.discard(r) 

904 

905 # If there was already a relationship with the appropriate status, return 

906 if vex_rel is not None: 

907 return 

908 

909 # Create a new Vex Vulnerability Assessment Relationship 

910 vex_rel = self.new_vex_assessment_relationship( 

911 from_=[vuln], to=[package], assessment=assessment, pn=package.name 

912 )[0] 

913 self.add(vex_rel) 

914 rels.add(vex_rel) 

915 

916 def _cleanup_cvss_vuln_assessment_relationship( 

917 self, 

918 cvss_metrics: Iterable[CvssMetric], 

919 rels: set[spdx30.security_VulnAssessmentRelationship], 

920 remove_old_metrics: bool, 

921 ) -> list[CvssMetric]: 

922 def _sort_rel(rel: spdx30.security_VulnAssessmentRelationship) -> int: 

923 return len(rel.comment or "") 

924 

925 # Build a lookup between the CVSS key and the CVSS relationship 

926 metrics_doc: dict[ 

927 tuple[Any, ...], 

928 list[spdx30.security_VulnAssessmentRelationship], 

929 ] = defaultdict(list) 

930 

931 for r in rels: 

932 m_from_r = cvss_metric_from_vuln_relationship(r) 

933 if m_from_r: 

934 metrics_doc[m_from_r.cmp_key()].append(r) 

935 

936 # Only append new metrics 

937 if not remove_old_metrics: 

938 return [m for m in cvss_metrics if (m.cmp_key() not in metrics_doc)] 

939 

940 # Remove old metrics if different that new ones 

941 metrics_to_add: list[CvssMetric] = [] 

942 for new_m in cvss_metrics: 

943 old_rels = metrics_doc.pop(new_m.cmp_key(), None) 

944 if not old_rels: 

945 metrics_to_add.append(new_m) 

946 continue 

947 

948 if len(old_rels) == 1: 

949 continue 

950 

951 del_rels = iter(sorted(old_rels, key=_sort_rel, reverse=True)) 

952 next(del_rels) 

953 for r in del_rels: 

954 self.remove(r) 

955 rels.discard(r) 

956 

957 # Remove old metrics left 

958 for old_rels in metrics_doc.values(): 

959 for r in old_rels: 

960 self.remove(r) 

961 rels.discard(r) 

962 

963 return metrics_to_add 

964 

965 def create_vex_vuln_assessment_relationship( 

966 self, 

967 package: spdx30.software_Package, 

968 vuln: spdx30.security_Vulnerability, 

969 assessment: CveVexAssessment, 

970 ) -> bool: 

971 # Check if there is already a relation between the package and the vulnerability 

972 assess_rels = self.get_package_vuln_assessment_rels(package, vuln) 

973 if assess_rels is None: 

974 return False 

975 

976 self._create_vex_vuln_assessment_relationship( 

977 package, vuln, assessment, assess_rels 

978 ) 

979 return True 

980 

981 def _create_cvss_vuln_assessment_relationship( 

982 self, 

983 package: spdx30.software_Package, 

984 vuln: spdx30.security_Vulnerability, 

985 cvss_metric: CvssMetric, 

986 rels: set[spdx30.security_VulnAssessmentRelationship], 

987 ) -> None: 

988 # Create a new CVSS Vulnerability Assessment Relationship 

989 r = self.new_cvss_vuln_assessment_relationship( 

990 from_=[vuln], to=[package], cvss_metric=cvss_metric, pn=package.name 

991 )[0] 

992 self.add(r) 

993 rels.add(r) 

994 

995 def create_cvss_vuln_assessment_relationship( 

996 self, 

997 package: spdx30.software_Package, 

998 vuln: spdx30.security_Vulnerability, 

999 cvss_metric: CvssMetric, 

1000 ) -> bool: 

1001 # Check if there is already a relation between the package and the vulnerability 

1002 assess_rels = self.get_package_vuln_assessment_rels(package, vuln) 

1003 if assess_rels is None: 

1004 return False 

1005 

1006 self._create_cvss_vuln_assessment_relationship( 

1007 package, vuln, cvss_metric, assess_rels 

1008 ) 

1009 return True 

1010 

1011 def add_cve_vulnerability( 

1012 self, 

1013 package: spdx30.software_Package, 

1014 cve_info: CveInfo, 

1015 *, 

1016 update_vuln_info: bool = True, 

1017 remove_old_metrics: bool = True, 

1018 ) -> None: 

1019 # Check if this CVE already exists 

1020 vulns_rels = self._map_pkg_vulns[package] 

1021 vulns_cve_id = self._map_cve_vulns[cve_info.cve_id.id] 

1022 if not vulns_cve_id: 

1023 # Create new CVE entry 

1024 vuln = self.new_cve_vuln(cve_info, pn=package.name) 

1025 self.add(vuln) 

1026 else: 

1027 # If there are duplicate CVE objects, try to find the right one 

1028 # Otherwise use the first one that was found. 

1029 if len(vulns_cve_id) == 1: 

1030 vuln = next(iter(vulns_cve_id)) 

1031 else: 

1032 for v in vulns_cve_id: 

1033 if v in vulns_rels: 

1034 vuln = v 

1035 break 

1036 else: 

1037 vuln = next(iter(vulns_cve_id)) 

1038 

1039 if update_vuln_info: 

1040 # Update CVE information using the ones provided in input 

1041 self.update_cve_vuln(vuln, cve_info) 

1042 

1043 # Search for a previous hasAssociatedVulnerability relationship, 

1044 # and create it if not already exists 

1045 assess_rels = vulns_rels.get(vuln) 

1046 if assess_rels is None: 

1047 for r in self._map_relation.get(package, []): 

1048 if ( 

1049 r.relationshipType 

1050 == spdx30.RelationshipType.hasAssociatedVulnerability 

1051 ): 

1052 r.to.append(vuln) 

1053 r.creationInfo = self.doc_creation_info 

1054 break 

1055 else: 

1056 r = self.new_relationship( 

1057 from_=[package], 

1058 typ=spdx30.RelationshipType.hasAssociatedVulnerability, 

1059 to=[vuln], 

1060 pn=package.name, 

1061 )[0] 

1062 self.add(r) 

1063 assess_rels = set() 

1064 vulns_rels[vuln] = assess_rels 

1065 

1066 # Create or update Vulnerability Assessment Relationships 

1067 vex_assess = cve_info.vex_assessment 

1068 if vex_assess is not None: 

1069 self._create_vex_vuln_assessment_relationship( 

1070 package, vuln, vex_assess, assess_rels 

1071 ) 

1072 

1073 cvss_metrics = self._cleanup_cvss_vuln_assessment_relationship( 

1074 cve_info.cvss_metrics, assess_rels, remove_old_metrics 

1075 ) 

1076 for cvss_metric in cvss_metrics: 

1077 self._create_cvss_vuln_assessment_relationship( 

1078 package, vuln, cvss_metric, assess_rels 

1079 ) 

1080 

1081 def remove_all_cve_vulnerability(self, cve_id: str) -> None: 

1082 # Check if this CVE exists 

1083 vulns_cve_id = self._map_cve_vulns.get(cve_id) 

1084 if vulns_cve_id is None: 

1085 return 

1086 

1087 # For each vulnerability with this CVE identifier 

1088 for vuln in vulns_cve_id: 

1089 # Remove all relationships found associated with this CVE 

1090 for r in list(self._map_relation.get(vuln, [])): 

1091 self._remove(r) 

1092 for r in list(self._map_relation_rev.get(vuln, [])): 

1093 self._remove(r) 

1094 

1095 # Update map between package and vulnerabilities 

1096 for vulns_rels in self._map_pkg_vulns.values(): 

1097 vulns_rels.pop(vuln, None) 

1098 

1099 # Recreate index 

1100 self.create_index()