"""Pydantic types for messages, events, tools, memory, and runtime state.
These are the value objects that flow across module boundaries. They are
immutable where possible, validated on construction, and free of behavior
that requires I/O.
"""
from datetime import UTC, datetime, timedelta
from enum import StrEnum
from typing import Any, Literal
from pydantic import BaseModel, ConfigDict, Field
from .ids import new_id
def _utcnow() -> datetime:
return datetime.now(UTC)
# ---------------------------------------------------------------------------
# Messages and model I/O
# ---------------------------------------------------------------------------
[docs]
class Role(StrEnum):
SYSTEM = "system"
USER = "user"
ASSISTANT = "assistant"
TOOL = "tool"
[docs]
class Message(BaseModel):
"""A single chat message in the model's conversation.
``tool_calls`` is populated on assistant messages that emitted tool
calls in the previous turn — real provider adapters (Anthropic
``tool_use`` blocks, OpenAI ``tool_calls`` array) need to reconstruct
the right wire format from this.
"""
model_config = ConfigDict(frozen=True)
role: Role
content: str
name: str | None = None
tool_call_id: str | None = None
tool_calls: tuple[ToolCall, ...] = ()
[docs]
class Usage(BaseModel):
"""Token and cost accounting for a model call."""
model_config = ConfigDict(frozen=True)
input_tokens: int = 0
output_tokens: int = 0
cost_usd: float = 0.0
[docs]
class ModelChunk(BaseModel):
"""A single chunk from a streaming model call.
Discriminated by ``kind``. Exactly one of the optional fields is set
depending on the kind.
"""
kind: Literal["text", "tool_call", "finish"]
text: str | None = None
tool_call: ToolCall | None = None
finish_reason: str | None = None
usage: Usage | None = None
# ---------------------------------------------------------------------------
# Memory
# ---------------------------------------------------------------------------
[docs]
class MemoryBlock(BaseModel):
"""An in-context memory block, pinned to every prompt."""
name: str
content: str
updated_at: datetime = Field(default_factory=_utcnow)
pinned_order: int = 0
[docs]
class Episode(BaseModel):
"""A single (input, decisions, tool calls, output) tuple from history.
``user_id`` is the framework-managed namespace partition. Episodes
persisted with one ``user_id`` value are never visible to memory
recall queries scoped to a different ``user_id``. ``None`` is its
own bucket — the "anonymous / single-tenant" namespace — and does
not see episodes belonging to a non-None ``user_id`` (and vice
versa). Set automatically from :class:`~jeevesagent.RunContext`
by the agent loop; pass explicitly when constructing episodes
outside a run.
"""
id: str = Field(default_factory=lambda: new_id("ep"))
session_id: str
user_id: str | None = None
occurred_at: datetime = Field(default_factory=_utcnow)
input: str
output: str
tool_calls: list[ToolCall] = Field(default_factory=list)
embedding: list[float] | None = None
[docs]
class Fact(BaseModel):
"""A semantic claim extracted from one or more episodes.
Bi-temporal: ``valid_from``/``valid_until`` tracks when the fact was
true in the world; ``recorded_at`` tracks when we learned it.
``user_id`` is the framework-managed namespace partition. Facts
persisted with one ``user_id`` value are never visible to recall
queries scoped to a different ``user_id``. Set automatically from
:class:`~jeevesagent.RunContext` by the agent loop / consolidator;
pass explicitly when constructing facts outside a run.
"""
id: str = Field(default_factory=lambda: new_id("fact"))
user_id: str | None = None
subject: str
predicate: str
object: str
confidence: float = 1.0
valid_from: datetime = Field(default_factory=_utcnow)
valid_until: datetime | None = None
recorded_at: datetime = Field(default_factory=_utcnow)
sources: list[str] = Field(default_factory=list)
# ---------------------------------------------------------------------------
# Decisions and control signals
# ---------------------------------------------------------------------------
[docs]
class PermissionDecision(BaseModel):
"""Outcome of a permission check or pre-tool hook."""
model_config = ConfigDict(frozen=True)
decision: Literal["allow", "deny", "ask"]
reason: str | None = None
@property
def allow(self) -> bool:
return self.decision == "allow"
@property
def deny(self) -> bool:
return self.decision == "deny"
@property
def ask(self) -> bool:
return self.decision == "ask"
[docs]
@classmethod
def allow_(cls, reason: str | None = None) -> "PermissionDecision":
return cls(decision="allow", reason=reason)
[docs]
@classmethod
def deny_(cls, reason: str) -> "PermissionDecision":
return cls(decision="deny", reason=reason)
[docs]
@classmethod
def ask_(cls, reason: str | None = None) -> "PermissionDecision":
return cls(decision="ask", reason=reason)
[docs]
class BudgetStatus(BaseModel):
"""Result of a budget check before each step."""
model_config = ConfigDict(frozen=True)
state: Literal["ok", "warn", "blocked"]
reason: str | None = None
@property
def blocked(self) -> bool:
return self.state == "blocked"
@property
def warn(self) -> bool:
return self.state == "warn"
[docs]
@classmethod
def ok_(cls) -> "BudgetStatus":
return cls(state="ok")
[docs]
@classmethod
def warn_(cls, reason: str) -> "BudgetStatus":
return cls(state="warn", reason=reason)
[docs]
@classmethod
def blocked_(cls, reason: str) -> "BudgetStatus":
return cls(state="blocked", reason=reason)
# ---------------------------------------------------------------------------
# Events (the streamed observation channel)
# ---------------------------------------------------------------------------
[docs]
class EventKind(StrEnum):
STARTED = "started"
MODEL_CHUNK = "model_chunk"
TOOL_CALL = "tool_call"
TOOL_RESULT = "tool_result"
MEMORY_RECALL = "memory_recall"
MEMORY_WRITE = "memory_write"
BUDGET_WARNING = "budget_warning"
BUDGET_EXCEEDED = "budget_exceeded"
PERMISSION_ASK = "permission_ask"
PERMISSION_DECISION = "permission_decision"
ERROR = "error"
COMPLETED = "completed"
ARCHITECTURE_EVENT = "architecture_event"
"""Generic architecture-progress event. Carries a namespaced
``name`` in the payload (e.g. ``"self_refine.critique"``,
``"reflexion.lesson_persisted"``, ``"router.classified"``) so
each architecture can stream its own progress signal without
expanding :class:`EventKind`."""
[docs]
class Event(BaseModel):
"""A single observable record from a running session.
Carries a discriminator (``kind``) plus a free-form payload. Construct
via the class methods to ensure consistent shapes.
"""
kind: EventKind
session_id: str
at: datetime = Field(default_factory=_utcnow)
payload: dict[str, Any] = Field(default_factory=dict)
[docs]
@classmethod
def started(cls, session_id: str, prompt: str) -> "Event":
return cls(kind=EventKind.STARTED, session_id=session_id, payload={"prompt": prompt})
[docs]
@classmethod
def model_chunk(cls, session_id: str, chunk: ModelChunk) -> "Event":
return cls(
kind=EventKind.MODEL_CHUNK,
session_id=session_id,
payload={"chunk": chunk.model_dump()},
)
[docs]
@classmethod
def budget_warning(cls, session_id: str, status: BudgetStatus) -> "Event":
return cls(
kind=EventKind.BUDGET_WARNING,
session_id=session_id,
payload={"status": status.model_dump()},
)
[docs]
@classmethod
def budget_exceeded(cls, session_id: str, status: BudgetStatus) -> "Event":
return cls(
kind=EventKind.BUDGET_EXCEEDED,
session_id=session_id,
payload={"status": status.model_dump()},
)
[docs]
@classmethod
def error(cls, session_id: str, exc: BaseException) -> "Event":
return cls(
kind=EventKind.ERROR,
session_id=session_id,
payload={"type": type(exc).__name__, "message": str(exc)},
)
[docs]
@classmethod
def completed(cls, session_id: str, result: Any) -> "Event":
return cls(
kind=EventKind.COMPLETED,
session_id=session_id,
payload={"result": result},
)
[docs]
@classmethod
def architecture_event(
cls,
session_id: str,
name: str,
**data: Any,
) -> "Event":
"""Generic architecture-progress event.
``name`` is a namespaced string identifying the source
architecture and the kind of progress
(e.g. ``"self_refine.critique"``,
``"reflexion.lesson_persisted"``,
``"router.classified"``). ``data`` is merged into the
payload alongside ``name`` so consumers can pattern-match
on ``name`` and read structured fields off the rest.
"""
return cls(
kind=EventKind.ARCHITECTURE_EVENT,
session_id=session_id,
payload={"name": name, **data},
)
# ---------------------------------------------------------------------------
# Run results, audit, certified values
# ---------------------------------------------------------------------------
[docs]
class RunResult(BaseModel):
"""Final outcome of an ``Agent.run`` call.
``output`` is always the raw assistant text (the JSON itself when
a structured-output schema was supplied). ``parsed`` is the
validated Pydantic instance — populated only when the caller
passed ``output_schema=`` to :meth:`Agent.run`. Use whichever
fits the call site::
# free-form text run
result = await agent.run("summarise this PDF")
print(result.output)
# structured-output run
result = await agent.run(prompt, output_schema=Invoice)
invoice: Invoice = result.parsed # typed, validated
"""
model_config = ConfigDict(arbitrary_types_allowed=True)
session_id: str
output: str
parsed: Any | None = None
"""The validated Pydantic instance when ``output_schema=`` was
supplied to :meth:`Agent.run`; ``None`` otherwise. Typed as
``Any`` to keep the runtime type free; the call site has the
schema and can cast or annotate as needed."""
turns: int
tokens_in: int = 0
tokens_out: int = 0
cost_usd: float = 0.0
started_at: datetime
finished_at: datetime
interrupted: bool = False
interruption_reason: str | None = None
@property
def total_tokens(self) -> int:
"""Convenience: ``tokens_in + tokens_out``."""
return self.tokens_in + self.tokens_out
@property
def duration(self) -> timedelta:
"""Wall-clock latency between ``started_at`` and ``finished_at``."""
return self.finished_at - self.started_at
[docs]
class CertifiedValue(BaseModel):
"""A value carrying provenance metadata for freshness/lineage checks."""
model_config = ConfigDict(frozen=True)
value: Any
source: str
fetched_at: datetime
valid_until: datetime | None = None
schema_version: str = "1"
lineage: tuple[str, ...] = ()
[docs]
class AuditEntry(BaseModel):
"""An immutable, signed entry in the audit log."""
model_config = ConfigDict(frozen=True)
seq: int
timestamp: datetime
session_id: str
actor: str
action: str
payload: dict[str, Any]
signature: str
[docs]
class Span(BaseModel):
"""A trace span handle. Concrete telemetry adapters return their own
representation; this is the value-object contract for in-process use."""
name: str
trace_id: str
span_id: str
started_at: datetime = Field(default_factory=_utcnow)
attributes: dict[str, Any] = Field(default_factory=dict)