Source code for jeevesagent.core.types

"""Pydantic types for messages, events, tools, memory, and runtime state.

These are the value objects that flow across module boundaries. They are
immutable where possible, validated on construction, and free of behavior
that requires I/O.
"""

from datetime import UTC, datetime, timedelta
from enum import StrEnum
from typing import Any, Literal

from pydantic import BaseModel, ConfigDict, Field

from .ids import new_id


def _utcnow() -> datetime:
    return datetime.now(UTC)


# ---------------------------------------------------------------------------
# Messages and model I/O
# ---------------------------------------------------------------------------


[docs] class Role(StrEnum): SYSTEM = "system" USER = "user" ASSISTANT = "assistant" TOOL = "tool"
[docs] class ToolDef(BaseModel): """Schema description of a tool the model can call. Mirrors the JSON-Schema-flavored shape used across MCP and provider APIs. """ model_config = ConfigDict(frozen=True) name: str description: str input_schema: dict[str, Any] = Field(default_factory=dict) server: str | None = None # MCP server name, if applicable
[docs] class ToolCall(BaseModel): """A model-emitted request to invoke a tool.""" id: str = Field(default_factory=lambda: new_id("tcall")) tool: str args: dict[str, Any] = Field(default_factory=dict) tool_def: ToolDef | None = None destructive: bool = False
[docs] def is_destructive(self) -> bool: return self.destructive
[docs] def idempotency_key(self) -> str: from .ids import deterministic_hash return deterministic_hash(self.tool, self.args)
[docs] class Message(BaseModel): """A single chat message in the model's conversation. ``tool_calls`` is populated on assistant messages that emitted tool calls in the previous turn — real provider adapters (Anthropic ``tool_use`` blocks, OpenAI ``tool_calls`` array) need to reconstruct the right wire format from this. """ model_config = ConfigDict(frozen=True) role: Role content: str name: str | None = None tool_call_id: str | None = None tool_calls: tuple[ToolCall, ...] = ()
[docs] class ToolResult(BaseModel): """Outcome of a tool invocation.""" call_id: str ok: bool output: Any = None error: str | None = None denied: bool = False reason: str | None = None duration_ms: float | None = None started_at: datetime = Field(default_factory=_utcnow)
[docs] @classmethod def success(cls, call_id: str, output: Any, **kwargs: Any) -> "ToolResult": return cls(call_id=call_id, ok=True, output=output, **kwargs)
[docs] @classmethod def error_(cls, call_id: str, message: str, **kwargs: Any) -> "ToolResult": return cls(call_id=call_id, ok=False, error=message, **kwargs)
[docs] @classmethod def denied_(cls, call_id: str, reason: str, **kwargs: Any) -> "ToolResult": return cls(call_id=call_id, ok=False, denied=True, reason=reason, **kwargs)
[docs] class Usage(BaseModel): """Token and cost accounting for a model call.""" model_config = ConfigDict(frozen=True) input_tokens: int = 0 output_tokens: int = 0 cost_usd: float = 0.0
[docs] class ModelChunk(BaseModel): """A single chunk from a streaming model call. Discriminated by ``kind``. Exactly one of the optional fields is set depending on the kind. """ kind: Literal["text", "tool_call", "finish"] text: str | None = None tool_call: ToolCall | None = None finish_reason: str | None = None usage: Usage | None = None
# --------------------------------------------------------------------------- # Memory # ---------------------------------------------------------------------------
[docs] class MemoryBlock(BaseModel): """An in-context memory block, pinned to every prompt.""" name: str content: str updated_at: datetime = Field(default_factory=_utcnow) pinned_order: int = 0
[docs] def format(self) -> str: return f"<{self.name}>\n{self.content}\n</{self.name}>"
[docs] class Episode(BaseModel): """A single (input, decisions, tool calls, output) tuple from history. ``user_id`` is the framework-managed namespace partition. Episodes persisted with one ``user_id`` value are never visible to memory recall queries scoped to a different ``user_id``. ``None`` is its own bucket — the "anonymous / single-tenant" namespace — and does not see episodes belonging to a non-None ``user_id`` (and vice versa). Set automatically from :class:`~jeevesagent.RunContext` by the agent loop; pass explicitly when constructing episodes outside a run. """ id: str = Field(default_factory=lambda: new_id("ep")) session_id: str user_id: str | None = None occurred_at: datetime = Field(default_factory=_utcnow) input: str output: str tool_calls: list[ToolCall] = Field(default_factory=list) embedding: list[float] | None = None
[docs] def format(self) -> str: return f"[{self.occurred_at.isoformat()}] {self.input!r} -> {self.output!r}"
[docs] class Fact(BaseModel): """A semantic claim extracted from one or more episodes. Bi-temporal: ``valid_from``/``valid_until`` tracks when the fact was true in the world; ``recorded_at`` tracks when we learned it. ``user_id`` is the framework-managed namespace partition. Facts persisted with one ``user_id`` value are never visible to recall queries scoped to a different ``user_id``. Set automatically from :class:`~jeevesagent.RunContext` by the agent loop / consolidator; pass explicitly when constructing facts outside a run. """ id: str = Field(default_factory=lambda: new_id("fact")) user_id: str | None = None subject: str predicate: str object: str confidence: float = 1.0 valid_from: datetime = Field(default_factory=_utcnow) valid_until: datetime | None = None recorded_at: datetime = Field(default_factory=_utcnow) sources: list[str] = Field(default_factory=list)
[docs] def format(self) -> str: suffix = "" if self.valid_until is not None: suffix = f" (until {self.valid_until.isoformat()})" return ( f"{self.subject} {self.predicate} {self.object}" f" [confidence {self.confidence:.2f}]{suffix}" )
# --------------------------------------------------------------------------- # Decisions and control signals # ---------------------------------------------------------------------------
[docs] class PermissionDecision(BaseModel): """Outcome of a permission check or pre-tool hook.""" model_config = ConfigDict(frozen=True) decision: Literal["allow", "deny", "ask"] reason: str | None = None @property def allow(self) -> bool: return self.decision == "allow" @property def deny(self) -> bool: return self.decision == "deny" @property def ask(self) -> bool: return self.decision == "ask"
[docs] @classmethod def allow_(cls, reason: str | None = None) -> "PermissionDecision": return cls(decision="allow", reason=reason)
[docs] @classmethod def deny_(cls, reason: str) -> "PermissionDecision": return cls(decision="deny", reason=reason)
[docs] @classmethod def ask_(cls, reason: str | None = None) -> "PermissionDecision": return cls(decision="ask", reason=reason)
[docs] class BudgetStatus(BaseModel): """Result of a budget check before each step.""" model_config = ConfigDict(frozen=True) state: Literal["ok", "warn", "blocked"] reason: str | None = None @property def blocked(self) -> bool: return self.state == "blocked" @property def warn(self) -> bool: return self.state == "warn"
[docs] @classmethod def ok_(cls) -> "BudgetStatus": return cls(state="ok")
[docs] @classmethod def warn_(cls, reason: str) -> "BudgetStatus": return cls(state="warn", reason=reason)
[docs] @classmethod def blocked_(cls, reason: str) -> "BudgetStatus": return cls(state="blocked", reason=reason)
# --------------------------------------------------------------------------- # Events (the streamed observation channel) # ---------------------------------------------------------------------------
[docs] class EventKind(StrEnum): STARTED = "started" MODEL_CHUNK = "model_chunk" TOOL_CALL = "tool_call" TOOL_RESULT = "tool_result" MEMORY_RECALL = "memory_recall" MEMORY_WRITE = "memory_write" BUDGET_WARNING = "budget_warning" BUDGET_EXCEEDED = "budget_exceeded" PERMISSION_ASK = "permission_ask" PERMISSION_DECISION = "permission_decision" ERROR = "error" COMPLETED = "completed" ARCHITECTURE_EVENT = "architecture_event" """Generic architecture-progress event. Carries a namespaced ``name`` in the payload (e.g. ``"self_refine.critique"``, ``"reflexion.lesson_persisted"``, ``"router.classified"``) so each architecture can stream its own progress signal without expanding :class:`EventKind`."""
[docs] class Event(BaseModel): """A single observable record from a running session. Carries a discriminator (``kind``) plus a free-form payload. Construct via the class methods to ensure consistent shapes. """ kind: EventKind session_id: str at: datetime = Field(default_factory=_utcnow) payload: dict[str, Any] = Field(default_factory=dict)
[docs] @classmethod def started(cls, session_id: str, prompt: str) -> "Event": return cls(kind=EventKind.STARTED, session_id=session_id, payload={"prompt": prompt})
[docs] @classmethod def model_chunk(cls, session_id: str, chunk: ModelChunk) -> "Event": return cls( kind=EventKind.MODEL_CHUNK, session_id=session_id, payload={"chunk": chunk.model_dump()}, )
[docs] @classmethod def tool_call(cls, session_id: str, call: ToolCall) -> "Event": return cls( kind=EventKind.TOOL_CALL, session_id=session_id, payload={"call": call.model_dump()}, )
[docs] @classmethod def tool_result(cls, session_id: str, result: ToolResult) -> "Event": return cls( kind=EventKind.TOOL_RESULT, session_id=session_id, payload={"result": result.model_dump()}, )
[docs] @classmethod def budget_warning(cls, session_id: str, status: BudgetStatus) -> "Event": return cls( kind=EventKind.BUDGET_WARNING, session_id=session_id, payload={"status": status.model_dump()}, )
[docs] @classmethod def budget_exceeded(cls, session_id: str, status: BudgetStatus) -> "Event": return cls( kind=EventKind.BUDGET_EXCEEDED, session_id=session_id, payload={"status": status.model_dump()}, )
[docs] @classmethod def error(cls, session_id: str, exc: BaseException) -> "Event": return cls( kind=EventKind.ERROR, session_id=session_id, payload={"type": type(exc).__name__, "message": str(exc)}, )
[docs] @classmethod def completed(cls, session_id: str, result: Any) -> "Event": return cls( kind=EventKind.COMPLETED, session_id=session_id, payload={"result": result}, )
[docs] @classmethod def architecture_event( cls, session_id: str, name: str, **data: Any, ) -> "Event": """Generic architecture-progress event. ``name`` is a namespaced string identifying the source architecture and the kind of progress (e.g. ``"self_refine.critique"``, ``"reflexion.lesson_persisted"``, ``"router.classified"``). ``data`` is merged into the payload alongside ``name`` so consumers can pattern-match on ``name`` and read structured fields off the rest. """ return cls( kind=EventKind.ARCHITECTURE_EVENT, session_id=session_id, payload={"name": name, **data}, )
# --------------------------------------------------------------------------- # Run results, audit, certified values # ---------------------------------------------------------------------------
[docs] class RunResult(BaseModel): """Final outcome of an ``Agent.run`` call. ``output`` is always the raw assistant text (the JSON itself when a structured-output schema was supplied). ``parsed`` is the validated Pydantic instance — populated only when the caller passed ``output_schema=`` to :meth:`Agent.run`. Use whichever fits the call site:: # free-form text run result = await agent.run("summarise this PDF") print(result.output) # structured-output run result = await agent.run(prompt, output_schema=Invoice) invoice: Invoice = result.parsed # typed, validated """ model_config = ConfigDict(arbitrary_types_allowed=True) session_id: str output: str parsed: Any | None = None """The validated Pydantic instance when ``output_schema=`` was supplied to :meth:`Agent.run`; ``None`` otherwise. Typed as ``Any`` to keep the runtime type free; the call site has the schema and can cast or annotate as needed.""" turns: int tokens_in: int = 0 tokens_out: int = 0 cost_usd: float = 0.0 started_at: datetime finished_at: datetime interrupted: bool = False interruption_reason: str | None = None @property def total_tokens(self) -> int: """Convenience: ``tokens_in + tokens_out``.""" return self.tokens_in + self.tokens_out @property def duration(self) -> timedelta: """Wall-clock latency between ``started_at`` and ``finished_at``.""" return self.finished_at - self.started_at
[docs] class CertifiedValue(BaseModel): """A value carrying provenance metadata for freshness/lineage checks.""" model_config = ConfigDict(frozen=True) value: Any source: str fetched_at: datetime valid_until: datetime | None = None schema_version: str = "1" lineage: tuple[str, ...] = ()
[docs] class AuditEntry(BaseModel): """An immutable, signed entry in the audit log.""" model_config = ConfigDict(frozen=True) seq: int timestamp: datetime session_id: str actor: str action: str payload: dict[str, Any] signature: str
[docs] class Span(BaseModel): """A trace span handle. Concrete telemetry adapters return their own representation; this is the value-object contract for in-process use.""" name: str trace_id: str span_id: str started_at: datetime = Field(default_factory=_utcnow) attributes: dict[str, Any] = Field(default_factory=dict)
[docs] class ToolEvent(BaseModel): """Tool registry change notification (MCP listChanged etc.).""" kind: Literal["added", "removed", "updated"] tool: str server: str | None = None at: datetime = Field(default_factory=_utcnow)