"""Journal stores for the durable runtime.
A journal records the result of every side-effecting step in a run,
keyed by ``(session_id, step_name)``. On replay, the runtime returns
the cached result instead of re-executing the step. This is the
mechanism that makes long-running agents resumable across crashes.

Available stores:

* :class:`InMemoryJournalStore` — dict-backed; lost on process exit.
  Useful for tests and for runs where you want replay-within-a-run
  semantics but don't need durability across restarts.
* :class:`SqliteJournalStore` — sqlite3 file with two tables; survives
  process restarts. Synchronous sqlite3 calls are dispatched through
  :func:`anyio.to_thread.run_sync` so they never block the event loop.
* :class:`PostgresJournalStore` — asyncpg-backed, same two-table shape;
  for deployments that already run Postgres.

All stores use :mod:`pickle` for value serialization. That's safe in
this context because journals only ever hold values returned by *your
own* trusted code (tools, models, memory backends) — the same code
path that produced them in the first place. Switching to JSON would
force every stored value to be JSON-serializable, which precludes
Pydantic models and arbitrary tool return values.
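
A minimal replay-or-execute sketch against any :class:`JournalStore`
(``run_step`` is a hypothetical helper, not part of this module)::

    async def run_step(store, session_id, name, fn, *args):
        cached = await store.get_step(session_id, name)
        if cached is not None:     # recorded earlier: replay, don't re-run
            return cached.value
        result = await fn(*args)   # first execution: run and record
        await store.put_step(session_id, name, result)
        return result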
"""
from __future__ import annotations
import pickle
import sqlite3
import time
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Protocol, runtime_checkable
import anyio
@dataclass(frozen=True)
class JournalEntry:
"""A single recorded step result with a creation timestamp."""
value: Any
created_at: float
@runtime_checkable
class JournalStore(Protocol):
"""Storage surface for the durable runtime."""
    async def get_step(
        self, session_id: str, step_name: str
    ) -> JournalEntry | None: ...

    async def put_step(
        self, session_id: str, step_name: str, value: Any
    ) -> None: ...

    async def get_stream(
        self, session_id: str, step_name: str
    ) -> list[Any] | None: ...

    async def put_stream(
        self, session_id: str, step_name: str, chunks: list[Any]
    ) -> None: ...
async def aclose(self) -> None: ...
# ---------------------------------------------------------------------------
# In-memory store
# ---------------------------------------------------------------------------
class InMemoryJournalStore:
"""Dict-backed journal. Process-local; lost on exit."""
def __init__(self) -> None:
self._steps: dict[tuple[str, str], JournalEntry] = {}
self._streams: dict[tuple[str, str], list[Any]] = {}
self._lock = anyio.Lock()
async def get_step(
self, session_id: str, step_name: str
) -> JournalEntry | None:
async with self._lock:
return self._steps.get((session_id, step_name))
async def put_step(
self, session_id: str, step_name: str, value: Any
) -> None:
async with self._lock:
self._steps[(session_id, step_name)] = JournalEntry(
value=value, created_at=time.time()
)
async def get_stream(
self, session_id: str, step_name: str
) -> list[Any] | None:
async with self._lock:
chunks = self._streams.get((session_id, step_name))
return list(chunks) if chunks is not None else None
async def put_stream(
self, session_id: str, step_name: str, chunks: list[Any]
) -> None:
async with self._lock:
self._streams[(session_id, step_name)] = list(chunks)
async def aclose(self) -> None:
return None
# ---- introspection (test helpers) -----------------------------------
def step_keys(self) -> list[tuple[str, str]]:
return list(self._steps.keys())
def stream_keys(self) -> list[tuple[str, str]]:
return list(self._streams.keys())
# ---------------------------------------------------------------------------
# SQLite store
# ---------------------------------------------------------------------------
_STEP_DDL = """
CREATE TABLE IF NOT EXISTS journal_steps (
session_id TEXT NOT NULL,
step_name TEXT NOT NULL,
value BLOB NOT NULL,
created_at REAL NOT NULL,
PRIMARY KEY (session_id, step_name)
)
"""
_STREAM_DDL = """
CREATE TABLE IF NOT EXISTS journal_streams (
session_id TEXT NOT NULL,
step_name TEXT NOT NULL,
chunks BLOB NOT NULL,
created_at REAL NOT NULL,
PRIMARY KEY (session_id, step_name)
)
"""
# Postgres-flavoured DDL for :class:`PostgresJournalStore` below.
# ``BYTEA`` instead of ``BLOB``; ``DOUBLE PRECISION`` instead of ``REAL``.
_PG_STEP_DDL = """
CREATE TABLE IF NOT EXISTS journal_steps (
session_id TEXT NOT NULL,
step_name TEXT NOT NULL,
value BYTEA NOT NULL,
created_at DOUBLE PRECISION NOT NULL,
PRIMARY KEY (session_id, step_name)
)
"""
_PG_STREAM_DDL = """
CREATE TABLE IF NOT EXISTS journal_streams (
session_id TEXT NOT NULL,
step_name TEXT NOT NULL,
chunks BYTEA NOT NULL,
created_at DOUBLE PRECISION NOT NULL,
PRIMARY KEY (session_id, step_name)
)
"""
class SqliteJournalStore:
"""SQLite-backed journal. Durable across process restarts."""
def __init__(self, path: str | Path) -> None:
self._path = Path(path)
self._path.parent.mkdir(parents=True, exist_ok=True)
self._init_schema()
@property
def path(self) -> Path:
return self._path
@contextmanager
def _connect(self) -> Iterator[sqlite3.Connection]:
# Each call creates and closes its own connection. SQLite
# connections are not safe to share across threads by default,
# and we hop threads for every async call.
conn = sqlite3.connect(self._path)
try:
yield conn
finally:
conn.close()
def _init_schema(self) -> None:
with self._connect() as conn:
conn.execute(_STEP_DDL)
conn.execute(_STREAM_DDL)
conn.commit()
# ---- step ops --------------------------------------------------------
async def get_step(
self, session_id: str, step_name: str
) -> JournalEntry | None:
return await anyio.to_thread.run_sync(
self._get_step_sync, session_id, step_name
)
def _get_step_sync(
self, session_id: str, step_name: str
) -> JournalEntry | None:
with self._connect() as conn:
row = conn.execute(
"SELECT value, created_at FROM journal_steps "
"WHERE session_id = ? AND step_name = ?",
(session_id, step_name),
).fetchone()
if row is None:
return None
return JournalEntry(value=pickle.loads(row[0]), created_at=row[1])
async def put_step(
self, session_id: str, step_name: str, value: Any
) -> None:
await anyio.to_thread.run_sync(
self._put_step_sync, session_id, step_name, value
)
def _put_step_sync(
self, session_id: str, step_name: str, value: Any
) -> None:
with self._connect() as conn:
conn.execute(
"INSERT OR REPLACE INTO journal_steps "
"(session_id, step_name, value, created_at) "
"VALUES (?, ?, ?, ?)",
(session_id, step_name, pickle.dumps(value), time.time()),
)
conn.commit()
# ---- stream ops ------------------------------------------------------
async def get_stream(
self, session_id: str, step_name: str
) -> list[Any] | None:
return await anyio.to_thread.run_sync(
self._get_stream_sync, session_id, step_name
)
def _get_stream_sync(
self, session_id: str, step_name: str
) -> list[Any] | None:
with self._connect() as conn:
row = conn.execute(
"SELECT chunks FROM journal_streams "
"WHERE session_id = ? AND step_name = ?",
(session_id, step_name),
).fetchone()
if row is None:
return None
loaded = pickle.loads(row[0])
return list(loaded) if loaded is not None else []
async def put_stream(
self, session_id: str, step_name: str, chunks: list[Any]
) -> None:
await anyio.to_thread.run_sync(
self._put_stream_sync, session_id, step_name, list(chunks)
)
def _put_stream_sync(
self, session_id: str, step_name: str, chunks: list[Any]
) -> None:
with self._connect() as conn:
conn.execute(
"INSERT OR REPLACE INTO journal_streams "
"(session_id, step_name, chunks, created_at) "
"VALUES (?, ?, ?, ?)",
(session_id, step_name, pickle.dumps(chunks), time.time()),
)
conn.commit()
# ---- lifecycle -------------------------------------------------------
async def aclose(self) -> None:
return None
# ---------------------------------------------------------------------------
# Postgres store (Phase 5 production durable runtime)
# ---------------------------------------------------------------------------
class PostgresJournalStore:
"""Postgres-backed journal. Production-grade durable replay.
Same shape as :class:`SqliteJournalStore` but uses ``asyncpg`` and
a Postgres database. Designed for users who already run a Postgres
instance for the rest of their stack (memory, audit, app state)
and want their durable-runtime journal to live there too.
Why not a DBOS adapter?
DBOS Python's workflow model requires ``@DBOS.workflow()`` and
``@DBOS.communicator()`` decorators at module-load time. Our
``Runtime.step(name, fn, *args)`` API takes arbitrary
callables at runtime, which doesn't compose cleanly with
DBOS's static-decoration model. ``PostgresJournalStore``
gives the same durability guarantee through our existing
:class:`JournaledRuntime` architecture, with no decorator
intrusion on user code.
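
    A minimal bootstrap sketch (run inside an ``async`` function; the
    DSN is illustrative)::

        store = await PostgresJournalStore.connect(
            "postgresql://app:secret@localhost:5432/agents"
        )
        await store.init_schema()  # idempotent; safe on every start
        await store.put_step("sess-1", "send_email", {"sent": True})
        await store.aclose()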
"""
def __init__(self, pool: Any) -> None:
self._pool = pool
@classmethod
async def connect(
cls,
dsn: str,
*,
min_size: int = 1,
max_size: int = 10,
) -> PostgresJournalStore:
"""Open an asyncpg pool and return the store rooted at it."""
try:
import asyncpg # type: ignore[import-not-found, import-untyped]
except ImportError as exc: # pragma: no cover
raise ImportError(
"asyncpg is not installed. "
"Install with: pip install 'jeevesagent[postgres]'"
) from exc
pool = await asyncpg.create_pool(
dsn=dsn,
min_size=min_size,
max_size=max_size,
)
return cls(pool)
async def aclose(self) -> None:
if self._pool is not None and hasattr(self._pool, "close"):
await self._pool.close()
# ---- schema ---------------------------------------------------------
@staticmethod
def schema_sql() -> list[str]:
"""Return the DDL needed to bootstrap this store's schema.
Idempotent; safe to run on every process start.
"""
return [_PG_STEP_DDL.strip(), _PG_STREAM_DDL.strip()]
async def init_schema(self) -> None:
async with self._pool.acquire() as conn:
for stmt in self.schema_sql():
await conn.execute(stmt)
# ---- step ops --------------------------------------------------------
async def get_step(
self, session_id: str, step_name: str
) -> JournalEntry | None:
async with self._pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT value, created_at FROM journal_steps "
"WHERE session_id = $1 AND step_name = $2",
session_id,
step_name,
)
if row is None:
return None
return JournalEntry(
value=pickle.loads(row["value"]),
created_at=row["created_at"],
)
async def put_step(
self, session_id: str, step_name: str, value: Any
) -> None:
async with self._pool.acquire() as conn:
await conn.execute(
"INSERT INTO journal_steps "
"(session_id, step_name, value, created_at) "
"VALUES ($1, $2, $3, $4) "
"ON CONFLICT (session_id, step_name) DO UPDATE "
"SET value = EXCLUDED.value, "
" created_at = EXCLUDED.created_at",
session_id,
step_name,
pickle.dumps(value),
time.time(),
)
# ---- stream ops -----------------------------------------------------
async def get_stream(
self, session_id: str, step_name: str
) -> list[Any] | None:
async with self._pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT chunks FROM journal_streams "
"WHERE session_id = $1 AND step_name = $2",
session_id,
step_name,
)
if row is None:
return None
loaded = pickle.loads(row["chunks"])
return list(loaded) if loaded is not None else []
async def put_stream(
self, session_id: str, step_name: str, chunks: list[Any]
) -> None:
async with self._pool.acquire() as conn:
await conn.execute(
"INSERT INTO journal_streams "
"(session_id, step_name, chunks, created_at) "
"VALUES ($1, $2, $3, $4) "
"ON CONFLICT (session_id, step_name) DO UPDATE "
"SET chunks = EXCLUDED.chunks, "
" created_at = EXCLUDED.created_at",
session_id,
step_name,
pickle.dumps(list(chunks)),
time.time(),
)