Source code for jeevesagent.runtime.postgres
"""Convenience: ``JournaledRuntime`` rooted at a Postgres pool.
Usage::
runtime = await PostgresRuntime.connect("postgres://localhost/jeeves")
await runtime.init_schema()
agent = Agent("...", model="claude-opus-4-7", runtime=runtime)
The journal lives in two Postgres tables (``journal_steps`` and
``journal_streams``) which :meth:`init_schema` creates idempotently.
Same protocol as :class:`SqliteRuntime`; production-grade durability
when paired with a managed Postgres instance.
"""
from __future__ import annotations
from typing import Any
from .journal import PostgresJournalStore
from .journaled import JournaledRuntime
[docs]
class PostgresRuntime(JournaledRuntime):
""":class:`JournaledRuntime` backed by Postgres for cross-host
durable replay."""
name = "postgres"
def __init__(self, pool: Any) -> None:
store = PostgresJournalStore(pool)
super().__init__(store=store)
self._pg_store = store
[docs]
@classmethod
async def connect(
cls,
dsn: str,
*,
min_size: int = 1,
max_size: int = 10,
) -> PostgresRuntime:
"""Open a fresh asyncpg pool and return the runtime rooted at it."""
store = await PostgresJournalStore.connect(
dsn, min_size=min_size, max_size=max_size
)
instance = cls.__new__(cls)
# Bypass ``__init__`` (which creates a new store from a pool)
# because we already have the connected store.
JournaledRuntime.__init__(instance, store=store)
instance._pg_store = store
return instance
[docs]
async def init_schema(self) -> None:
"""Create the journal tables if they don't already exist."""
await self._pg_store.init_schema()
[docs]
async def aclose(self) -> None:
"""Close the underlying connection pool."""
await self._pg_store.aclose()