"""Protocol definitions for every module boundary.
These structural types are the contract surface of the harness. Every
implementation — first-party or third-party — satisfies one of these. The
loop and the agent only depend on the protocols, never on concrete
implementations.
The protocols are intentionally async-only: every method that performs
I/O is a coroutine, every stream is an :class:`AsyncIterator`, every
resource is an :class:`AsyncContextManager`.
"""
from __future__ import annotations
from collections.abc import AsyncIterator, Awaitable, Callable, Mapping
from contextlib import AbstractAsyncContextManager
from datetime import datetime
from typing import Any, Protocol, runtime_checkable
from .types import (
BudgetStatus,
Episode,
Event,
Fact,
MemoryBlock,
Message,
ModelChunk,
PermissionDecision,
Span,
ToolCall,
ToolDef,
ToolEvent,
ToolResult,
)
@runtime_checkable
class Model(Protocol):
    """LLM provider interface. One adapter per lab (Anthropic, OpenAI, ...).

    The required surface is ``stream(...)`` — every adapter must
    implement it. Adapters MAY additionally override ``complete(...)``
    with a non-streaming (single-shot) call; if not, ``complete``
    falls back to consuming the stream internally and assembling the
    full response, which is correct but slower (per-chunk wire +
    parsing overhead). Architectures use ``complete`` on the
    non-streaming hot path (``agent.run()``) and ``stream`` when a
    consumer is reading from ``agent.stream()``.
    """

    # Adapter identifier (e.g. the provider name) — required data member.
    name: str

    def stream(
        self,
        messages: list[Message],
        *,
        tools: list[ToolDef] | None = None,
        temperature: float = 1.0,
        max_tokens: int | None = None,
    ) -> AsyncIterator[ModelChunk]:
        """Stream completion chunks. Each chunk is text, tool_call, or finish."""
        ...
@runtime_checkable
class Memory(Protocol):
    """Tiered memory: working blocks, episodic store, semantic graph."""

    async def working(self) -> list[MemoryBlock]:
        """All in-context blocks. Pinned to every prompt."""
        ...

    async def update_block(self, name: str, content: str) -> None:
        """Replace the contents of a named block."""
        ...

    async def append_block(self, name: str, content: str) -> None:
        """Append to a named block, creating it if absent."""
        ...

    async def remember(self, episode: Episode) -> str:
        """Persist an episode. Returns the episode ID."""
        ...

    async def recall(
        self,
        query: str,
        *,
        kind: str = "episodic",
        limit: int = 5,
        time_range: tuple[datetime, datetime] | None = None,
        user_id: str | None = None,
    ) -> list[Episode]:
        """Retrieve episodes (or facts, when ``kind='semantic'``).

        When ``user_id`` is supplied, results are restricted to
        episodes stored with that exact ``user_id`` value. ``None``
        is its own bucket (the "anonymous / single-tenant"
        namespace) — episodes stored with ``user_id=None`` are never
        visible to a query with ``user_id="alice"`` and vice versa.
        Backends MUST honour this filter to preserve the framework's
        multi-tenant safety contract.
        """
        ...

    async def recall_facts(
        self,
        query: str,
        *,
        limit: int = 5,
        valid_at: datetime | None = None,
        user_id: str | None = None,
    ) -> list[Fact]:
        """Retrieve bi-temporal facts matching ``query``.

        Backends that don't expose a fact store return ``[]``. The agent
        loop calls this directly rather than duck-typing on
        ``memory.facts`` so backends without fact support don't need
        any opt-out mechanism.

        ``user_id`` filters by namespace partition with the same
        semantics as :meth:`recall`: ``None`` is its own bucket and
        does not cross-contaminate with non-None values.
        """
        ...

    async def consolidate(self) -> None:
        """Background: extract semantic facts from recent episodes."""
        ...

    async def session_messages(
        self,
        session_id: str,
        *,
        user_id: str | None = None,
        limit: int = 20,
    ) -> list[Message]:
        """Return the most-recent ``limit`` user/assistant turns from
        the conversation identified by ``session_id``, in order
        (oldest first).

        This is the conversation-continuity primitive — the agent
        loop calls it at the top of every run so that reusing a
        ``session_id`` actually continues the chat (the model sees
        previous turns as real :class:`Message` history) rather than
        starting fresh and relying solely on semantic recall.

        ``user_id`` MUST be respected by backends as a hard
        namespace partition: messages persisted under one
        ``user_id`` are never visible to a query scoped to a
        different one. Backends without persisted message logs
        return ``[]`` — the agent loop falls back to the
        semantic-recall path in that case.
        """
        ...
class RuntimeSession(Protocol):
    """Handle to an open durable session held by a :class:`Runtime`."""

    # Stable identifier of the open session.
    id: str

    async def deliver(self, name: str, payload: Any) -> None:
        """Deliver a named payload into the open session."""
        ...
@runtime_checkable
class Runtime(Protocol):
    """Durable execution. Wraps every side effect in a journal entry."""

    # Backend identifier — required data member.
    name: str

    async def step(
        self,
        name: str,
        fn: Callable[..., Awaitable[Any]],
        *args: Any,
        idempotency_key: str | None = None,
        **kwargs: Any,
    ) -> Any:
        """Execute ``fn`` as a journaled step. Replays cached on resume."""
        ...

    def stream_step(
        self,
        name: str,
        fn: Callable[..., AsyncIterator[Any]],
        *args: Any,
        **kwargs: Any,
    ) -> AsyncIterator[Any]:
        """Execute a streaming step. Replays the aggregate on resume."""
        ...

    def session(
        self,
        session_id: str,
    ) -> AbstractAsyncContextManager[RuntimeSession]:
        """Open or resume a durable session."""
        ...

    async def signal(self, session_id: str, name: str, payload: Any) -> None:
        """Send an external signal (e.g., human approval) to a session."""
        ...
class Sandbox(Protocol):
    """Isolation layer for tool execution."""

    async def execute(self, tool: ToolDef, args: Mapping[str, Any]) -> ToolResult:
        """Run ``tool`` with ``args`` inside the sandbox and return its result."""
        ...

    def with_filesystem(
        self, root: str
    ) -> AbstractAsyncContextManager[None]:
        """Temporary filesystem sandbox for the duration of the context."""
        ...
class Permissions(Protocol):
    """Decides whether a tool call is allowed."""

    async def check(
        self, call: ToolCall, *, context: Mapping[str, Any]
    ) -> PermissionDecision:
        """Evaluate ``call`` against ``context`` and return a decision."""
        ...
class HookHost(Protocol):
    """Aggregator over user-registered lifecycle callbacks."""

    async def post_tool(self, call: ToolCall, result: ToolResult) -> None:
        """Invoke post-tool callbacks with the completed call and its result."""
        ...

    async def on_event(self, event: Event) -> None:
        """Invoke event callbacks for a lifecycle event."""
        ...
class Budget(Protocol):
    """Resource governance — tokens, calls, cost, wall clock."""

    async def allows_step(self) -> BudgetStatus:
        """Return the current budget status gating the next step."""
        ...

    async def consume(
        self,
        *,
        tokens_in: int,
        tokens_out: int,
        cost_usd: float,
    ) -> None:
        """Record consumed tokens and cost against the budget."""
        ...
class Telemetry(Protocol):
    """OpenTelemetry-compatible tracing/metrics surface."""

    def trace(
        self, name: str, **attrs: Any
    ) -> AbstractAsyncContextManager[Span]:
        """Open a named span carrying ``attrs``; the span closes with the context."""
        ...

    async def emit_metric(self, name: str, value: float, **attrs: Any) -> None:
        """Record a metric sample under ``name`` with optional attributes."""
        ...
class Embedder(Protocol):
    """Text-to-vector embedding model used by the memory subsystem."""

    # Model identifier — required data member.
    name: str
    # Length of every vector this embedder produces.
    dimensions: int

    async def embed(self, text: str) -> list[float]:
        """Embed a single text into a vector of ``dimensions`` floats."""
        ...

    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Embed several texts, one vector per input, in input order."""
        ...
class Secrets(Protocol):
    """Resolution and redaction of named secrets."""

    async def resolve(self, ref: str) -> str:
        """Return the secret value identified by ``ref``."""
        ...

    async def store(self, ref: str, value: str) -> None:
        """Persist ``value`` under the name ``ref``."""
        ...

    def redact(self, text: str) -> str:
        """Return ``text`` with known secret values masked."""
        ...