jeevesagent.runtime
===================

.. py:module:: jeevesagent.runtime

.. autoapi-nested-parse::

   Durable runtime adapters.

   * :class:`InProcRuntime`: no durability; runs every step immediately.
   * :class:`JournaledRuntime`: generic journal-backed runtime that
     caches step results and replays them on any later call with the same
     ``(session_id, step_name)``. Pair with any :class:`JournalStore`.
   * :class:`SqliteRuntime`: convenience wrapper, a ``JournaledRuntime``
     rooted at a SQLite file. Durable across process restarts.
   * :class:`PostgresRuntime`: a ``JournaledRuntime`` backed by Postgres
     for cross-host durable replay.

   Possible future adapter: ``TemporalRuntime`` (Temporal cluster). For why
   there is no DBOS adapter, see :class:`PostgresJournalStore`.
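
   The replay behaviour described above can be sketched without the
   library. This is a minimal illustration, not the package's
   implementation: ``run_step``, ``journal`` and ``fetch_price`` are
   hypothetical names, and a plain dict stands in for a
   :class:`JournalStore`.

   .. code-block:: python

      import asyncio
      from collections.abc import Awaitable, Callable
      from typing import Any

      # Hypothetical stand-in for a journal: (session_id, step_name) -> result.
      journal: dict[tuple[str, str], Any] = {}
      calls = 0  # counts how often the step body really executes


      async def run_step(
          session_id: str,
          step_name: str,
          fn: Callable[..., Awaitable[Any]],
          *args: Any,
      ) -> Any:
          """Run ``fn`` once per (session_id, step_name); replay afterwards."""
          key = (session_id, step_name)
          if key in journal:
              return journal[key]  # replay: the callable is not invoked again
          result = await fn(*args)
          journal[key] = result  # record the result for later calls
          return result


      async def fetch_price(symbol: str) -> float:
          global calls
          calls += 1
          return 42.0


      async def main() -> tuple[float, float]:
          first = await run_step("s1", "fetch", fetch_price, "ACME")
          second = await run_step("s1", "fetch", fetch_price, "ACME")
          return first, second


      first, second = asyncio.run(main())

   In this sketch the second call returns the journaled value and
   ``fetch_price`` runs only once, which is the property a journal-backed
   runtime relies on when a session is resumed.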



Submodules
----------

.. toctree::
   :maxdepth: 1

   /api/jeevesagent/runtime/inproc/index
   /api/jeevesagent/runtime/journal/index
   /api/jeevesagent/runtime/journaled/index
   /api/jeevesagent/runtime/postgres/index
   /api/jeevesagent/runtime/sqlite/index


Classes
-------

.. autoapisummary::

   jeevesagent.runtime.InMemoryJournalStore
   jeevesagent.runtime.InProcRuntime
   jeevesagent.runtime.InProcSession
   jeevesagent.runtime.JournalEntry
   jeevesagent.runtime.JournalStore
   jeevesagent.runtime.JournaledRuntime
   jeevesagent.runtime.JournaledSession
   jeevesagent.runtime.PostgresJournalStore
   jeevesagent.runtime.PostgresRuntime
   jeevesagent.runtime.SqliteJournalStore
   jeevesagent.runtime.SqliteRuntime


Package Contents
----------------

.. py:class:: InMemoryJournalStore

   Dict-backed journal. Process-local; lost on exit.
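
   A dict-backed journal of this shape might look like the sketch below.
   ``DictJournal`` and ``Entry`` are hypothetical names that mirror the
   documented method signatures; the actual class may store or copy data
   differently.

   .. code-block:: python

      from __future__ import annotations

      import asyncio
      import time
      from dataclasses import dataclass
      from typing import Any


      @dataclass(frozen=True)
      class Entry:
          """Illustrative counterpart of JournalEntry: a value plus a timestamp."""

          value: Any
          created_at: float


      class DictJournal:
          """Hypothetical dict-backed store; contents vanish when the process exits."""

          def __init__(self) -> None:
              self._steps: dict[tuple[str, str], Entry] = {}
              self._streams: dict[tuple[str, str], list[Any]] = {}

          async def get_step(self, session_id: str, step_name: str) -> Entry | None:
              return self._steps.get((session_id, step_name))

          async def put_step(self, session_id: str, step_name: str, value: Any) -> None:
              self._steps[(session_id, step_name)] = Entry(value, time.time())

          async def get_stream(self, session_id: str, step_name: str) -> list[Any] | None:
              return self._streams.get((session_id, step_name))

          async def put_stream(self, session_id: str, step_name: str, chunks: list[Any]) -> None:
              # Copy so later mutation of the caller's list cannot alter the journal.
              self._streams[(session_id, step_name)] = list(chunks)

          def step_keys(self) -> list[tuple[str, str]]:
              return list(self._steps)


      async def demo() -> tuple[Any, Any, list[tuple[str, str]]]:
          store = DictJournal()
          await store.put_step("s1", "plan", {"goal": "demo"})
          hit = await store.get_step("s1", "plan")
          miss = await store.get_step("s1", "unknown")
          return hit.value, miss, store.step_keys()


      value, miss, keys = asyncio.run(demo())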


   .. py:method:: aclose() -> None
      :async:



   .. py:method:: get_step(session_id: str, step_name: str) -> JournalEntry | None
      :async:



   .. py:method:: get_stream(session_id: str, step_name: str) -> list[Any] | None
      :async:



   .. py:method:: put_step(session_id: str, step_name: str, value: Any) -> None
      :async:



   .. py:method:: put_stream(session_id: str, step_name: str, chunks: list[Any]) -> None
      :async:



   .. py:method:: step_keys() -> list[tuple[str, str]]


   .. py:method:: stream_keys() -> list[tuple[str, str]]


.. py:class:: InProcRuntime

   No durability. Each step runs immediately.


   .. py:method:: session(session_id: str) -> collections.abc.AsyncIterator[InProcSession]
      :async:
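
      The ``AsyncIterator`` return type on an async method suggests an
      ``asynccontextmanager``-style API, although this page does not
      confirm it. Under that assumption, usage would resemble the
      following sketch, in which ``Runtime`` and ``Session`` are
      hypothetical stand-ins for the real classes.

      .. code-block:: python

         import asyncio
         from collections.abc import AsyncIterator
         from contextlib import asynccontextmanager


         class Session:
             """Hypothetical holder for a session ID."""

             def __init__(self, session_id: str) -> None:
                 self.id = session_id


         class Runtime:
             """Hypothetical runtime whose session() is an async context manager."""

             @asynccontextmanager
             async def session(self, session_id: str) -> AsyncIterator[Session]:
                 # Setup would go before the yield, cleanup after it.
                 yield Session(session_id)


         async def main() -> str:
             async with Runtime().session("s1") as session:
                 return session.id


         session_id = asyncio.run(main())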



   .. py:method:: signal(session_id: str, name: str, payload: Any) -> None
      :async:



   .. py:method:: step(name: str, fn: collections.abc.Callable[Ellipsis, collections.abc.Awaitable[Any]], *args: Any, idempotency_key: str | None = None, **kwargs: Any) -> Any
      :async:



   .. py:method:: stream_step(name: str, fn: collections.abc.Callable[Ellipsis, collections.abc.AsyncIterator[Any]], *args: Any, **kwargs: Any) -> collections.abc.AsyncIterator[Any]


   .. py:attribute:: name
      :value: 'inproc'



.. py:class:: InProcSession(session_id: str)

   Trivial session: just a holder for the session ID and signals.


   .. py:method:: deliver(name: str, payload: Any) -> None
      :async:



   .. py:attribute:: id


.. py:class:: JournalEntry

   A single recorded step result with a creation timestamp.


   .. py:attribute:: created_at
      :type:  float


   .. py:attribute:: value
      :type:  Any


.. py:class:: JournalStore

   Bases: :py:obj:`Protocol`


   Storage surface for the durable runtime.
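
   Because this is a ``Protocol``, a custom backend only needs matching
   methods and does not have to inherit from it. The sketch below
   illustrates that with a hypothetical ``StoreLike`` protocol and
   ``NullStore`` class; whether the real protocol is decorated with
   ``runtime_checkable`` is an assumption made here for the ``isinstance``
   check.

   .. code-block:: python

      from typing import Any, Protocol, runtime_checkable


      @runtime_checkable
      class StoreLike(Protocol):
          """Hypothetical mirror of the five documented JournalStore methods."""

          async def aclose(self) -> None: ...
          async def get_step(self, session_id: str, step_name: str) -> Any: ...
          async def get_stream(self, session_id: str, step_name: str) -> Any: ...
          async def put_step(self, session_id: str, step_name: str, value: Any) -> None: ...
          async def put_stream(self, session_id: str, step_name: str, chunks: list[Any]) -> None: ...


      class NullStore:
          """Matches StoreLike structurally; never caches anything."""

          async def aclose(self) -> None:
              return None

          async def get_step(self, session_id: str, step_name: str) -> Any:
              return None  # always a cache miss

          async def get_stream(self, session_id: str, step_name: str) -> Any:
              return None  # always a cache miss

          async def put_step(self, session_id: str, step_name: str, value: Any) -> None:
              return None  # discard the value

          async def put_stream(self, session_id: str, step_name: str, chunks: list[Any]) -> None:
              return None  # discard the chunks


      # runtime_checkable isinstance() only checks that the methods exist,
      # not their signatures or return types.
      conforms = isinstance(NullStore(), StoreLike)
      missing = isinstance(object(), StoreLike)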


   .. py:method:: aclose() -> None
      :async:



   .. py:method:: get_step(session_id: str, step_name: str) -> JournalEntry | None
      :async:



   .. py:method:: get_stream(session_id: str, step_name: str) -> list[Any] | None
      :async:



   .. py:method:: put_step(session_id: str, step_name: str, value: Any) -> None
      :async:



   .. py:method:: put_stream(session_id: str, step_name: str, chunks: list[Any]) -> None
      :async:



.. py:class:: JournaledRuntime(store: jeevesagent.runtime.journal.JournalStore | None = None)

   Runtime that journals every step's result for replay.

   Pass any :class:`JournalStore`: in-memory for tests, SQLite for
   durable single-process use, or Postgres for multi-process /
   multi-host deployments.


   .. py:method:: session(session_id: str) -> collections.abc.AsyncIterator[JournaledSession]
      :async:



   .. py:method:: signal(session_id: str, name: str, payload: Any) -> None
      :async:



   .. py:method:: step(name: str, fn: collections.abc.Callable[Ellipsis, collections.abc.Awaitable[Any]], *args: Any, idempotency_key: str | None = None, **kwargs: Any) -> Any
      :async:



   .. py:method:: stream_step(name: str, fn: collections.abc.Callable[Ellipsis, collections.abc.AsyncIterator[Any]], *args: Any, **kwargs: Any) -> collections.abc.AsyncIterator[Any]
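
      One way a journaled stream step could behave is sketched below:
      chunks are recorded while the stream is first consumed and replayed
      from the journal afterwards. ``replayable_stream``, ``streams`` and
      ``tokens`` are hypothetical names, and recording only after the
      stream completes is an assumption of this sketch, not documented
      behaviour.

      .. code-block:: python

         import asyncio
         from collections.abc import AsyncIterator, Callable
         from typing import Any

         # Hypothetical stream journal: (session_id, step_name) -> chunks.
         streams: dict[tuple[str, str], list[Any]] = {}
         produced = 0  # counts chunks generated by the real producer


         async def replayable_stream(
             session_id: str,
             name: str,
             fn: Callable[..., AsyncIterator[Any]],
             *args: Any,
         ) -> AsyncIterator[Any]:
             key = (session_id, name)
             cached = streams.get(key)
             if cached is not None:
                 for chunk in cached:  # replay without calling fn
                     yield chunk
                 return
             chunks: list[Any] = []
             async for chunk in fn(*args):
                 chunks.append(chunk)
                 yield chunk
             streams[key] = chunks  # recorded only once the stream finishes


         async def tokens(text: str) -> AsyncIterator[str]:
             global produced
             for word in text.split():
                 produced += 1
                 yield word


         async def collect(session_id: str) -> list[str]:
             stream = replayable_stream(session_id, "say", tokens, "a b c")
             return [chunk async for chunk in stream]


         first = asyncio.run(collect("s1"))
         second = asyncio.run(collect("s1"))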


   .. py:attribute:: name
      :value: 'journaled'



   .. py:property:: store
      :type: jeevesagent.runtime.journal.JournalStore



.. py:class:: JournaledSession(session_id: str)

   The handle yielded by :meth:`JournaledRuntime.session`.


   .. py:method:: deliver(name: str, payload: Any) -> None
      :async:



   .. py:attribute:: id


.. py:class:: PostgresJournalStore(pool: Any)

   Postgres-backed journal. Production-grade durable replay.

   Same shape as :class:`SqliteJournalStore` but uses ``asyncpg`` and
   a Postgres database. Designed for users who already run a Postgres
   instance for the rest of their stack (memory, audit, app state)
   and want their durable-runtime journal to live there too.

   Why not a DBOS adapter?

       DBOS Python's workflow model requires ``@DBOS.workflow()`` and
       ``@DBOS.communicator()`` decorators at module-load time. Our
       ``Runtime.step(name, fn, *args)`` API takes arbitrary
       callables at runtime, which doesn't compose cleanly with
       DBOS's static-decoration model. ``PostgresJournalStore``
       gives the same durability guarantee through our existing
       :class:`JournaledRuntime` architecture, with no decorator
       intrusion on user code.


   .. py:method:: aclose() -> None
      :async:



   .. py:method:: connect(dsn: str, *, min_size: int = 1, max_size: int = 10) -> PostgresJournalStore
      :classmethod:
      :async:


      Open an asyncpg pool and return the store rooted at it.



   .. py:method:: get_step(session_id: str, step_name: str) -> JournalEntry | None
      :async:



   .. py:method:: get_stream(session_id: str, step_name: str) -> list[Any] | None
      :async:



   .. py:method:: init_schema() -> None
      :async:



   .. py:method:: put_step(session_id: str, step_name: str, value: Any) -> None
      :async:



   .. py:method:: put_stream(session_id: str, step_name: str, chunks: list[Any]) -> None
      :async:



   .. py:method:: schema_sql() -> list[str]
      :staticmethod:


      Return the DDL needed to bootstrap this store's schema.

      Idempotent; safe to run on every process start.
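
      The idempotency claim usually rests on ``IF NOT EXISTS`` DDL. The
      sketch below demonstrates the idea with the standard-library
      ``sqlite3`` module rather than Postgres; the table and column names
      are hypothetical and are not the statements this method returns.

      .. code-block:: python

         import sqlite3


         def schema_sql() -> list[str]:
             # Hypothetical table and column names, for illustration only.
             return [
                 "CREATE TABLE IF NOT EXISTS journal_steps ("
                 " session_id TEXT NOT NULL,"
                 " step_name TEXT NOT NULL,"
                 " value TEXT NOT NULL,"
                 " created_at REAL NOT NULL,"
                 " PRIMARY KEY (session_id, step_name))",
                 "CREATE TABLE IF NOT EXISTS journal_streams ("
                 " session_id TEXT NOT NULL,"
                 " step_name TEXT NOT NULL,"
                 " chunks TEXT NOT NULL,"
                 " PRIMARY KEY (session_id, step_name))",
             ]


         def init_schema(conn: sqlite3.Connection) -> None:
             for statement in schema_sql():
                 conn.execute(statement)
             conn.commit()


         conn = sqlite3.connect(":memory:")
         init_schema(conn)
         init_schema(conn)  # second run changes nothing and raises nothing
         tables = sorted(
             row[0]
             for row in conn.execute(
                 "SELECT name FROM sqlite_master WHERE type = 'table'"
             )
         )
         conn.close()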



.. py:class:: PostgresRuntime(pool: Any)

   Bases: :py:obj:`jeevesagent.runtime.journaled.JournaledRuntime`


   :class:`JournaledRuntime` backed by Postgres for cross-host
   durable replay.


   .. py:method:: aclose() -> None
      :async:


      Close the underlying connection pool.



   .. py:method:: connect(dsn: str, *, min_size: int = 1, max_size: int = 10) -> PostgresRuntime
      :classmethod:
      :async:


      Open a fresh asyncpg pool and return the runtime rooted at it.



   .. py:method:: init_schema() -> None
      :async:


      Create the journal tables if they don't already exist.



   .. py:attribute:: name
      :value: 'postgres'



.. py:class:: SqliteJournalStore(path: str | pathlib.Path)

   SQLite-backed journal. Durable across process restarts.
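
   Durability across restarts can be illustrated with the standard-library
   ``sqlite3`` module. The sketch is synchronous for brevity, and its
   table layout, JSON serialization and helper names (``open_journal``,
   ``put_step``, ``get_step``) are assumptions rather than the store's
   actual implementation.

   .. code-block:: python

      import json
      import sqlite3
      import tempfile
      import time
      from pathlib import Path
      from typing import Any


      def open_journal(path: Path) -> sqlite3.Connection:
          conn = sqlite3.connect(path)
          conn.execute(
              "CREATE TABLE IF NOT EXISTS steps ("
              " session_id TEXT, step_name TEXT, value TEXT, created_at REAL,"
              " PRIMARY KEY (session_id, step_name))"
          )
          return conn


      def put_step(conn: sqlite3.Connection, session_id: str, step_name: str, value: Any) -> None:
          conn.execute(
              "INSERT OR REPLACE INTO steps VALUES (?, ?, ?, ?)",
              (session_id, step_name, json.dumps(value), time.time()),
          )
          conn.commit()  # flush to disk so the entry survives a restart


      def get_step(conn: sqlite3.Connection, session_id: str, step_name: str) -> Any:
          row = conn.execute(
              "SELECT value FROM steps WHERE session_id = ? AND step_name = ?",
              (session_id, step_name),
          ).fetchone()
          return None if row is None else json.loads(row[0])


      with tempfile.TemporaryDirectory() as tmp:
          path = Path(tmp) / "journal.db"
          writer = open_journal(path)
          put_step(writer, "s1", "plan", {"goal": "demo"})
          writer.close()  # stands in for the first process exiting

          reader = open_journal(path)  # stands in for a restarted process
          recovered = get_step(reader, "s1", "plan")
          reader.close()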


   .. py:method:: aclose() -> None
      :async:



   .. py:method:: get_step(session_id: str, step_name: str) -> JournalEntry | None
      :async:



   .. py:method:: get_stream(session_id: str, step_name: str) -> list[Any] | None
      :async:



   .. py:method:: put_step(session_id: str, step_name: str, value: Any) -> None
      :async:



   .. py:method:: put_stream(session_id: str, step_name: str, chunks: list[Any]) -> None
      :async:



   .. py:property:: path
      :type: pathlib.Path



.. py:class:: SqliteRuntime(path: str | pathlib.Path)

   Bases: :py:obj:`jeevesagent.runtime.journaled.JournaledRuntime`


   :class:`JournaledRuntime` with a :class:`SqliteJournalStore`.


   .. py:attribute:: name
      :value: 'sqlite'



   .. py:property:: path
      :type: pathlib.Path



