# Source code for jeevesagent.core.protocols

"""Protocol definitions for every module boundary.

These structural types are the contract surface of the harness. Every
implementation — first-party or third-party — satisfies one of these. The
loop and the agent only depend on the protocols, never on concrete
implementations.

The protocols are intentionally async-only: every method that performs
I/O is a coroutine, every stream is an :class:`AsyncIterator`, every
resource is an :class:`AsyncContextManager`.
"""

from __future__ import annotations

from collections.abc import AsyncIterator, Awaitable, Callable, Mapping
from contextlib import AbstractAsyncContextManager
from datetime import datetime
from typing import Any, Protocol, runtime_checkable

from .types import (
    BudgetStatus,
    Episode,
    Event,
    Fact,
    MemoryBlock,
    Message,
    ModelChunk,
    PermissionDecision,
    Span,
    ToolCall,
    ToolDef,
    ToolEvent,
    ToolResult,
)


@runtime_checkable
class Model(Protocol):
    """LLM provider interface — one adapter per lab (Anthropic, OpenAI, ...).

    The required surface is ``stream(...)``; every adapter must implement it.
    Adapters MAY additionally override ``complete(...)`` with a non-streaming
    (single-shot) call; if not, ``complete`` falls back to consuming the
    stream internally and assembling the full response, which is correct but
    slower (per-chunk wire + parsing overhead). Architectures use ``complete``
    on the non-streaming hot path (``agent.run()``) and ``stream`` when a
    consumer is reading from ``agent.stream()``.
    """

    # Adapter name (provider identifier).
    name: str

    def stream(
        self,
        messages: list[Message],
        *,
        tools: list[ToolDef] | None = None,
        temperature: float = 1.0,
        max_tokens: int | None = None,
    ) -> AsyncIterator[ModelChunk]:
        """Stream completion chunks. Each chunk is text, tool_call, or finish."""
        ...
@runtime_checkable
class Memory(Protocol):
    """Tiered memory: working blocks, episodic store, semantic graph."""

    async def working(self) -> list[MemoryBlock]:
        """All in-context blocks. Pinned to every prompt."""
        ...

    async def update_block(self, name: str, content: str) -> None:
        """Replace the contents of a named block."""
        ...

    async def append_block(self, name: str, content: str) -> None:
        """Append to a named block, creating it if absent."""
        ...

    async def remember(self, episode: Episode) -> str:
        """Persist an episode. Returns the episode ID."""
        ...

    async def recall(
        self,
        query: str,
        *,
        kind: str = "episodic",
        limit: int = 5,
        time_range: tuple[datetime, datetime] | None = None,
        user_id: str | None = None,
    ) -> list[Episode]:
        """Retrieve episodes (or facts, when ``kind='semantic'``).

        When ``user_id`` is supplied, results are restricted to episodes
        stored with that exact ``user_id`` value. ``None`` is its own bucket
        (the "anonymous / single-tenant" namespace) — episodes stored with
        ``user_id=None`` are never visible to a query with ``user_id="alice"``
        and vice versa. Backends MUST honour this filter to preserve the
        framework's multi-tenant safety contract.
        """
        ...

    async def recall_facts(
        self,
        query: str,
        *,
        limit: int = 5,
        valid_at: datetime | None = None,
        user_id: str | None = None,
    ) -> list[Fact]:
        """Retrieve bi-temporal facts matching ``query``.

        Backends that don't expose a fact store return ``[]``. The agent loop
        calls this directly rather than duck-typing on ``memory.facts`` so
        backends without fact support don't need any opt-out mechanism.

        ``user_id`` filters by namespace partition with the same semantics as
        :meth:`recall`: ``None`` is its own bucket and does not
        cross-contaminate with non-None values.
        """
        ...

    async def consolidate(self) -> None:
        """Background: extract semantic facts from recent episodes."""
        ...

    async def session_messages(
        self,
        session_id: str,
        *,
        user_id: str | None = None,
        limit: int = 20,
    ) -> list[Message]:
        """Return the most-recent ``limit`` user/assistant turns from the
        conversation identified by ``session_id``, in order (oldest first).

        This is the conversation-continuity primitive — the agent loop calls
        it at the top of every run so that reusing a ``session_id`` actually
        continues the chat (the model sees previous turns as real
        :class:`Message` history) rather than starting fresh and relying
        solely on semantic recall.

        ``user_id`` MUST be respected by backends as a hard namespace
        partition: messages persisted under one ``user_id`` are never visible
        to a query scoped to a different one. Backends without persisted
        message logs return ``[]`` — the agent loop falls back to the
        semantic-recall path in that case.
        """
        ...
class RuntimeSession(Protocol):
    """Handle to an open durable session held by a :class:`Runtime`."""

    # Identifier of the durable session this handle refers to.
    id: str

    async def deliver(self, name: str, payload: Any) -> None:
        """Deliver a named ``payload`` into the session."""
        ...
@runtime_checkable
class Runtime(Protocol):
    """Durable execution. Wraps every side effect in a journal entry."""

    # Backend name of this runtime implementation.
    name: str

    async def step(
        self,
        name: str,
        fn: Callable[..., Awaitable[Any]],
        *args: Any,
        idempotency_key: str | None = None,
        **kwargs: Any,
    ) -> Any:
        """Execute ``fn`` as a journaled step. Replays cached on resume."""
        ...

    def stream_step(
        self,
        name: str,
        fn: Callable[..., AsyncIterator[Any]],
        *args: Any,
        **kwargs: Any,
    ) -> AsyncIterator[Any]:
        """Execute a streaming step. Replays the aggregate on resume."""
        ...

    def session(
        self,
        session_id: str,
    ) -> AbstractAsyncContextManager[RuntimeSession]:
        """Open or resume a durable session."""
        ...

    async def signal(self, session_id: str, name: str, payload: Any) -> None:
        """Send an external signal (e.g., human approval) to a session."""
        ...
@runtime_checkable
class ToolHost(Protocol):
    """MCP-aware tool registry. Lazy-loads schemas on demand."""

    async def list_tools(self, *, query: str | None = None) -> list[ToolDef]:
        """List available tool definitions, optionally filtered by ``query``."""
        ...

    async def call(
        self,
        tool: str,
        args: Mapping[str, Any],
        *,
        call_id: str = "",
    ) -> ToolResult:
        """Invoke ``tool`` with ``args``.

        The ``call_id`` is propagated into the returned :class:`ToolResult`
        so the loop can correlate results with the originating model-emitted
        call.
        """
        ...

    def watch(self) -> AsyncIterator[ToolEvent]:
        """Notifications when the tool list changes (MCP listChanged)."""
        ...
class Sandbox(Protocol):
    """Isolation layer for tool execution."""

    async def execute(self, tool: ToolDef, args: Mapping[str, Any]) -> ToolResult:
        """Run ``tool`` with ``args`` inside the isolation boundary."""
        ...

    def with_filesystem(
        self, root: str
    ) -> AbstractAsyncContextManager[None]:
        """Temporary filesystem sandbox for the duration of the context."""
        ...
class Permissions(Protocol):
    """Decides whether a tool call is allowed."""

    async def check(
        self, call: ToolCall, *, context: Mapping[str, Any]
    ) -> PermissionDecision:
        """Evaluate ``call`` and return the resulting permission decision."""
        ...
class HookHost(Protocol):
    """Aggregator over user-registered lifecycle callbacks."""

    async def pre_tool(self, call: ToolCall) -> PermissionDecision:
        """Run before a tool call; returns the aggregated decision."""
        ...

    async def post_tool(self, call: ToolCall, result: ToolResult) -> None:
        """Run after a tool call completes, with its result."""
        ...

    async def on_event(self, event: Event) -> None:
        """Run for every emitted lifecycle event."""
        ...
class Budget(Protocol):
    """Resource governance — tokens, calls, cost, wall clock."""

    async def allows_step(self) -> BudgetStatus:
        """Report whether another step may run under the current budget."""
        ...

    async def consume(
        self,
        *,
        tokens_in: int,
        tokens_out: int,
        cost_usd: float,
    ) -> None:
        """Record token and dollar usage against the budget."""
        ...
class Telemetry(Protocol):
    """OpenTelemetry-compatible tracing/metrics surface."""

    def trace(
        self, name: str, **attrs: Any
    ) -> AbstractAsyncContextManager[Span]:
        """Open a named span carrying ``attrs``; closes with the context."""
        ...

    async def emit_metric(self, name: str, value: float, **attrs: Any) -> None:
        """Record a metric sample ``value`` under ``name`` with ``attrs``."""
        ...
class Embedder(Protocol):
    """Text-to-vector embedding model used by the memory subsystem."""

    # Embedding model name.
    name: str
    # Dimensionality of the returned embedding vectors.
    dimensions: int

    async def embed(self, text: str) -> list[float]:
        """Embed a single ``text`` into a vector."""
        ...

    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Embed many ``texts`` at once, one vector per input."""
        ...
class Secrets(Protocol):
    """Resolution and redaction of named secrets."""

    async def resolve(self, ref: str) -> str:
        """Return the secret value identified by ``ref``."""
        ...

    async def store(self, ref: str, value: str) -> None:
        """Persist ``value`` under the reference ``ref``."""
        ...

    def redact(self, text: str) -> str:
        """Return ``text`` with any known secret values masked."""
        ...