jeevesagent.runtime.journaled

Journal-based durable runtime.

Wraps a JournalStore with the Runtime protocol. The contract: every step() and stream_step() call inside an open session(session_id) context records its result. On a subsequent call with the same (session_id, step_name), the cached result is returned without re-executing the underlying function.
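The replay contract can be illustrated with a toy model. The sketch below is not the library's implementation: ToyJournaledRuntime, its dict-based journal, and the fetch_price function are hypothetical stand-ins, assumed only to show a result being recorded under (session_id, step_name) and returned on the second call without re-running the function.

```python
import asyncio
import contextvars
from contextlib import asynccontextmanager

# Hypothetical stand-ins for illustration; these are not jeevesagent classes.
_session_id = contextvars.ContextVar("session_id", default=None)


class ToyJournaledRuntime:
    def __init__(self):
        # Maps (session_id, step_name) -> recorded result.
        self.journal = {}

    @asynccontextmanager
    async def session(self, session_id):
        # Mark the session as open for the duration of the block.
        token = _session_id.set(session_id)
        try:
            yield session_id
        finally:
            _session_id.reset(token)

    async def step(self, name, fn, *args, **kwargs):
        key = (_session_id.get(), name)
        if key in self.journal:
            # Replay: return the recorded result, do not call fn again.
            return self.journal[key]
        result = await fn(*args, **kwargs)
        self.journal[key] = result
        return result


calls = []


async def fetch_price(symbol):
    # Records each real execution so replays can be told apart.
    calls.append(symbol)
    return {"symbol": symbol, "price": 101.5}


async def run_once(runtime):
    async with runtime.session("order-42"):
        return await runtime.step("fetch_price", fetch_price, "ACME")


async def main():
    runtime = ToyJournaledRuntime()
    first = await run_once(runtime)
    # Same (session_id, step_name): served from the journal.
    second = await run_once(runtime)
    return first, second


first, second = asyncio.run(main())
```

In this toy model the second run returns the same value as the first, and fetch_price is executed only once.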

Session tracking uses contextvars.ContextVar. anyio’s structured concurrency propagates contextvars to spawned tasks, so parallel tool dispatches under _dispatch_tools still see the right session id without explicit threading.
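The propagation behavior can be seen in a small standalone sketch. It uses the standard library's asyncio rather than anyio, on the assumption that the relevant behavior is the same: a task copies the current context when it is created. The names session_id, run_tool, and dispatch_tools are hypothetical and only loosely mirror _dispatch_tools.

```python
import asyncio
import contextvars

# Hypothetical variable; the real module's ContextVar name is not documented here.
session_id = contextvars.ContextVar("session_id", default=None)


async def run_tool(tool_name):
    # Runs in its own task, yet still sees the session id set by the parent.
    await asyncio.sleep(0)
    return tool_name, session_id.get()


async def dispatch_tools(tool_names):
    # gather() wraps each coroutine in a task; each task copies the
    # caller's context at creation time.
    return await asyncio.gather(*(run_tool(n) for n in tool_names))


async def main():
    token = session_id.set("session-a")
    try:
        return await dispatch_tools(["search", "calculator"])
    finally:
        session_id.reset(token)


seen = asyncio.run(main())
```

Each spawned task reports the session id set by its parent, with no id passed as an argument.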

When step() is called outside any open session, the journal is bypassed and the function runs directly, so the runtime degrades gracefully to the same behavior as InProcRuntime.

Classes

JournaledRuntime

Runtime that journals every step's result for replay.

JournaledSession

The handle yielded by JournaledRuntime.session().

Module Contents

class jeevesagent.runtime.journaled.JournaledRuntime(store: jeevesagent.runtime.journal.JournalStore | None = None)

Runtime that journals every step’s result for replay.

Pass any JournalStore (in-memory for tests, sqlite for durable single-process use, future Postgres/DBOS adapters for multi-process / multi-host).

async session(session_id: str) → collections.abc.AsyncIterator[JournaledSession]
async signal(session_id: str, name: str, payload: Any) → None
async step(name: str, fn: collections.abc.Callable[..., collections.abc.Awaitable[Any]], *args: Any, idempotency_key: str | None = None, **kwargs: Any) → Any
stream_step(name: str, fn: collections.abc.Callable[..., collections.abc.AsyncIterator[Any]], *args: Any, **kwargs: Any) → collections.abc.AsyncIterator[Any]
name = 'journaled'
property store: jeevesagent.runtime.journal.JournalStore
class jeevesagent.runtime.journaled.JournaledSession(session_id: str)

The handle yielded by JournaledRuntime.session().

async deliver(name: str, payload: Any) → None
id