Source code for jeevesagent.runtime.inproc
"""In-process runtime: no durability, no journal.
Every step just runs. Used in dev, tests, and demos. Production users
swap in :class:`DBOSRuntime` or :class:`TemporalRuntime` (Phase 5).
"""
from __future__ import annotations
from collections.abc import AsyncIterator, Awaitable, Callable
from contextlib import asynccontextmanager
from typing import Any
[docs]
class InProcSession:
"""Trivial session: just a holder for the session ID and signals."""
def __init__(self, session_id: str) -> None:
self.id = session_id
self._signals: dict[str, Any] = {}
[docs]
async def deliver(self, name: str, payload: Any) -> None:
self._signals[name] = payload
[docs]
class InProcRuntime:
"""No durability. Each step runs immediately."""
name = "inproc"
def __init__(self) -> None:
self._sessions: dict[str, InProcSession] = {}
[docs]
async def step(
self,
name: str,
fn: Callable[..., Awaitable[Any]],
*args: Any,
idempotency_key: str | None = None,
**kwargs: Any,
) -> Any:
return await fn(*args, **kwargs)
[docs]
def stream_step(
self,
name: str,
fn: Callable[..., AsyncIterator[Any]],
*args: Any,
**kwargs: Any,
) -> AsyncIterator[Any]:
return fn(*args, **kwargs)
[docs]
@asynccontextmanager
async def session(self, session_id: str) -> AsyncIterator[InProcSession]:
s = self._sessions.setdefault(session_id, InProcSession(session_id))
try:
yield s
finally:
pass # no persistence; keep in-memory for the process lifetime
[docs]
async def signal(self, session_id: str, name: str, payload: Any) -> None:
s = self._sessions.get(session_id)
if s is not None:
await s.deliver(name, payload)