# Source: jeevesagent.memory.worker

"""Background consolidation worker.

A long-running anyio task that periodically calls
``memory.consolidate()``. Useful for very long-lived agents where
per-run consolidation (the ``auto_consolidate=True`` flag on
:class:`Agent`) is wasteful — you'd rather batch every N seconds.

Usage::

    worker = ConsolidationWorker(memory, interval_seconds=60)

    async with anyio.create_task_group() as tg:
        tg.start_soon(worker.run_forever)

        # main agent work here…
        await main()

    # On task-group exit the worker is cancelled cleanly; any
    # in-flight consolidate call gets cooperatively interrupted at the
    # next ``await``.

Errors raised by the underlying ``memory.consolidate()`` call are
caught and routed to the optional ``on_error`` callback so a transient
LLM hiccup doesn't kill the worker. New facts trigger
``on_consolidated(count)`` when set; both callbacks are awaitable.
"""

from __future__ import annotations

from collections.abc import Awaitable, Callable
from typing import Any

import anyio

from ..core.protocols import Memory

# Async callback: awaited with the number of newly extracted facts after
# a cycle that produced at least one.
OnConsolidatedCb = Callable[[int], Awaitable[None]]
# Async callback: awaited with the exception raised by ``memory.consolidate()``.
OnErrorCb = Callable[[BaseException], Awaitable[None]]


class ConsolidationWorker:
    """Periodic consolidator for any :class:`Memory` backend.

    Sleeps ``interval_seconds`` between calls to ``memory.consolidate()``
    and reports progress through optional async callbacks.

    Args:
        memory: Backend implementing the :class:`Memory` protocol. When it
            exposes a ``facts`` store, extracted-fact counts are tracked
            by diffing ``len(all_facts())`` before/after each cycle.
        interval_seconds: Delay between consolidation cycles; must be > 0.
        on_consolidated: Awaited with the number of new facts after any
            cycle that extracted at least one.
        on_error: Awaited with the exception when ``consolidate()`` fails;
            ordinary errors are not re-raised, so the worker keeps running.

    Raises:
        ValueError: If ``interval_seconds`` is not positive.
    """

    def __init__(
        self,
        memory: Memory,
        *,
        interval_seconds: float = 60.0,
        on_consolidated: OnConsolidatedCb | None = None,
        on_error: OnErrorCb | None = None,
    ) -> None:
        if interval_seconds <= 0:
            raise ValueError("interval_seconds must be positive")
        self._memory = memory
        self._interval = interval_seconds
        self._on_consolidated = on_consolidated
        self._on_error = on_error
        self._iterations = 0
        self._total_extracted = 0
        # Task group backing the ``async with worker:`` sugar; ``None``
        # whenever the worker is not currently entered.
        self._tg: anyio.abc.TaskGroup | None = None

    @property
    def iterations(self) -> int:
        """Number of consolidate cycles attempted (test introspection)."""
        return self._iterations

    @property
    def total_extracted(self) -> int:
        """Cumulative count of facts extracted across all cycles."""
        return self._total_extracted

    # ---- one-shot API (handy for tests + manual invocation) ------------

    async def run_once(self) -> int:
        """Run a single consolidation pass.

        Returns the number of new facts extracted (``0`` when no fact
        store / nothing changed). Errors in ``memory.consolidate()`` are
        routed to ``on_error`` and **not** re-raised, so callers can use
        this in a polling loop without wrapping it in their own
        try/except. Cancellation and other ``BaseException``s (e.g.
        ``KeyboardInterrupt``) DO propagate — see the ``except`` note.
        """
        self._iterations += 1

        fact_store = getattr(self._memory, "facts", None)
        before = 0
        if fact_store is not None:
            before = len(await fact_store.all_facts())

        try:
            await self._memory.consolidate()
        except Exception as exc:
            # Deliberately *not* BaseException: anyio/trio cancellation
            # exceptions derive from BaseException and must never be
            # swallowed here, otherwise the enclosing cancel scope could
            # not interrupt an in-flight consolidate call — breaking the
            # cooperative-cancellation contract promised in the module
            # docstring.
            if self._on_error is not None:
                await self._on_error(exc)
            return 0

        if fact_store is None:
            return 0

        after = len(await fact_store.all_facts())
        count = max(0, after - before)
        self._total_extracted += count
        if count > 0 and self._on_consolidated is not None:
            await self._on_consolidated(count)
        return count

    # ---- run-forever loop ---------------------------------------------

    async def run_forever(self) -> None:
        """Sleep ``interval_seconds`` then consolidate. Repeat until cancelled.

        Spawn this in an :func:`anyio.create_task_group` — the cancel
        scope at scope exit terminates the worker cooperatively.
        """
        while True:
            await anyio.sleep(self._interval)
            await self.run_once()

    # ---- async-context-manager sugar -----------------------------------

    async def __aenter__(self) -> ConsolidationWorker:
        # Lazily attach a task group on first use so the worker can be
        # used standalone (without callers managing a task group). The
        # contract: ``async with worker: ...`` runs ``run_forever``
        # in the background; exiting the block cancels it.
        if self._tg is not None:
            # Guard: silently overwriting ``_tg`` would leak the first
            # task group and orphan its run_forever task.
            raise RuntimeError("ConsolidationWorker is already entered")
        tg = anyio.create_task_group()
        await tg.__aenter__()
        self._tg = tg
        tg.start_soon(self.run_forever)
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        # Reset ``_tg`` first so the worker can be entered again later.
        tg, self._tg = self._tg, None
        if tg is None:
            return
        tg.cancel_scope.cancel()
        await tg.__aexit__(*exc_info)