Source code for jeevesagent.data.lineage
"""Freshness and lineage policies for :class:`CertifiedValue`.
A :class:`CertifiedValue` (defined in :mod:`jeevesagent.core.types`)
carries provenance metadata: ``source``, ``fetched_at``, optional
``valid_until``, ``schema_version``, and a tuple of upstream value IDs
in ``lineage``.
This module supplies the policy *types* and *helpers*. Two flavours:
* :class:`FreshnessPolicy` — declare a maximum age per source
prefix; ``valid_until`` always wins when set on the value itself.
* :class:`LineagePolicy` — declare an allow-list of source prefixes
every value in a lineage chain must originate from.
Two helper styles for each:
* ``check_*`` returns ``True``/``False`` so callers can branch.
* ``require_*`` raises the appropriate
:class:`~jeevesagent.core.errors.FreshnessError` /
:class:`~jeevesagent.core.errors.LineageError` so callers can rely
on exception propagation.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from ..core.errors import FreshnessError, LineageError
from ..core.types import CertifiedValue
[docs]
@dataclass(frozen=True)
class FreshnessPolicy:
"""Maximum age for certified values from each source.
``per_source`` maps a source-prefix (matched with ``startswith``)
to a ``timedelta``. The first prefix that matches wins. ``default``
is used when no prefix matches; if also ``None``, the policy
treats all values as fresh.
"""
per_source: tuple[tuple[str, timedelta], ...] = ()
default: timedelta | None = None
[docs]
def max_age_for(self, source: str) -> timedelta | None:
for prefix, max_age in self.per_source:
if source.startswith(prefix):
return max_age
return self.default
[docs]
@classmethod
def from_dict(
cls,
per_source: dict[str, timedelta] | None = None,
*,
default: timedelta | None = None,
) -> FreshnessPolicy:
items = tuple((k, v) for k, v in (per_source or {}).items())
return cls(per_source=items, default=default)
[docs]
@dataclass(frozen=True)
class LineagePolicy:
"""Allow-list of source prefixes for the entire lineage chain.
A :class:`CertifiedValue` is acceptable if every entry in
``value.lineage`` (interpreted as a source prefix) starts with one
of the allowed prefixes.
"""
allowed_sources: frozenset[str] = field(default_factory=frozenset)
[docs]
@classmethod
def from_iter(cls, sources: list[str] | tuple[str, ...]) -> LineagePolicy:
return cls(allowed_sources=frozenset(sources))
# ---------------------------------------------------------------------------
# Freshness checks
# ---------------------------------------------------------------------------
def _utcnow() -> datetime:
return datetime.now(UTC)
[docs]
def check_freshness(
value: CertifiedValue,
policy: FreshnessPolicy,
*,
now: datetime | None = None,
) -> bool:
"""Return ``True`` if ``value`` satisfies ``policy`` at ``now``.
Logic:
1. If ``valid_until`` is set on the value, fail if ``now > valid_until``.
2. Look up ``policy.max_age_for(source)``. If ``None`` (no rule),
the value is fresh by default.
3. Otherwise fail if ``now - fetched_at > max_age``.
"""
moment = now if now is not None else _utcnow()
if value.valid_until is not None and moment > value.valid_until:
return False
max_age = policy.max_age_for(value.source)
if max_age is None:
return True
return (moment - value.fetched_at) <= max_age
[docs]
def require_freshness(
value: CertifiedValue,
policy: FreshnessPolicy,
*,
now: datetime | None = None,
) -> None:
"""Raise :class:`FreshnessError` when :func:`check_freshness` fails."""
if not check_freshness(value, policy, now=now):
moment = now if now is not None else _utcnow()
age = moment - value.fetched_at
raise FreshnessError(
f"value from {value.source!r} is stale "
f"(age {age}, valid_until={value.valid_until})"
)
# ---------------------------------------------------------------------------
# Lineage checks
# ---------------------------------------------------------------------------
[docs]
def check_lineage(value: CertifiedValue, policy: LineagePolicy) -> bool:
"""Return ``True`` if every lineage source is allowed.
The value's own ``source`` is also required to be in the allow-list
— there's no point trusting a chain whose tip you don't.
"""
if policy.allowed_sources and not _allowed(value.source, policy.allowed_sources):
return False
for ancestor in value.lineage:
if not _allowed(ancestor, policy.allowed_sources):
return False
return True
[docs]
def require_lineage(value: CertifiedValue, policy: LineagePolicy) -> None:
"""Raise :class:`LineageError` when :func:`check_lineage` fails."""
if not check_lineage(value, policy):
bad = next(
(anc for anc in value.lineage if not _allowed(anc, policy.allowed_sources)),
value.source,
)
raise LineageError(
f"lineage source {bad!r} not in allow-list "
f"({sorted(policy.allowed_sources)})"
)
def _allowed(source: str, allow: frozenset[str]) -> bool:
if not allow:
return True
return any(source.startswith(prefix) for prefix in allow)