Coverage for /home/benjarobin/Bootlin/projects/Schneider-Electric-Senux/sbom-cve-check/src/sbom_cve_check/sbom/lib_spdx3.py: 88%
550 statements
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
« prev ^ index » next coverage.py v7.11.1, created at 2025-11-28 15:37 +0100
1# -*- coding: utf-8 -*-
2# SPDX-License-Identifier: GPL-2.0-only
4import hashlib
5import pathlib
6import re
7import uuid
8from collections import defaultdict
9from collections.abc import Generator, Iterable
10from datetime import UTC, datetime
11from typing import Any, TypeVar, Union, cast
13from spdx_python_model.bindings import v3_0_1 as spdx30 # type: ignore[import-untyped]
15from ..utils.reproducible import ReproducibleDateTime
16from ..vuln.cpe import Cpe23
17from ..vuln.cve import (
18 CveInfo,
19 CveVexAffectedAssessment,
20 CveVexAssessment,
21 CveVexFixedAssessment,
22 CveVexJustification,
23 CveVexNotAffectedAssessment,
24 CveVexUnderInvestigationAssessment,
25)
26from ..vuln.cvss import CvssMetric, CvssSeverity, CvssVersion
# SPDX specification version stored in the CreationInfo objects built below
SPDX_VERSION = "3.0.1"

# Collection types accepted wherever several SHACL objects are passed at once
ListLikeSHACLObject = Union[  # noqa: UP007
    set[spdx30.SHACLObject], list[spdx30.SHACLObject], tuple[spdx30.SHACLObject]
]

# Relationship subclass instantiated and returned by ObjectSet._new_relationship
_RelationshipT = TypeVar("_RelationshipT", bound=spdx30.Relationship)
def spdxid_hash(*items: Any) -> str:
    """Return a stable hexadecimal digest identifying ``items``.

    SPDX elements contribute their ``_id``, any other value contributes its
    ``repr()``. Items are hashed in the order they are given.

    :return: the MD5 hex digest of the concatenated contributions
    """
    # The digest only serves to derive identifiers, never to protect anything.
    # Declaring that keeps the call usable on builds that block MD5 for
    # security purposes; the resulting digest is unchanged.
    digest = hashlib.md5(usedforsecurity=False)
    for item in items:
        if isinstance(item, spdx30.Element):
            # noinspection PyProtectedMember
            digest.update(item._id.encode("utf-8"))
        else:
            digest.update(repr(item).encode("utf-8"))
    return digest.hexdigest()
def to_list(
    lst: ListLikeSHACLObject,
) -> list[spdx30.SHACLObject] | tuple[spdx30.SHACLObject]:
    """Normalise a collection of SHACL objects into an ordered sequence.

    A set is returned as a sorted list; a list or a tuple is returned as is.

    :raises TypeError: if ``lst`` is neither a set, a list nor a tuple
    """
    if isinstance(lst, set):
        return sorted(lst)

    if isinstance(lst, (list, tuple)):
        return lst

    raise TypeError(f"Must be a list or tuple. Got {(type(lst))}")
def cvss_severity_from_spdx3_type(severity: str) -> CvssSeverity:
    """Translate an SPDX 3 CVSS severity value into a ``CvssSeverity``.

    :raises ValueError: if ``severity`` matches no SPDX named individual
    """
    individuals = spdx30.security_CvssSeverityType.NAMED_INDIVIDUALS
    not_found = object()
    name = next(
        (name for name, url in individuals.items() if severity == url), not_found
    )
    if name is not_found:
        raise ValueError(f"Unexpected CVSS spdx 3 severity: {severity}")
    return CvssSeverity(name)
def cve_vex_justification_from_str(s: str) -> CveVexJustification | None:
    """Convert an SPDX VEX justification into a ``CveVexJustification``.

    ``s`` may be the bare SPDX name (``"componentNotPresent"``) or the IRI of
    the corresponding named individual, which is the form stored in
    ``security_justificationType`` when this module creates a relationship.

    :return: the matching justification, or None if ``s`` is empty or unknown
    """
    if not s or not isinstance(s, str):
        return None

    map_justification: dict[str, CveVexJustification] = {
        "componentNotPresent": CveVexJustification.COMPONENT_NOT_PRESENT,
        "vulnerableCodeNotPresent": CveVexJustification.VULNERABLE_CODE_NOT_PRESENT,
        "inlineMitigationsAlreadyExist":
            CveVexJustification.INLINE_MITIGATIONS_ALREADY_EXIST,
        "vulnerableCodeCannotBeControlledByAdversary":
            CveVexJustification.VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY,
        "vulnerableCodeNotInExecutePath":
            CveVexJustification.VULNERABLE_CODE_NOT_IN_EXECUTE_PATH,
    }  # fmt: skip

    # An IRI carries the name as its last path segment; a bare name contains
    # no "/" and comes out of the split unchanged.
    return map_justification.get(s.rsplit("/", 1)[-1])
def cve_vex_justification_to_str(s: CveVexJustification) -> str:
    """Return the SPDX name of a VEX justification.

    :raises KeyError: if ``s`` is not one of the known justifications
    """
    spdx_names: dict[CveVexJustification, str] = {
        CveVexJustification.COMPONENT_NOT_PRESENT: "componentNotPresent",
        CveVexJustification.VULNERABLE_CODE_NOT_PRESENT: "vulnerableCodeNotPresent",
        CveVexJustification.INLINE_MITIGATIONS_ALREADY_EXIST:
            "inlineMitigationsAlreadyExist",
        CveVexJustification.VULNERABLE_CODE_CANNOT_BE_CONTROLLED_BY_ADVERSARY:
            "vulnerableCodeCannotBeControlledByAdversary",
        CveVexJustification.VULNERABLE_CODE_NOT_IN_EXECUTE_PATH:
            "vulnerableCodeNotInExecutePath",
    }  # fmt: skip

    spdx_name = spdx_names[s]
    return spdx_name
def cve_vex_assessment_from_vuln_relationship(
    r: spdx30.security_VulnAssessmentRelationship,
) -> CveVexAssessment | None:
    """Build the VEX assessment described by an SPDX assessment relationship.

    :return: None if ``r`` is not a VEX relationship of a supported kind
    """
    if not isinstance(r, spdx30.security_VexVulnAssessmentRelationship):
        return None

    # Fields shared by every kind of VEX assessment
    common = {
        "vex_version": r.security_vexVersion,
        "status_notes": r.security_statusNotes,
    }

    if isinstance(r, spdx30.security_VexFixedVulnAssessmentRelationship):
        return CveVexFixedAssessment(**common)

    if isinstance(r, spdx30.security_VexAffectedVulnAssessmentRelationship):
        return CveVexAffectedAssessment(
            **common,
            action_statement=r.security_actionStatement,
            action_statement_time=r.security_actionStatementTime,
        )

    if isinstance(r, spdx30.security_VexNotAffectedVulnAssessmentRelationship):
        return CveVexNotAffectedAssessment(
            **common,
            impact_statement=r.security_impactStatement,
            impact_statement_time=r.security_impactStatementTime,
            justification=cve_vex_justification_from_str(r.security_justificationType),
        )

    if isinstance(r, spdx30.security_VexUnderInvestigationVulnAssessmentRelationship):
        return CveVexUnderInvestigationAssessment(**common)

    return None
def cvss_metric_from_vuln_relationship(
    r: spdx30.security_VulnAssessmentRelationship,
) -> CvssMetric | None:
    """Build the CVSS metric described by an SPDX assessment relationship.

    The CVSS version comes from the relationship class and, for v3 and v4,
    from the prefix of the vector string.

    :return: None if ``r`` is not a CVSS relationship of a recognised version
    """
    if isinstance(r, spdx30.security_CvssV2VulnAssessmentRelationship):
        version = CvssVersion.V2_0
    elif isinstance(r, spdx30.security_CvssV3VulnAssessmentRelationship):
        vector = r.security_vectorString
        if vector.startswith("CVSS:3.0/"):
            version = CvssVersion.V3_0
        elif vector.startswith("CVSS:3.1/"):
            version = CvssVersion.V3_1
        else:
            return None
    elif isinstance(
        r, spdx30.security_CvssV4VulnAssessmentRelationship
    ) and r.security_vectorString.startswith("CVSS:4.0/"):
        version = CvssVersion.V4_0
    else:
        return None

    # The severity is only read for versions other than 2.0
    if version == CvssVersion.V2_0:
        severity = None
    else:
        severity = cvss_severity_from_spdx3_type(r.security_severity)

    return CvssMetric(
        cvss_ver=version,
        score=r.security_score,
        vector_str=r.security_vectorString,
        severity=severity,
    )
def vuln_get_cve_id(vuln: spdx30.security_Vulnerability) -> str | None:
    """Return the first CVE identifier (as a string) attached to ``vuln``.

    :return: None if no external identifier of type ``cve`` holds a string
    """
    cve_ids = (
        ext_id.identifier
        for ext_id in vuln.externalIdentifier
        if ext_id.externalIdentifierType == spdx30.ExternalIdentifierType.cve
    )
    return next((cve_id for cve_id in cve_ids if isinstance(cve_id, str)), None)
175def _creation_info_cmp_key(obj: spdx30.CreationInfo) -> tuple[Any, ...]:
176 return (
177 obj.created or datetime.fromtimestamp(0, UTC),
178 tuple(obj.createdBy),
179 tuple(obj.createdUsing),
180 obj.comment or "",
181 )
def __creation_info_lt(self: spdx30.CreationInfo, other: object) -> bool:
    """Order two CreationInfo objects by their comparison key.

    For any other operand the comparison is delegated to the parent class.
    """
    if isinstance(other, spdx30.CreationInfo):
        return _creation_info_cmp_key(self) < _creation_info_cmp_key(other)
    return bool(super(spdx30.CreationInfo, self).__lt__(other))
# Hack to have repeatability in SPDX generation: CreationInfo objects are
# ordered by their content (see __creation_info_lt) once this is installed.
spdx30.CreationInfo.__lt__ = __creation_info_lt
class ObjectSet(spdx30.SHACLObjectSet):  # type: ignore[misc]
    """SPDX 3 object set extended with lookup caches for relationships and CVEs."""

    # Name hashed (UUID v5) to derive the namespace of generated SPDX ids
    SPDX_UUID_NAMESPACE = "sbom-cve-check"
    # Prefix of every SPDX id generated by new_spdxid()
    SPDX_NAMESPACE_PREFIX = "http://spdx.org/spdxdocs"

    def __init__(self) -> None:
        """Create an empty object set: no document, no SBOM, empty caches."""
        super().__init__()

        # Unique spdx document and sbom object contained into this graph
        self._doc: spdx30.SpdxDocument | None = None
        self._sbom: spdx30.software_Sbom | None = None

        # Map (cache) between 'from' Relationship and all associated Relationships
        self._map_relation: dict[spdx30.SHACLObject, set[spdx30.Relationship]] = (
            defaultdict(set)
        )
        # Map (cache) between 'to' Relationship and all associated Relationships
        self._map_relation_rev: dict[spdx30.SHACLObject, set[spdx30.Relationship]] = (
            defaultdict(set)
        )
        # Map (cache) between CVE identifier and associated vulnerabilities
        self._map_cve_vulns: dict[str, set[spdx30.security_Vulnerability]] = (
            defaultdict(set)
        )
        # Map (cache) between a package and associated vulnerabilities and relationships
        self._map_pkg_vulns: dict[
            spdx30.software_Package,
            dict[
                spdx30.security_Vulnerability,
                set[spdx30.security_VulnAssessmentRelationship],
            ],
        ] = defaultdict(dict)
def __add_assess_to_map_pkg_vulns(
    self, p: spdx30.software_Package, rel: spdx30.Relationship
) -> None:
    """Record in the package cache the vulnerabilities referenced by ``rel``.

    Only ``hasAssociatedVulnerability`` relationships are considered. Each
    vulnerability gets an entry for ``p``, filled with the assessment
    relationships that start at the vulnerability and target ``p``.
    """
    if rel.relationshipType != spdx30.RelationshipType.hasAssociatedVulnerability:
        return

    for vuln in rel.to:
        known_rels = self._map_pkg_vulns[p].setdefault(vuln, set())
        known_rels.update(
            candidate
            for candidate in self._map_relation.get(vuln, [])
            if isinstance(candidate, spdx30.security_VulnAssessmentRelationship)
            and p in candidate.to
        )
def _update_map_cache(self) -> None:
    """Rebuild every lookup cache from the objects currently in the set."""
    self._map_relation.clear()
    self._map_relation_rev.clear()
    self._map_cve_vulns.clear()
    # The package cache is rebuilt below as well: reset it too, otherwise
    # entries computed by a previous build would survive next to the new ones.
    self._map_pkg_vulns.clear()

    # Index each relationship by its source and by each of its targets
    for r in self.foreach_type(spdx30.Relationship):
        self._map_relation[r.from_].add(r)
        for o in r.to:
            self._map_relation_rev[o].add(r)

    # Index vulnerabilities by CVE identifier
    for v in self.foreach_type(spdx30.security_Vulnerability):
        cve_id = vuln_get_cve_id(v)
        if cve_id:
            self._map_cve_vulns[cve_id].add(v)

    # Index, per install package, its vulnerabilities and their assessments
    for p in self.iterate_install_packages():
        for rel in self._map_relation.get(p, ()):
            self.__add_assess_to_map_pkg_vulns(p, rel)
def _init_parsed_document(self) -> None:
    """Locate the document and its SBOM in a parsed graph, then build the caches.

    :raises ValueError: if there is no SpdxDocument, or if the document does
        not have exactly one root element that is an Sbom
    """
    first_doc = next(iter(self.foreach_type(spdx30.SpdxDocument)), None)
    if first_doc is not None:
        self._doc = first_doc

    if not self._doc:
        raise ValueError("Graph is missing an SpdxDocument node")

    self._sbom = None
    roots = self._doc.rootElement
    if len(roots) == 1 and isinstance(roots[0], spdx30.software_Sbom):
        self._sbom = roots[0]

    if not self._sbom:
        raise ValueError("SpdxDocument does not contain an Sbom node")

    self._update_map_cache()
@classmethod
def parse_jsonld(cls, path_spdx: str | pathlib.Path) -> "ObjectSet":
    """Load an SPDX 3 JSON-LD file into a new object set.

    :param path_spdx: path of the JSON-LD document to read
    :return: the object set, with its document, SBOM and caches initialised
    :raises ValueError: if the graph lacks an SpdxDocument or an Sbom root
    """
    object_set = cls()
    # Decode explicitly as UTF-8 instead of relying on the locale encoding,
    # which differs between platforms.
    with pathlib.Path(path_spdx).open(encoding="utf-8") as f:
        d = spdx30.JSONLDDeserializer()
        d.read(f, object_set)
    object_set._init_parsed_document()
    return object_set
def _cleanup_creation_info(self) -> None:
    """Deduplicate CreationInfo objects and drop the unreferenced ones.

    Elements whose creation info compares equal (same comparison key) are
    made to share one instance. CreationInfo objects stored in the set that
    are not among the retained instances are removed from it.
    """
    retained: dict[tuple[Any, ...], spdx30.CreationInfo] = {}
    stored: set[spdx30.CreationInfo] = set()

    for obj in self.objects:
        if isinstance(obj, spdx30.CreationInfo):
            stored.add(obj)
            continue
        if not isinstance(obj, spdx30.Element):
            continue

        info: spdx30.CreationInfo = obj.creationInfo
        if not info:
            continue

        key = _creation_info_cmp_key(info)
        shared = retained.get(key)
        if shared is None:
            retained[key] = info
        else:
            obj.creationInfo = shared

    stored -= set(retained.values())
    self.objects -= stored
def write_to_jsonld(self, path_spdx: str | pathlib.Path) -> None:
    """Serialise the object set as JSON-LD into ``path_spdx``.

    Creation info objects are deduplicated before writing.
    """
    self._cleanup_creation_info()
    target = pathlib.Path(path_spdx)
    with target.open("wb") as out:
        serializer = spdx30.JSONLDInlineSerializer()
        serializer.write(self, out)
def add(self, obj: spdx30.SHACLObject) -> spdx30.SHACLObject:
    """Add ``obj`` to the set, keeping the relationship and CVE caches current.

    :return: whatever the parent class returns for the added object
    """
    if isinstance(obj, spdx30.Relationship):
        self._map_relation[obj.from_].add(obj)
        for target in obj.to:
            self._map_relation_rev[target].add(obj)

    if isinstance(obj, spdx30.security_Vulnerability) and (
        cve_id := vuln_get_cve_id(obj)
    ):
        self._map_cve_vulns[cve_id].add(obj)

    return super().add(obj)
332 @staticmethod
333 def _remove_from_relation_map(
334 rel_map: dict[spdx30.SHACLObject, set[spdx30.Relationship]],
335 key: spdx30.SHACLObject,
336 val: spdx30.Relationship,
337 ) -> None:
338 rels = rel_map.get(key)
339 if rels:
340 rels.discard(val)
341 if rels:
342 del rel_map[key]
def _remove(self, obj: spdx30.SHACLObject) -> None:
    """Remove a relationship from the caches and from the stored objects.

    The index of the object set is not refreshed here.

    :raises TypeError: if ``obj`` is not a Relationship
    """
    if not isinstance(obj, spdx30.Relationship):
        raise TypeError(f"Removing this kind of object is not supported: {obj}")

    # Forward map first, then one reverse entry per target
    self._remove_from_relation_map(self._map_relation, obj.from_, obj)
    for target in obj.to:
        self._remove_from_relation_map(self._map_relation_rev, target, obj)

    self.objects.discard(obj)
def remove(self, obj: spdx30.SHACLObject) -> None:
    """Remove a relationship from the set, then recreate the index.

    :raises TypeError: if ``obj`` is not a Relationship (raised by ``_remove``)
    """
    self._remove(obj)
    # The index is recreated after each individual removal
    self.create_index()
def iterate_builds(self) -> Generator[spdx30.build_Build, None, None]:
    """Yield every Build object of the set."""
    for build in self.foreach_type(spdx30.build_Build):
        yield build
def iterate_install_packages(
    self,
) -> Generator[spdx30.software_Package, None, None]:
    """Return a generator over the packages whose primary purpose is ``install``."""
    packages = self.foreach_type(spdx30.software_Package)
    install = spdx30.software_SoftwarePurpose.install
    return (pkg for pkg in packages if pkg.software_primaryPurpose == install)
def find_package_by_cpe(
    self, cpe: str
) -> Generator[spdx30.software_Package, None, None]:
    """Yield packages having an external identifier equal to ``cpe``.

    A package is yielded once per matching external identifier; the type of
    the identifier is not checked.
    """
    for pkg in self.foreach_type(spdx30.software_Package):
        for ext_id in pkg.externalIdentifier:
            if ext_id.identifier != cpe:
                continue
            yield pkg
def find_package_by_name(
    self, name: str
) -> Generator[spdx30.software_Package, None, None]:
    """Yield every package whose name equals ``name``."""
    yield from (
        pkg for pkg in self.foreach_type(spdx30.software_Package) if pkg.name == name
    )
@staticmethod
def list_cpe23(element: spdx30.Element) -> set[Cpe23]:
    """Return the CPE 2.3 identifiers of ``element`` that could be parsed.

    External identifiers of another type, and those for which
    ``Cpe23.parse`` returns None, are ignored.
    """
    return {
        parsed
        for ext_id in element.externalIdentifier
        if ext_id.externalIdentifierType == spdx30.ExternalIdentifierType.cpe23
        and (parsed := Cpe23.parse(ext_id.identifier)) is not None
    }
def get_package_vulns(
    self, pkg: spdx30.software_Package
) -> dict[
    spdx30.security_Vulnerability, set[spdx30.security_VulnAssessmentRelationship]
]:
    """Get the vulnerabilities cached for this package, with their assessments.

    :return: the cached mapping, or a new empty dict if the package is unknown
        (the cache itself is left untouched in that case)
    """
    cache = self._map_pkg_vulns
    if pkg in cache:
        return cache[pkg]
    return {}
def get_package_vuln(
    self, pkg: spdx30.software_Package, cve_id: str
) -> tuple[
    spdx30.security_Vulnerability | None,
    set[spdx30.security_VulnAssessmentRelationship],
]:
    """Get the Vulnerability, associated with this package, with this CVE id.

    :return: the first matching vulnerability and its assessment
        relationships, or ``(None, set())`` when there is no match
    """
    cached = self._map_pkg_vulns.get(pkg, {})
    matches = (
        (vuln, rels)
        for vuln, rels in cached.items()
        if vuln_get_cve_id(vuln) == cve_id
    )
    return next(matches, (None, set()))
def get_all_vulns(
    self,
) -> dict[
    spdx30.security_Vulnerability,
    dict[spdx30.software_Package, set[spdx30.security_VulnAssessmentRelationship]],
]:
    """
    Get all vulnerabilities, each with its packages and their assessments.

    The result is the package cache turned around (vulnerability first); the
    relationship sets it contains are copies of the cached ones.
    """
    by_vuln: dict[
        spdx30.security_Vulnerability,
        dict[
            spdx30.software_Package, set[spdx30.security_VulnAssessmentRelationship]
        ],
    ] = {}
    for pkg, per_pkg in self._map_pkg_vulns.items():
        for vuln, rels in per_pkg.items():
            by_vuln.setdefault(vuln, {}).setdefault(pkg, set()).update(rels)

    return by_vuln
def get_package_vuln_assessment_rels(
    self, pkg: spdx30.software_Package, vuln: spdx30.security_Vulnerability
) -> set[spdx30.security_VulnAssessmentRelationship] | None:
    """
    Get the assessment relationships cached for a package and a vulnerability.

    :return: None if there is no relationship between this package and this
             vulnerability
    """
    per_vuln = self._map_pkg_vulns.get(pkg)
    return per_vuln.get(vuln) if per_vuln else None
def get_build_recipe_packages(
    self, build_recipe: spdx30.build_Build
) -> set[spdx30.software_Package]:
    """Return the install packages that are outputs of ``build_recipe``.

    Only lifecycle-scoped ``hasOutput`` relationships starting at the build
    are followed.
    """
    outputs: set[spdx30.software_Package] = set()
    for rel in self._map_relation.get(build_recipe, []):
        if not isinstance(rel, spdx30.LifecycleScopedRelationship):
            continue
        if rel.relationshipType != spdx30.RelationshipType.hasOutput:
            continue

        for candidate in rel.to:
            if not isinstance(candidate, spdx30.software_Package):
                continue
            if (
                candidate.software_primaryPurpose
                == spdx30.software_SoftwarePurpose.install
            ):
                outputs.add(candidate)
    return outputs
def get_build_recipe_sources(
    self, build_recipe: spdx30.build_Build
) -> set[spdx30.software_SoftwareArtifact]:
    """Return the source artifacts that are inputs of ``build_recipe``.

    Only lifecycle-scoped ``hasInput`` relationships starting at the build
    are followed.
    """
    sources: set[spdx30.software_SoftwareArtifact] = set()
    for rel in self._map_relation.get(build_recipe, []):
        if not isinstance(rel, spdx30.LifecycleScopedRelationship):
            continue
        if rel.relationshipType != spdx30.RelationshipType.hasInput:
            continue

        for candidate in rel.to:
            if not isinstance(candidate, spdx30.software_SoftwareArtifact):
                continue
            if (
                candidate.software_primaryPurpose
                == spdx30.software_SoftwarePurpose.source
            ):
                sources.add(candidate)
    return sources
487 @classmethod
488 def _get_namespace(cls, pn: str) -> str:
489 namespace_uuid = uuid.uuid5(uuid.NAMESPACE_DNS, cls.SPDX_UUID_NAMESPACE)
490 return f"{cls.SPDX_NAMESPACE_PREFIX}/{uuid.uuid5(namespace_uuid, pn)!s}"
@classmethod
def new_spdxid(cls, *suffix: str, pn: str) -> str:
    """Build an SPDX id from the namespace of ``pn`` and the given suffixes.

    In each suffix, any character other than an ASCII letter, a digit, ``_``
    or ``-`` is replaced by ``_``. The parts are joined with ``/``.
    """
    namespace = cls._get_namespace(pn)
    sanitized = [re.sub(r"[^a-zA-Z0-9_-]", "_", part) for part in suffix]
    return "/".join([namespace, *sanitized])
498 @classmethod
499 def new_agent(
500 cls,
501 *,
502 pn: str,
503 agent_type: str,
504 name: str,
505 comment: str | None = None,
506 creation_info: spdx30.CreationInfo | None = None,
507 ) -> spdx30.Agent:
508 spdxid = cls.new_spdxid("agent", name, pn=pn)
510 if agent_type == "person":
511 agent = spdx30.Person(_id=spdxid, name=name)
512 elif agent_type == "software":
513 agent = spdx30.SoftwareAgent(_id=spdxid, name=name)
514 elif agent_type == "organization":
515 agent = spdx30.Organization(_id=spdxid, name=name)
516 elif not agent_type or agent_type == "agent":
517 agent = spdx30.Agent(_id=spdxid, name=name)
518 else:
519 raise ValueError(f"Unknown agent type: {agent_type}")
521 if creation_info is not None:
522 agent.creationInfo = creation_info
523 if comment:
524 agent.comment = comment
526 return agent
@classmethod
def new_tool(
    cls, *, pn: str, name: str, creation_info: spdx30.CreationInfo | None = None
) -> spdx30.Tool:
    """Create a Tool element named ``name`` (it is not added to any set).

    :param pn: name from which the SPDX id namespace is derived
    :param creation_info: creation info to attach; left unset when None
    """
    tool = spdx30.Tool(_id=cls.new_spdxid("tool", name, pn=pn), name=name)
    if creation_info is not None:
        tool.creationInfo = creation_info
    return tool
@staticmethod
def new_creation_info(
    *, tools: list[spdx30.Tool], agents: list[spdx30.Agent]
) -> spdx30.CreationInfo:
    """Create a CreationInfo crediting ``agents`` and ``tools``.

    Tools and agents that have no creation info yet are attached to the new
    one. The creation date comes from ``ReproducibleDateTime``.
    """
    info = spdx30.CreationInfo()

    # Tools first, then agents: only fill in the ones still lacking an info
    for element in (*tools, *agents):
        if not element.creationInfo:
            element.creationInfo = info

    info.created = ReproducibleDateTime().now
    info.specVersion = SPDX_VERSION
    info.createdBy = agents
    info.createdUsing = tools

    return info
@classmethod
def new_objset(cls, name: str) -> "ObjectSet":
    """Create an object set holding one new SpdxDocument named ``name``.

    The document declares conformance to the build, core, security,
    simpleLicensing and software profiles. No SBOM is created here.
    """
    profiles = [
        spdx30.ProfileIdentifierType.build,
        spdx30.ProfileIdentifierType.core,
        spdx30.ProfileIdentifierType.security,
        spdx30.ProfileIdentifierType.simpleLicensing,
        spdx30.ProfileIdentifierType.software,
    ]

    new_set = cls()
    doc_id = new_set.new_spdxid("document", pn=name)
    doc = spdx30.SpdxDocument(_id=doc_id, name=name)
    doc.profileConformance = profiles

    new_set._doc = doc
    new_set.add(doc)

    return new_set
def set_doc_creation_info(self, creation_info: spdx30.CreationInfo) -> None:
    """Add ``creation_info``, its tools and its agents, and attach it to the document."""
    for tool in creation_info.createdUsing:
        self.add(tool)
    for agent in creation_info.createdBy:
        self.add(agent)
    self.add(creation_info)

    document = self._doc
    assert document is not None
    document.creationInfo = creation_info
@property
def doc_creation_info(self) -> spdx30.CreationInfo:
    """Creation info of the document; the document must already exist."""
    document = self._doc
    assert document is not None
    return document.creationInfo
def add_sbom_as_doc_root_element(self, name: str) -> None:
    """Create the SBOM (type ``build``) and make it the document's only root.

    No SBOM must exist yet; the SBOM reuses the document creation info.
    """
    assert self._sbom is None

    sbom = spdx30.software_Sbom(
        _id=self.new_spdxid("sbom", pn=name),
        name=name,
        creationInfo=self.doc_creation_info,
        software_sbomType=[spdx30.software_SbomType.build],
    )
    self._sbom = sbom
    self.add(sbom)

    assert self._doc is not None
    self._doc.rootElement = [sbom]
def add_sbom_root_element(self, e: spdx30.Element) -> None:
    """Append ``e`` to the root elements of the SBOM, which must already exist."""
    sbom = self._sbom
    assert sbom is not None
    sbom.rootElement.append(e)
def _new_relationship(
    self,
    cls: type[_RelationshipT],
    from_: ListLikeSHACLObject,
    typ: str,
    to: ListLikeSHACLObject,
    *,
    pn: str,
    spdxid_name: str = "relationship",
    **props: object,
) -> list[_RelationshipT]:
    """Create and add one relationship of class ``cls`` per source element.

    Each relationship targets all of ``to`` (or the SPDX ``NoneElement`` when
    ``to`` is empty). Its id is derived from a hash of the type, the source,
    the extra property values (in key order) and the targets.

    :return: the added relationships, one per element of ``from_``; an empty
        list when ``from_`` is empty
    """
    sources = to_list(from_)
    targets = to_list(to)

    if not sources:
        return []

    if not targets:
        targets = [spdx30.IndividualElement.NoneElement]

    # Property values contribute to the id in the order of their names
    ordered_props = [props[key] for key in sorted(props)]

    created = []
    for source in sources:
        digest = spdxid_hash(typ, source, *ordered_props, *targets)
        new_rel = cls(
            _id=self.new_spdxid(spdxid_name, digest, pn=pn),
            creationInfo=self.doc_creation_info,
            from_=source,
            relationshipType=typ,
            to=targets,
            **props,
        )
        created.append(self.add(new_rel))

    return created
def new_relationship(
    self, from_: ListLikeSHACLObject, typ: str, to: ListLikeSHACLObject, *, pn: str
) -> list[spdx30.Relationship]:
    """Create and add plain relationships of type ``typ``, one per source."""
    return self._new_relationship(
        spdx30.Relationship,
        from_,
        typ,
        to,
        pn=pn,
    )
def new_scoped_relationship(
    self,
    from_: ListLikeSHACLObject,
    typ: str,  # RelationshipType
    scope: str,  # LifecycleScopeType
    to: ListLikeSHACLObject,
    *,
    pn: str,
) -> list[spdx30.LifecycleScopedRelationship]:
    """Create and add lifecycle-scoped relationships, one per source.

    ``scope`` is stored on each relationship and takes part in its id.
    """
    rel_cls = spdx30.LifecycleScopedRelationship
    return self._new_relationship(rel_cls, from_, typ, to, pn=pn, scope=scope)
673 def update_cve_vuln(
674 self, vuln: spdx30.security_Vulnerability, cve_info: CveInfo
675 ) -> None:
676 updated = False
678 # Update description
679 if cve_info.description and (vuln.description != cve_info.description):
680 vuln.description = cve_info.description
681 updated = True
683 # Add new external references
684 map_ext_refs = {}
685 for ext_ref in cve_info.references:
686 map_ext_refs[ext_ref.url] = ext_ref
688 for ext_ref in vuln.externalRef:
689 for locator in ext_ref.locator:
690 map_ext_refs.pop(locator, None)
692 for ext_ref in map_ext_refs.values():
693 props = (
694 {}
695 if ext_ref.ref_type is None
696 else {"externalRefType": ext_ref.ref_type}
697 )
698 vuln.externalRef.append(spdx30.ExternalRef(locator=[ext_ref.url], **props))
699 updated = True
701 # Update last modification date time
702 if cve_info.date_modified is not None:
703 date_modified = cve_info.date_modified.astimezone(UTC)
704 if date_modified != vuln.security_modifiedTime:
705 vuln.security_modifiedTime = date_modified
706 updated = True
708 # Update published date time
709 if cve_info.date_published is not None:
710 date_published = cve_info.date_published.astimezone(UTC)
711 if date_published != vuln.security_publishedTime:
712 vuln.security_publishedTime = date_published
713 updated = True
715 if updated:
716 vuln.creationInfo = self.doc_creation_info
def new_cve_vuln(
    self, cve_info: CveInfo, *, pn: str
) -> spdx30.security_Vulnerability:
    """Create a Vulnerability element from ``cve_info`` (it is not added to the set).

    The element carries the document creation info, a ``cve`` external
    identifier with two locators, and one external reference per reference
    of ``cve_info``. Dates are stored in UTC.
    """
    cve_id = cve_info.cve_id.id
    vuln = spdx30.security_Vulnerability(
        _id=self.new_spdxid("vulnerability", cve_id, pn=pn)
    )
    vuln.creationInfo = self.doc_creation_info

    description = cve_info.description
    if description is not None:
        vuln.description = description

    modified = cve_info.date_modified
    if modified is not None:
        vuln.security_modifiedTime = modified.astimezone(UTC)

    published = cve_info.date_published
    if published is not None:
        vuln.security_publishedTime = published.astimezone(UTC)

    cve_identifier = spdx30.ExternalIdentifier(
        externalIdentifierType=spdx30.ExternalIdentifierType.cve,
        identifier=cve_id,
        identifierLocator=[
            f"https://cveawg.mitre.org/api/cve/{cve_id}",
            f"https://www.cve.org/CVERecord?id={cve_id}",
        ],
    )
    vuln.externalIdentifier.append(cve_identifier)

    for ref in cve_info.references:
        extra = {} if ref.ref_type is None else {"externalRefType": ref.ref_type}
        vuln.externalRef.append(spdx30.ExternalRef(locator=[ref.url], **extra))

    return vuln
@staticmethod
def _props_for_vex_assessment(assessment: CveVexAssessment) -> dict[str, Any]:
    """Return the SPDX properties describing a VEX assessment.

    Keys are attribute names of the SPDX VEX relationship classes. Optional
    values that are None are left out, except the action / impact statement,
    which is always present for affected / not-affected assessments.
    """
    candidates = (
        ("security_vexVersion", assessment.vex_version),
        ("security_statusNotes", assessment.status_notes),
    )
    props: dict[str, Any] = {
        key: value for key, value in candidates if value is not None
    }

    if isinstance(assessment, CveVexAffectedAssessment):
        props["security_actionStatement"] = assessment.action_statement
        stated_at = assessment.action_statement_time
        if stated_at is not None:
            props["security_actionStatementTime"] = stated_at.astimezone(tz=UTC)

    elif isinstance(assessment, CveVexNotAffectedAssessment):
        props["security_impactStatement"] = assessment.impact_statement
        stated_at = assessment.impact_statement_time
        if stated_at is not None:
            props["security_impactStatementTime"] = stated_at.astimezone(tz=UTC)

        justification = assessment.justification
        if justification is not None:
            spdx_name = cve_vex_justification_to_str(justification)
            individuals = spdx30.security_VexJustificationType.NAMED_INDIVIDUALS
            props["security_justificationType"] = individuals[spdx_name]

    return props
def update_vex_assessment_relationship(
    self,
    rel: spdx30.security_VexVulnAssessmentRelationship,
    assessment: CveVexAssessment,
) -> None:
    """Copy onto ``rel`` the properties of ``assessment`` that differ.

    If at least one property was changed, ``rel`` takes the document
    creation info.
    """
    changed = False
    for attr, wanted in self._props_for_vex_assessment(assessment).items():
        if getattr(rel, attr) != wanted:
            setattr(rel, attr, wanted)
            changed = True

    if changed:
        rel.creationInfo = self.doc_creation_info
def new_vex_assessment_relationship(
    self,
    from_: ListLikeSHACLObject,
    to: ListLikeSHACLObject,
    assessment: CveVexAssessment,
    *,
    pn: str,
) -> list[spdx30.security_VexVulnAssessmentRelationship]:
    """Create and add the VEX relationships matching the kind of ``assessment``.

    :return: the added relationships, one per element of ``from_``
    :raises TypeError: if ``assessment`` is of no supported kind
    """
    # (assessment class, relationship class, relationship type, id prefix)
    kinds = (
        (
            CveVexFixedAssessment,
            spdx30.security_VexFixedVulnAssessmentRelationship,
            spdx30.RelationshipType.fixedIn,
            "vex-fixed",
        ),
        (
            CveVexAffectedAssessment,
            spdx30.security_VexAffectedVulnAssessmentRelationship,
            spdx30.RelationshipType.affects,
            "vex-affected",
        ),
        (
            CveVexNotAffectedAssessment,
            spdx30.security_VexNotAffectedVulnAssessmentRelationship,
            spdx30.RelationshipType.doesNotAffect,
            "vex-not-affected",
        ),
        (
            CveVexUnderInvestigationAssessment,
            spdx30.security_VexUnderInvestigationVulnAssessmentRelationship,
            spdx30.RelationshipType.underInvestigationFor,
            "vex-under-invest",
        ),
    )

    for assessment_cls, rel_cls, rel_typ, spdxid_name in kinds:
        if isinstance(assessment, assessment_cls):
            break
    else:
        raise TypeError(f"Unexpected assessment type: {assessment.status}")

    props = self._props_for_vex_assessment(assessment)

    return self._new_relationship(
        rel_cls, from_, rel_typ, to, spdxid_name=spdxid_name, **props, pn=pn
    )
840 def new_cvss_vuln_assessment_relationship(
841 self,
842 from_: ListLikeSHACLObject,
843 to: ListLikeSHACLObject,
844 cvss_metric: CvssMetric,
845 *,
846 pn: str,
847 ) -> list[spdx30.security_VulnAssessmentRelationship]:
848 if cvss_metric.cvss_ver == CvssVersion.V2_0:
849 cls = spdx30.security_CvssV2VulnAssessmentRelationship
850 elif cvss_metric.cvss_ver in (CvssVersion.V3_0, CvssVersion.V3_1):
851 cls = spdx30.security_CvssV3VulnAssessmentRelationship
852 elif cvss_metric.cvss_ver == CvssVersion.V4_0:
853 cls = spdx30.security_CvssV4VulnAssessmentRelationship
854 else:
855 raise ValueError(f"Unexpected CVSS version: {cvss_metric.cvss_ver}")
857 props = {}
858 if cvss_metric.cvss_ver != CvssVersion.V2_0:
859 assert cvss_metric.severity is not None
860 props["security_severity"] = (
861 spdx30.security_CvssSeverityType.NAMED_INDIVIDUALS[
862 cvss_metric.severity.value
863 ]
864 )
866 if cvss_metric.source is not None:
867 props["comment"] = cvss_metric.source
869 return self._new_relationship(
870 cls,
871 from_,
872 spdx30.RelationshipType.hasAssessmentFor,
873 to,
874 spdxid_name=f"cvss-{cvss_metric.cvss_ver.name}",
875 security_score=cvss_metric.score,
876 security_vectorString=cvss_metric.vector_str,
877 **props,
878 pn=pn,
879 )
def _create_vex_vuln_assessment_relationship(
    self,
    package: spdx30.software_Package,
    vuln: spdx30.security_Vulnerability,
    assessment: CveVexAssessment,
    rels: set[spdx30.security_VulnAssessmentRelationship],
) -> None:
    """Make ``rels`` hold exactly one VEX relationship matching ``assessment``.

    The first existing VEX relationship with the same status is updated in
    place; every other VEX relationship is removed from the set and from
    ``rels``. If none had the right status, a new one is created and added.
    """
    kept: spdx30.security_VexVulnAssessmentRelationship | None = None

    # Iterate over a snapshot because ``rels`` shrinks along the way
    for rel in list(rels):
        current = cve_vex_assessment_from_vuln_relationship(rel)
        if current is None:
            # Not a VEX relationship: leave it alone
            continue

        if (current.status == assessment.status) and (kept is None):
            # Same status: only the properties may need an update
            kept = cast("spdx30.security_VexVulnAssessmentRelationship", rel)
            self.update_vex_assessment_relationship(kept, assessment)
            continue

        # The status changed, or this is a duplicate: destroy it
        self.remove(rel)
        rels.discard(rel)

    if kept is not None:
        return

    # No relationship with the appropriate status existed: create one
    created = self.new_vex_assessment_relationship(
        from_=[vuln], to=[package], assessment=assessment, pn=package.name
    )
    kept = created[0]
    self.add(kept)
    rels.add(kept)
def _cleanup_cvss_vuln_assessment_relationship(
    self,
    cvss_metrics: Iterable[CvssMetric],
    rels: set[spdx30.security_VulnAssessmentRelationship],
    remove_old_metrics: bool,
) -> list[CvssMetric]:
    """Reconcile existing CVSS relationships with ``cvss_metrics``.

    :param cvss_metrics: the metrics that should be described by ``rels``
    :param rels: assessment relationships of one package / vulnerability
        pair; removed relationships are also discarded from this set
    :param remove_old_metrics: if False, nothing is removed and only the
        metrics with no equivalent relationship are reported
    :return: the metrics for which a relationship still has to be created
    """

    def _sort_rel(rel: spdx30.security_VulnAssessmentRelationship) -> int:
        # Rank duplicates by the length of their comment
        return len(rel.comment or "")

    # Build a lookup between the CVSS key and the CVSS relationship
    metrics_doc: dict[
        tuple[Any, ...],
        list[spdx30.security_VulnAssessmentRelationship],
    ] = defaultdict(list)

    for r in rels:
        m_from_r = cvss_metric_from_vuln_relationship(r)
        if m_from_r:
            metrics_doc[m_from_r.cmp_key()].append(r)

    # Only append new metrics
    if not remove_old_metrics:
        return [m for m in cvss_metrics if (m.cmp_key() not in metrics_doc)]

    # Remove old metrics if different than new ones
    metrics_to_add: list[CvssMetric] = []
    for new_m in cvss_metrics:
        # The key is popped, so whatever is left in metrics_doc after this
        # loop matches none of the new metrics.
        # NOTE(review): a second new metric with the same key finds nothing
        # here and is reported as "to add" - confirm keys are unique.
        old_rels = metrics_doc.pop(new_m.cmp_key(), None)
        if not old_rels:
            metrics_to_add.append(new_m)
            continue

        if len(old_rels) == 1:
            continue

        # Duplicates: keep the relationship with the longest comment and
        # remove all the others
        del_rels = iter(sorted(old_rels, key=_sort_rel, reverse=True))
        next(del_rels)
        for r in del_rels:
            self.remove(r)
            rels.discard(r)

    # Remove old metrics left
    for old_rels in metrics_doc.values():
        for r in old_rels:
            self.remove(r)
            rels.discard(r)

    return metrics_to_add
def create_vex_vuln_assessment_relationship(
    self,
    package: spdx30.software_Package,
    vuln: spdx30.security_Vulnerability,
    assessment: CveVexAssessment,
) -> bool:
    """Create or update the VEX relationship between ``vuln`` and ``package``.

    :return: False, doing nothing, if the vulnerability is not already
        associated with the package; True otherwise
    """
    # Check if there is already a relation between the package and the vulnerability
    existing = self.get_package_vuln_assessment_rels(package, vuln)
    associated = existing is not None
    if associated:
        self._create_vex_vuln_assessment_relationship(
            package, vuln, assessment, existing
        )
    return associated
981 def _create_cvss_vuln_assessment_relationship(
982 self,
983 package: spdx30.software_Package,
984 vuln: spdx30.security_Vulnerability,
985 cvss_metric: CvssMetric,
986 rels: set[spdx30.security_VulnAssessmentRelationship],
987 ) -> None:
988 # Create a new CVSS Vulnerability Assessment Relationship
989 r = self.new_cvss_vuln_assessment_relationship(
990 from_=[vuln], to=[package], cvss_metric=cvss_metric, pn=package.name
991 )[0]
992 self.add(r)
993 rels.add(r)
def create_cvss_vuln_assessment_relationship(
    self,
    package: spdx30.software_Package,
    vuln: spdx30.security_Vulnerability,
    cvss_metric: CvssMetric,
) -> bool:
    """Create a CVSS relationship between ``vuln`` and ``package``.

    :return: False, doing nothing, if the vulnerability is not already
        associated with the package; True otherwise
    """
    # Check if there is already a relation between the package and the vulnerability
    existing = self.get_package_vuln_assessment_rels(package, vuln)
    associated = existing is not None
    if associated:
        self._create_cvss_vuln_assessment_relationship(
            package, vuln, cvss_metric, existing
        )
    return associated
def add_cve_vulnerability(
    self,
    package: spdx30.software_Package,
    cve_info: CveInfo,
    *,
    update_vuln_info: bool = True,
    remove_old_metrics: bool = True,
) -> None:
    """Associate a CVE with ``package``, creating or updating what is needed.

    :param package: the package concerned by the CVE
    :param cve_info: CVE data: identifier, description, references, dates,
        optional VEX assessment and CVSS metrics
    :param update_vuln_info: refresh an already existing Vulnerability
        element with ``cve_info``
    :param remove_old_metrics: remove existing CVSS relationships that do
        not match ``cve_info.cvss_metrics`` (and duplicates)
    """
    # Check if this CVE already exists
    # Both caches are defaultdicts: these lookups create empty entries
    vulns_rels = self._map_pkg_vulns[package]
    vulns_cve_id = self._map_cve_vulns[cve_info.cve_id.id]
    if not vulns_cve_id:
        # Create new CVE entry
        vuln = self.new_cve_vuln(cve_info, pn=package.name)
        self.add(vuln)
    else:
        # If there are duplicate CVE objects, try to find the right one
        # Otherwise use the first one that was found.
        if len(vulns_cve_id) == 1:
            vuln = next(iter(vulns_cve_id))
        else:
            # Prefer the duplicate already associated with this package
            for v in vulns_cve_id:
                if v in vulns_rels:
                    vuln = v
                    break
            else:
                vuln = next(iter(vulns_cve_id))

        if update_vuln_info:
            # Update CVE information using the ones provided in input
            self.update_cve_vuln(vuln, cve_info)

    # Search for a previous hasAssociatedVulnerability relationship,
    # and create it if not already exists
    assess_rels = vulns_rels.get(vuln)
    if assess_rels is None:
        for r in self._map_relation.get(package, []):
            if (
                r.relationshipType
                == spdx30.RelationshipType.hasAssociatedVulnerability
            ):
                # Reuse the package's existing relationship
                # NOTE(review): _map_relation_rev is not updated for `vuln`
                # here - confirm whether that is intended.
                r.to.append(vuln)
                r.creationInfo = self.doc_creation_info
                break
        else:
            r = self.new_relationship(
                from_=[package],
                typ=spdx30.RelationshipType.hasAssociatedVulnerability,
                to=[vuln],
                pn=package.name,
            )[0]
            # NOTE(review): _new_relationship already passes the object
            # through self.add(); this second call looks redundant.
            self.add(r)
        assess_rels = set()
        vulns_rels[vuln] = assess_rels

    # Create or update Vulnerability Assessment Relationships
    vex_assess = cve_info.vex_assessment
    if vex_assess is not None:
        self._create_vex_vuln_assessment_relationship(
            package, vuln, vex_assess, assess_rels
        )

    # Drop outdated CVSS relationships (if requested), then create the
    # relationships for the metrics that are still missing
    cvss_metrics = self._cleanup_cvss_vuln_assessment_relationship(
        cve_info.cvss_metrics, assess_rels, remove_old_metrics
    )
    for cvss_metric in cvss_metrics:
        self._create_cvss_vuln_assessment_relationship(
            package, vuln, cvss_metric, assess_rels
        )
def remove_all_cve_vulnerability(self, cve_id: str) -> None:
    """Remove every relationship involving the vulnerabilities with this CVE id.

    The vulnerabilities are also dropped from the package cache. The
    Vulnerability elements themselves and the CVE cache are not modified.
    Nothing happens, and the index is not recreated, if the CVE is unknown.
    """
    # Check if this CVE exists
    vulns = self._map_cve_vulns.get(cve_id)
    if vulns is None:
        return

    # For each vulnerability with this CVE identifier
    for vuln in vulns:
        # Remove the relationships starting at the vulnerability, then the
        # ones targeting it (each list is snapshotted just before its turn)
        for rel_map in (self._map_relation, self._map_relation_rev):
            for rel in list(rel_map.get(vuln, [])):
                self._remove(rel)

        # Update map between package and vulnerabilities
        for per_pkg in self._map_pkg_vulns.values():
            per_pkg.pop(vuln, None)

    # Recreate index once, after all removals
    self.create_index()