jeevesagent.memory.worker¶
Background consolidation worker.
A long-running anyio task that periodically calls
memory.consolidate(). Useful for very long-lived agents where
per-run consolidation (the auto_consolidate=True flag on
Agent) is wasteful — you’d rather batch every N seconds.
Usage:
worker = ConsolidationWorker(memory, interval_seconds=60)
async with anyio.create_task_group() as tg:
tg.start_soon(worker.run_forever)
# main agent work here…
await main()
# On task-group exit the worker is cancelled cleanly; any
# in-flight consolidate call gets cooperatively interrupted at the
# next ``await``.
Errors raised by the underlying memory.consolidate() call are
caught and routed to the optional on_error callback so a transient
LLM hiccup doesn’t kill the worker. New facts trigger
on_consolidated(count) when set; both callbacks are awaitable.
Attributes¶
Classes¶
Periodic consolidator for any |
Module Contents¶
- class jeevesagent.memory.worker.ConsolidationWorker(memory: jeevesagent.core.protocols.Memory, *, interval_seconds: float = 60.0, on_consolidated: OnConsolidatedCb | None = None, on_error: OnErrorCb | None = None)[source]¶
Periodic consolidator for any
Memorybackend.- async run_forever() None[source]¶
Sleep
interval_secondsthen consolidate. Repeat until cancelled.Spawn this in an
anyio.create_task_group()— the cancel scope at scope exit terminates the worker cooperatively.
- async run_once() int[source]¶
Run a single consolidation pass. Returns the number of new facts extracted (
0when no fact store / nothing changed).Errors in
memory.consolidate()are routed toon_errorand not re-raised, so callers can use this in a polling loop without wrapping it in their own try/except.
- jeevesagent.memory.worker.OnConsolidatedCb¶
- jeevesagent.memory.worker.OnErrorCb¶