jeevesagent.runtime

Durable runtime adapters.

  • InProcRuntime — no durability; just runs every step.

  • JournaledRuntime — generic journal-backed runtime that caches step results and replays them on a second call to the same (session_id, step_name). Pair with any JournalStore.

  • SqliteRuntime — convenience: JournaledRuntime rooted at a sqlite file. Durable across process restarts.
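
The replay behavior described for JournaledRuntime can be illustrated with a toy, self-contained sketch. It is not the jeevesagent implementation: the class name `ToyJournaledRuntime` is hypothetical, and it takes `session_id` as an explicit argument for simplicity, whereas the documented `step()` does not. It only shows the idea of caching a result under `(session_id, step_name)` and returning it on the second call.

```python
import asyncio
from typing import Any, Awaitable, Callable


class ToyJournaledRuntime:
    """Hypothetical stand-in; not the jeevesagent implementation."""

    def __init__(self) -> None:
        # Maps (session_id, step_name) to the recorded result.
        self._journal: dict[tuple[str, str], Any] = {}

    async def step(
        self,
        session_id: str,
        name: str,
        fn: Callable[..., Awaitable[Any]],
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        key = (session_id, name)
        if key in self._journal:
            # Replay: return the recorded value without calling fn again.
            return self._journal[key]
        result = await fn(*args, **kwargs)
        self._journal[key] = result
        return result


calls: list[str] = []


async def charge_card(amount: int) -> str:
    calls.append("charged")  # side effect that should not be repeated
    return f"receipt-{amount}"


async def main() -> tuple[str, str]:
    rt = ToyJournaledRuntime()
    first = await rt.step("session-1", "charge", charge_card, 42)
    second = await rt.step("session-1", "charge", charge_card, 42)
    return first, second


first, second = asyncio.run(main())
```

In this sketch the second call returns the journaled receipt and `charge_card` runs only once, which is the property a journal-backed runtime is meant to provide.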

Future adapters: DBOSRuntime (Postgres-backed via DBOS workflows), TemporalRuntime (Temporal cluster).

Classes

InMemoryJournalStore

Dict-backed journal. Process-local; lost on exit.

InProcRuntime

No durability. Each step runs immediately.

InProcSession

Trivial session: just a holder for the session ID and signals.

JournalEntry

A single recorded step result with a creation timestamp.

JournalStore

Storage surface for the durable runtime.

JournaledRuntime

Runtime that journals every step's result for replay.

JournaledSession

The handle yielded by JournaledRuntime.session().

PostgresJournalStore

Postgres-backed journal. Production-grade durable replay.

PostgresRuntime

JournaledRuntime backed by Postgres for cross-host durable replay.

SqliteJournalStore

SQLite-backed journal. Durable across process restarts.

SqliteRuntime

JournaledRuntime with a SqliteJournalStore.

Package Contents

class jeevesagent.runtime.InMemoryJournalStore

Dict-backed journal. Process-local; lost on exit.

async aclose() -> None
async get_step(session_id: str, step_name: str) -> JournalEntry | None
async get_stream(session_id: str, step_name: str) -> list[Any] | None
async put_step(session_id: str, step_name: str, value: Any) -> None
async put_stream(session_id: str, step_name: str, chunks: list[Any]) -> None
step_keys() -> list[tuple[str, str]]
stream_keys() -> list[tuple[str, str]]

class jeevesagent.runtime.InProcRuntime

No durability. Each step runs immediately.

async session(session_id: str) -> collections.abc.AsyncIterator[InProcSession]
async signal(session_id: str, name: str, payload: Any) -> None
async step(name: str, fn: collections.abc.Callable[..., collections.abc.Awaitable[Any]], *args: Any, idempotency_key: str | None = None, **kwargs: Any) -> Any
stream_step(name: str, fn: collections.abc.Callable[..., collections.abc.AsyncIterator[Any]], *args: Any, **kwargs: Any) -> collections.abc.AsyncIterator[Any]
name = 'inproc'

class jeevesagent.runtime.InProcSession(session_id: str)

Trivial session: just a holder for the session ID and signals.

async deliver(name: str, payload: Any) -> None
id

class jeevesagent.runtime.JournalEntry

A single recorded step result with a creation timestamp.

created_at: float
value: Any

class jeevesagent.runtime.JournalStore

Bases: Protocol

Storage surface for the durable runtime.

async aclose() -> None
async get_step(session_id: str, step_name: str) -> JournalEntry | None
async get_stream(session_id: str, step_name: str) -> list[Any] | None
async put_step(session_id: str, step_name: str, value: Any) -> None
async put_stream(session_id: str, step_name: str, chunks: list[Any]) -> None
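
Because JournalStore is a Protocol, any object with these five async methods should be usable as a store. The sketch below mirrors the documented signatures in a local protocol (`StoreShape`) and a local entry type (`Entry`); both names, and the `DictStore` class, are hypothetical stand-ins so the example runs without jeevesagent installed.

```python
from __future__ import annotations

import asyncio
import time
from dataclasses import dataclass
from typing import Any, Protocol, runtime_checkable


@dataclass
class Entry:
    """Local stand-in for JournalEntry (same two documented fields)."""
    value: Any
    created_at: float


@runtime_checkable
class StoreShape(Protocol):
    """Local mirror of the documented JournalStore methods."""

    async def aclose(self) -> None: ...
    async def get_step(self, session_id: str, step_name: str) -> Entry | None: ...
    async def get_stream(self, session_id: str, step_name: str) -> list[Any] | None: ...
    async def put_step(self, session_id: str, step_name: str, value: Any) -> None: ...
    async def put_stream(self, session_id: str, step_name: str, chunks: list[Any]) -> None: ...


class DictStore:
    """Hypothetical in-memory store that satisfies StoreShape structurally."""

    def __init__(self) -> None:
        self._steps: dict[tuple[str, str], Entry] = {}
        self._streams: dict[tuple[str, str], list[Any]] = {}

    async def aclose(self) -> None:
        # Nothing to release for a dict-backed store.
        return None

    async def get_step(self, session_id: str, step_name: str) -> Entry | None:
        return self._steps.get((session_id, step_name))

    async def get_stream(self, session_id: str, step_name: str) -> list[Any] | None:
        return self._streams.get((session_id, step_name))

    async def put_step(self, session_id: str, step_name: str, value: Any) -> None:
        self._steps[(session_id, step_name)] = Entry(value=value, created_at=time.time())

    async def put_stream(self, session_id: str, step_name: str, chunks: list[Any]) -> None:
        # Copy so later mutation of the caller's list does not alter the journal.
        self._streams[(session_id, step_name)] = list(chunks)


async def demo() -> tuple[Entry | None, Entry | None, list[Any] | None]:
    store: StoreShape = DictStore()
    await store.put_step("s1", "plan", {"ok": True})
    hit = await store.get_step("s1", "plan")
    miss = await store.get_step("s1", "other")
    await store.put_stream("s1", "tokens", ["a", "b"])
    chunks = await store.get_stream("s1", "tokens")
    await store.aclose()
    return hit, miss, chunks


hit, miss, chunks = asyncio.run(demo())
```

Note that `DictStore` does not inherit from the protocol; structural typing is what lets a runtime accept it.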

class jeevesagent.runtime.JournaledRuntime(store: jeevesagent.runtime.journal.JournalStore | None = None)

Runtime that journals every step’s result for replay.

Pass any JournalStore: InMemoryJournalStore for tests, SqliteJournalStore for durable single-process use, or PostgresJournalStore for multi-process / multi-host use.

async session(session_id: str) -> collections.abc.AsyncIterator[JournaledSession]
async signal(session_id: str, name: str, payload: Any) -> None
async step(name: str, fn: collections.abc.Callable[..., collections.abc.Awaitable[Any]], *args: Any, idempotency_key: str | None = None, **kwargs: Any) -> Any
stream_step(name: str, fn: collections.abc.Callable[..., collections.abc.AsyncIterator[Any]], *args: Any, **kwargs: Any) -> collections.abc.AsyncIterator[Any]
name = 'journaled'
property store: jeevesagent.runtime.journal.JournalStore
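
The `get_stream` / `put_stream` pair on the store suggests that `stream_step` records the chunks of a stream and replays them later. The exact behavior is not specified on this page, so the following is only a sketch under that assumption: the module-level `_streams` dict and the explicit `session_id` parameter are hypothetical simplifications.

```python
import asyncio
from typing import Any, AsyncIterator, Callable

# Hypothetical journal: (session_id, step_name) -> recorded chunks.
_streams: dict[tuple[str, str], list[Any]] = {}


async def stream_step(
    session_id: str,
    name: str,
    fn: Callable[..., AsyncIterator[Any]],
    *args: Any,
    **kwargs: Any,
) -> AsyncIterator[Any]:
    key = (session_id, name)
    recorded = _streams.get(key)
    if recorded is not None:
        # Replay: yield the journaled chunks without calling fn.
        for chunk in recorded:
            yield chunk
        return
    chunks: list[Any] = []
    async for chunk in fn(*args, **kwargs):
        chunks.append(chunk)
        yield chunk
    # Record only once the source stream has been fully consumed.
    _streams[key] = chunks


produced = 0


async def tokens(n: int) -> AsyncIterator[str]:
    global produced
    for i in range(n):
        produced += 1  # counts how many chunks were actually generated
        yield f"tok{i}"


async def main() -> tuple[list[str], list[str]]:
    first = [c async for c in stream_step("s1", "gen", tokens, 3)]
    second = [c async for c in stream_step("s1", "gen", tokens, 3)]
    return first, second


first_run, second_run = asyncio.run(main())
```

In this sketch a stream that is abandoned partway is never journaled, so a later call would run the producer again; whether the library makes the same choice is not stated here.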

class jeevesagent.runtime.JournaledSession(session_id: str)

The handle yielded by JournaledRuntime.session().

async deliver(name: str, payload: Any) -> None
id

class jeevesagent.runtime.PostgresJournalStore(pool: Any)

Postgres-backed journal. Production-grade durable replay.

Same shape as SqliteJournalStore but uses asyncpg and a Postgres database. Designed for users who already run a Postgres instance for the rest of their stack (memory, audit, app state) and want their durable-runtime journal to live there too.

Why not a DBOS adapter?

DBOS Python’s workflow model requires @DBOS.workflow() and @DBOS.communicator() decorators at module-load time. Our Runtime.step(name, fn, *args) API takes arbitrary callables at runtime, which doesn’t compose cleanly with DBOS’s static-decoration model. PostgresJournalStore gives the same durability guarantee through our existing JournaledRuntime architecture, with no decorator intrusion on user code.

async aclose() -> None
async classmethod connect(dsn: str, *, min_size: int = 1, max_size: int = 10) -> PostgresJournalStore

Open an asyncpg pool and return the store rooted at it.

async get_step(session_id: str, step_name: str) -> JournalEntry | None
async get_stream(session_id: str, step_name: str) -> list[Any] | None
async init_schema() -> None
async put_step(session_id: str, step_name: str, value: Any) -> None
async put_stream(session_id: str, step_name: str, chunks: list[Any]) -> None
static schema_sql() -> list[str]

Return the DDL needed to bootstrap this store’s schema.

Idempotent; safe to run on every process start.
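
To illustrate what "idempotent DDL" means in practice, here is a small sketch that runs a list of `CREATE ... IF NOT EXISTS` statements twice. The actual DDL returned by `schema_sql()` is not shown on this page, so the table and column names below are purely illustrative, and the example uses the standard-library `sqlite3` module instead of asyncpg and Postgres so that it runs anywhere.

```python
import sqlite3


def schema_sql() -> list[str]:
    """Hypothetical DDL; table and column names are illustrative only."""
    return [
        """
        CREATE TABLE IF NOT EXISTS journal_steps (
            session_id TEXT NOT NULL,
            step_name  TEXT NOT NULL,
            value      TEXT NOT NULL,
            created_at REAL NOT NULL,
            PRIMARY KEY (session_id, step_name)
        )
        """,
        """
        CREATE INDEX IF NOT EXISTS journal_steps_created
            ON journal_steps (created_at)
        """,
    ]


def init_schema(conn: sqlite3.Connection) -> None:
    # Each statement is guarded by IF NOT EXISTS, so re-running is harmless.
    for statement in schema_sql():
        conn.execute(statement)
    conn.commit()


conn = sqlite3.connect(":memory:")
init_schema(conn)
conn.execute(
    "INSERT INTO journal_steps VALUES (?, ?, ?, ?)",
    ("s1", "plan", '"done"', 0.0),
)
init_schema(conn)  # second run: no error, and the existing row survives
rows = conn.execute("SELECT session_id, step_name FROM journal_steps").fetchall()
conn.close()
```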

class jeevesagent.runtime.PostgresRuntime(pool: Any)

Bases: jeevesagent.runtime.journaled.JournaledRuntime

JournaledRuntime backed by Postgres for cross-host durable replay.

async aclose() -> None

Close the underlying connection pool.

async classmethod connect(dsn: str, *, min_size: int = 1, max_size: int = 10) -> PostgresRuntime

Open a fresh asyncpg pool and return the runtime rooted at it.

async init_schema() -> None

Create the journal tables if they don’t already exist.

name = 'postgres'

class jeevesagent.runtime.SqliteJournalStore(path: str | pathlib.Path)

SQLite-backed journal. Durable across process restarts.

async aclose() -> None
async get_step(session_id: str, step_name: str) -> JournalEntry | None
async get_stream(session_id: str, step_name: str) -> list[Any] | None
async put_step(session_id: str, step_name: str, value: Any) -> None
async put_stream(session_id: str, step_name: str, chunks: list[Any]) -> None
property path: pathlib.Path
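
"Durable across process restarts" can be demonstrated with a toy file-backed journal: write a step, close the connection, reopen the same file, and read the step back. The `ToySqliteJournal` class, its table layout, and the JSON encoding of values are all assumptions made for illustration; the real SqliteJournalStore is async and its storage format is not described on this page.

```python
import json
import os
import sqlite3
import tempfile
import time
from typing import Any


class ToySqliteJournal:
    """Hypothetical synchronous sketch; not the library's SqliteJournalStore."""

    def __init__(self, path: str) -> None:
        self._conn = sqlite3.connect(path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS steps ("
            "session_id TEXT, step_name TEXT, value TEXT, created_at REAL, "
            "PRIMARY KEY (session_id, step_name))"
        )
        self._conn.commit()

    def put_step(self, session_id: str, step_name: str, value: Any) -> None:
        # Values are stored as JSON text (an assumption of this sketch).
        self._conn.execute(
            "INSERT OR REPLACE INTO steps VALUES (?, ?, ?, ?)",
            (session_id, step_name, json.dumps(value), time.time()),
        )
        self._conn.commit()

    def get_step(self, session_id: str, step_name: str) -> Any:
        row = self._conn.execute(
            "SELECT value FROM steps WHERE session_id = ? AND step_name = ?",
            (session_id, step_name),
        ).fetchone()
        return None if row is None else json.loads(row[0])

    def close(self) -> None:
        self._conn.close()


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "journal.db")

    writer = ToySqliteJournal(path)
    writer.put_step("s1", "plan", {"steps": 3})
    writer.close()  # stands in for the process exiting

    reader = ToySqliteJournal(path)  # stands in for a fresh process
    recovered = reader.get_step("s1", "plan")
    missing = reader.get_step("s1", "unknown")
    reader.close()
```

The recorded step is still available after the reopen, while an in-memory dict (as in InMemoryJournalStore) would have lost it.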

class jeevesagent.runtime.SqliteRuntime(path: str | pathlib.Path)

Bases: jeevesagent.runtime.journaled.JournaledRuntime

JournaledRuntime with a SqliteJournalStore.

name = 'sqlite'
property path: pathlib.Path