jeevesagent.memory.worker

Background consolidation worker.

A long-running anyio task that periodically calls memory.consolidate(). Useful for very long-lived agents where per-run consolidation (the auto_consolidate=True flag on Agent) is wasteful — you’d rather batch every N seconds.

Usage:

worker = ConsolidationWorker(memory, interval_seconds=60)

async with anyio.create_task_group() as tg:
    tg.start_soon(worker.run_forever)

    # main agent work here…
    await main()

# On task-group exit the worker is cancelled cleanly; any
# in-flight consolidate call gets cooperatively interrupted at the
# next ``await``.

Errors raised by the underlying memory.consolidate() call are caught and routed to the optional on_error callback so a transient LLM hiccup doesn’t kill the worker. New facts trigger on_consolidated(count) when set; both callbacks are awaitable.

Attributes

Classes

ConsolidationWorker

Periodic consolidator for any Memory backend.

Module Contents

class jeevesagent.memory.worker.ConsolidationWorker(memory: jeevesagent.core.protocols.Memory, *, interval_seconds: float = 60.0, on_consolidated: OnConsolidatedCb | None = None, on_error: OnErrorCb | None = None)[source]

Periodic consolidator for any Memory backend.

async run_forever() None[source]

Sleep interval_seconds then consolidate. Repeat until cancelled.

Spawn this in an anyio.create_task_group() — the cancel scope at scope exit terminates the worker cooperatively.

async run_once() int[source]

Run a single consolidation pass. Returns the number of new facts extracted (0 when no fact store / nothing changed).

Errors in memory.consolidate() are routed to on_error and not re-raised, so callers can use this in a polling loop without wrapping it in their own try/except.

property iterations: int

Number of consolidate cycles attempted (test introspection).

property total_extracted: int

Cumulative count of facts extracted across all cycles.

jeevesagent.memory.worker.OnConsolidatedCb
jeevesagent.memory.worker.OnErrorCb