jeevesagent.runtime.journal

Journal stores for the durable runtime.

A journal records the result of every side-effecting step in a run, keyed by (session_id, step_name). On replay, the runtime returns the cached result instead of re-executing the step. This is the mechanism that makes long-running agents resumable across crashes.

Today’s stores:

  • InMemoryJournalStore — dict-backed; lost on process exit. Useful for tests and for runs where you want replay-within-a-run semantics but don’t need durability across restarts.

  • SqliteJournalStore — sqlite3 file with two tables; survives process restarts. Sync sqlite3 calls dispatched through anyio.to_thread.run_sync().

Both stores use pickle for value serialization. That’s safe in this context because journals only ever hold values returned by your own trusted code (tools, models, memory backends) — the same code path that ran them in the first place. Switching to JSON would force every stored value to be JSON-serialisable, which precludes Pydantic models and arbitrary tool return values.

Classes

InMemoryJournalStore

Dict-backed journal. Process-local; lost on exit.

JournalEntry

A single recorded step result with a creation timestamp.

JournalStore

Storage surface for the durable runtime.

PostgresJournalStore

Postgres-backed journal. Production-grade durable replay.

SqliteJournalStore

SQLite-backed journal. Durable across process restarts.

Module Contents

class jeevesagent.runtime.journal.InMemoryJournalStore[source]

Dict-backed journal. Process-local; lost on exit.

async aclose() None[source]
async get_step(session_id: str, step_name: str) JournalEntry | None[source]
async get_stream(session_id: str, step_name: str) list[Any] | None[source]
async put_step(session_id: str, step_name: str, value: Any) None[source]
async put_stream(session_id: str, step_name: str, chunks: list[Any]) None[source]
step_keys() list[tuple[str, str]][source]
stream_keys() list[tuple[str, str]][source]
class jeevesagent.runtime.journal.JournalEntry[source]

A single recorded step result with a creation timestamp.

created_at: float
value: Any
class jeevesagent.runtime.journal.JournalStore[source]

Bases: Protocol

Storage surface for the durable runtime.

async aclose() None[source]
async get_step(session_id: str, step_name: str) JournalEntry | None[source]
async get_stream(session_id: str, step_name: str) list[Any] | None[source]
async put_step(session_id: str, step_name: str, value: Any) None[source]
async put_stream(session_id: str, step_name: str, chunks: list[Any]) None[source]
class jeevesagent.runtime.journal.PostgresJournalStore(pool: Any)[source]

Postgres-backed journal. Production-grade durable replay.

Same shape as SqliteJournalStore but uses asyncpg and a Postgres database. Designed for users who already run a Postgres instance for the rest of their stack (memory, audit, app state) and want their durable-runtime journal to live there too.

Why not a DBOS adapter?

DBOS Python’s workflow model requires @DBOS.workflow() and @DBOS.communicator() decorators at module-load time. Our Runtime.step(name, fn, *args) API takes arbitrary callables at runtime, which doesn’t compose cleanly with DBOS’s static-decoration model. PostgresJournalStore gives the same durability guarantee through our existing JournaledRuntime architecture, with no decorator intrusion on user code.

async aclose() None[source]
classmethod connect(dsn: str, *, min_size: int = 1, max_size: int = 10) PostgresJournalStore[source]
Async:

Open an asyncpg pool and return the store rooted at it.

async get_step(session_id: str, step_name: str) JournalEntry | None[source]
async get_stream(session_id: str, step_name: str) list[Any] | None[source]
async init_schema() None[source]
async put_step(session_id: str, step_name: str, value: Any) None[source]
async put_stream(session_id: str, step_name: str, chunks: list[Any]) None[source]
static schema_sql() list[str][source]

Return the DDL needed to bootstrap this store’s schema.

Idempotent; safe to run on every process start.

class jeevesagent.runtime.journal.SqliteJournalStore(path: str | pathlib.Path)[source]

SQLite-backed journal. Durable across process restarts.

async aclose() None[source]
async get_step(session_id: str, step_name: str) JournalEntry | None[source]
async get_stream(session_id: str, step_name: str) list[Any] | None[source]
async put_step(session_id: str, step_name: str, value: Any) None[source]
async put_stream(session_id: str, step_name: str, chunks: list[Any]) None[source]
property path: pathlib.Path