jeevesagent.runtime.journal
===========================

.. py:module:: jeevesagent.runtime.journal

.. autoapi-nested-parse::

   Journal stores for the durable runtime.

   A journal records the result of every side-effecting step in a run,
   keyed by ``(session_id, step_name)``. On replay, the runtime returns
   the cached result instead of re-executing the step. This is the
   mechanism that makes long-running agents resumable across crashes.

   Today's stores:

   * :class:`InMemoryJournalStore` — dict-backed; lost on process exit.
     Useful for tests and for runs where you want replay-within-a-run
     semantics but don't need durability across restarts.
   * :class:`SqliteJournalStore` — sqlite3 file with two tables; survives
     process restarts. Sync ``sqlite3`` calls are dispatched through
     :func:`anyio.to_thread.run_sync` so they never block the event loop.
   * :class:`PostgresJournalStore` — backed by an ``asyncpg`` pool; for
     stacks that already run Postgres and want the journal there too.

   Both stores use :mod:`pickle` for value serialization. That's safe in
   this context because journals only ever hold values returned by *your
   own* trusted code (tools, models, memory backends) — the same code
   path that ran them in the first place. Switching to JSON would force
   every stored value to be JSON-serializable, which precludes Pydantic
   models and arbitrary tool return values.



Classes
-------

.. autoapisummary::

   jeevesagent.runtime.journal.InMemoryJournalStore
   jeevesagent.runtime.journal.JournalEntry
   jeevesagent.runtime.journal.JournalStore
   jeevesagent.runtime.journal.PostgresJournalStore
   jeevesagent.runtime.journal.SqliteJournalStore


Module Contents
---------------

.. py:class:: InMemoryJournalStore

   Dict-backed journal. Process-local; lost on exit.


   .. py:method:: aclose() -> None
      :async:



   .. py:method:: get_step(session_id: str, step_name: str) -> JournalEntry | None
      :async:



   .. py:method:: get_stream(session_id: str, step_name: str) -> list[Any] | None
      :async:



   .. py:method:: put_step(session_id: str, step_name: str, value: Any) -> None
      :async:



   .. py:method:: put_stream(session_id: str, step_name: str, chunks: list[Any]) -> None
      :async:



   .. py:method:: step_keys() -> list[tuple[str, str]]

      Return the recorded ``(session_id, step_name)`` step keys.


   .. py:method:: stream_keys() -> list[tuple[str, str]]

      Return the recorded ``(session_id, step_name)`` stream keys.


.. py:class:: JournalEntry

   A single recorded step result with a creation timestamp.


   .. py:attribute:: created_at
      :type:  float


   .. py:attribute:: value
      :type:  Any


.. py:class:: JournalStore

   Bases: :py:obj:`Protocol`


   Storage surface for the durable runtime.


   .. py:method:: aclose() -> None
      :async:

      Release any resources held by the store.


   .. py:method:: get_step(session_id: str, step_name: str) -> JournalEntry | None
      :async:

      Return the recorded entry for ``(session_id, step_name)``, or
      ``None`` if the step has not run yet.


   .. py:method:: get_stream(session_id: str, step_name: str) -> list[Any] | None
      :async:

      Return the recorded chunk list for a streamed step, or ``None``.


   .. py:method:: put_step(session_id: str, step_name: str, value: Any) -> None
      :async:

      Record the result of a completed step.


   .. py:method:: put_stream(session_id: str, step_name: str, chunks: list[Any]) -> None
      :async:

      Record the full chunk list of a completed streamed step.


.. py:class:: PostgresJournalStore(pool: Any)

   Postgres-backed journal. Production-grade durable replay.

   Same shape as :class:`SqliteJournalStore` but uses ``asyncpg`` and
   a Postgres database. Designed for users who already run a Postgres
   instance for the rest of their stack (memory, audit, app state)
   and want their durable-runtime journal to live there too.

   Why not a DBOS adapter?

       DBOS Python's workflow model requires ``@DBOS.workflow()`` and
       ``@DBOS.communicator()`` decorators at module-load time. Our
       ``Runtime.step(name, fn, *args)`` API takes arbitrary
       callables at runtime, which doesn't compose cleanly with
       DBOS's static-decoration model. ``PostgresJournalStore``
       gives the same durability guarantee through our existing
       :class:`JournaledRuntime` architecture, with no decorator
       intrusion on user code.


   .. py:method:: aclose() -> None
      :async:



   .. py:method:: connect(dsn: str, *, min_size: int = 1, max_size: int = 10) -> PostgresJournalStore
      :classmethod:
      :async:


      Open an asyncpg pool and return the store rooted at it.



   .. py:method:: get_step(session_id: str, step_name: str) -> JournalEntry | None
      :async:



   .. py:method:: get_stream(session_id: str, step_name: str) -> list[Any] | None
      :async:



   .. py:method:: init_schema() -> None
      :async:



   .. py:method:: put_step(session_id: str, step_name: str, value: Any) -> None
      :async:



   .. py:method:: put_stream(session_id: str, step_name: str, chunks: list[Any]) -> None
      :async:



   .. py:method:: schema_sql() -> list[str]
      :staticmethod:


      Return the DDL needed to bootstrap this store's schema.

      Idempotent; safe to run on every process start.
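
The ``schema_sql()`` / ``init_schema()`` split can be illustrated with stdlib ``sqlite3`` standing in for the Postgres pool. The table and column names below are hypothetical; the actual DDL is whatever ``schema_sql()`` returns. The point is the pattern: every statement is guarded by ``IF NOT EXISTS``, so bootstrapping on every process start is a no-op after the first run.

```python
import sqlite3

# Hypothetical DDL illustrating the idempotent-bootstrap pattern.
SCHEMA_SQL = [
    """CREATE TABLE IF NOT EXISTS journal_steps (
           session_id TEXT NOT NULL,
           step_name  TEXT NOT NULL,
           value      BLOB NOT NULL,
           created_at REAL NOT NULL,
           PRIMARY KEY (session_id, step_name)
       )""",
    """CREATE TABLE IF NOT EXISTS journal_streams (
           session_id TEXT NOT NULL,
           step_name  TEXT NOT NULL,
           chunks     BLOB NOT NULL,
           PRIMARY KEY (session_id, step_name)
       )""",
]


def init_schema(conn: sqlite3.Connection) -> None:
    # Safe to call on every start: IF NOT EXISTS makes each DDL
    # statement a no-op once its table is in place.
    for stmt in SCHEMA_SQL:
        conn.execute(stmt)
    conn.commit()
```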



.. py:class:: SqliteJournalStore(path: str | pathlib.Path)

   SQLite-backed journal. Durable across process restarts.


   .. py:method:: aclose() -> None
      :async:



   .. py:method:: get_step(session_id: str, step_name: str) -> JournalEntry | None
      :async:



   .. py:method:: get_stream(session_id: str, step_name: str) -> list[Any] | None
      :async:



   .. py:method:: put_step(session_id: str, step_name: str, value: Any) -> None
      :async:



   .. py:method:: put_stream(session_id: str, step_name: str, chunks: list[Any]) -> None
      :async:



   .. py:property:: path
      :type: pathlib.Path

      Filesystem path of the backing database file.
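
The off-event-loop dispatch described in the module docstring can be sketched with stdlib ``asyncio.to_thread`` standing in for :func:`anyio.to_thread.run_sync`. ``SqliteStepStore`` and its table layout are hypothetical simplifications of the real store; values are pickled as the docstring describes:

```python
import asyncio
import pickle
import sqlite3


class SqliteStepStore:
    """Sketch: sync sqlite3 calls run in a worker thread, so the
    event loop is never blocked by disk I/O."""

    def __init__(self, path: str) -> None:
        # check_same_thread=False lets worker-thread helpers share one
        # connection; the real store may manage connections differently.
        self._conn = sqlite3.connect(path, check_same_thread=False)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS steps "
            "(session_id TEXT, step_name TEXT, value BLOB, "
            "PRIMARY KEY (session_id, step_name))"
        )

    def _put(self, session_id: str, step_name: str, value) -> None:
        self._conn.execute(
            "INSERT OR REPLACE INTO steps VALUES (?, ?, ?)",
            (session_id, step_name, pickle.dumps(value)),
        )
        self._conn.commit()

    def _get(self, session_id: str, step_name: str):
        row = self._conn.execute(
            "SELECT value FROM steps WHERE session_id = ? AND step_name = ?",
            (session_id, step_name),
        ).fetchone()
        return pickle.loads(row[0]) if row else None

    async def put_step(self, session_id: str, step_name: str, value) -> None:
        # Dispatch the blocking sqlite3 call to a worker thread.
        await asyncio.to_thread(self._put, session_id, step_name, value)

    async def get_step(self, session_id: str, step_name: str):
        return await asyncio.to_thread(self._get, session_id, step_name)
```

Pickle is what lets ``put_step`` accept an arbitrary Python value here, e.g. a dict or a Pydantic model, without forcing a JSON-shaped schema on tool return values.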



