Coverage for src / kemi / pipeline / ingestion.py: 96%
81 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-05 15:47 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-05 15:47 +0000
1"""Ingestion pipeline: take a fully-built ``MemoryObject`` and store it.
3The pipeline is the "ingest" half of the remember flow. It is purely
4about "what to do with a candidate memory": dedup, conflict detection,
5entity extraction, storage, webhook dispatch, and audit tracking.
7Composition (validation, sanitization, embedding, ``MemoryObject``
8construction, pre/post hooks) is the caller's responsibility — see
9:class:`kemi._memory_impl.Memory.remember`.
11The pipeline does not depend on the ``Memory`` class. It takes an
12:class:`IngestionContext` with the storage adapter, configuration,
13and the side-effect callables that the orchestrator wires up. This
14keeps the pipeline independently testable.
15"""
17from __future__ import annotations
19import logging
20from collections.abc import Callable
21from dataclasses import dataclass
22from typing import TYPE_CHECKING, Any
24from kemi import dedup
25from kemi.models import LifecycleState, MemoryConfig, MemoryObject
26from kemi.webhooks import WebhookEventType
28if TYPE_CHECKING:
29 from kemi.adapters.base import StorageAdapter
30 from kemi.entities import EntityLinker
31 from kemi.versions import MemoryVersionStore
33logger = logging.getLogger(__name__)
def _memory_to_dict(memory: MemoryObject) -> dict[str, Any]:
    """Convert a ``MemoryObject`` into a plain dict for webhook payloads.

    Enum-valued fields (``lifecycle_state``, ``memory_type``, ``source``) are
    reduced to their ``.value`` and the two timestamp fields to ISO-8601
    strings; any of those that is unset (falsy) is emitted as ``None``.

    ``tags`` and ``metadata`` are shallow-copied so the returned snapshot does
    not alias the live object: mutating the memory after the snapshot was
    taken cannot alter an already-built payload, and a consumer mutating the
    payload cannot alter the memory.

    Args:
        memory: The memory to snapshot. It is only read, never modified.

    Returns:
        A new dict with one entry per exported field. Values inside
        ``metadata`` are passed through untouched, so whether the result is
        fully JSON-serialisable depends on what the caller stored there.
    """

    def _enum_value(member: Any) -> Any:
        # Same truthiness test as the per-field checks this replaces.
        return member.value if member else None

    def _iso(moment: Any) -> str | None:
        return moment.isoformat() if moment else None

    tags = memory.tags
    metadata = memory.metadata
    return {
        "memory_id": memory.memory_id,
        "content": memory.content,
        "importance": memory.importance,
        "confidence": memory.confidence,
        "lifecycle_state": _enum_value(memory.lifecycle_state),
        "memory_type": _enum_value(memory.memory_type),
        "source": _enum_value(memory.source),
        # Copy instead of sharing the list with the live object.
        "tags": list(tags) if tags is not None else None,
        "namespace": memory.namespace,
        "session_id": memory.session_id,
        "version": memory.version,
        "created_at": _iso(memory.created_at),
        "last_accessed_at": _iso(memory.last_accessed_at),
        # Shallow copy: top-level keys are isolated, nested values are shared.
        "metadata": dict(metadata) if metadata is not None else None,
        "agent_id": memory.agent_id,
        "run_id": memory.run_id,
        "app_id": memory.app_id,
    }
@dataclass
class IngestionContext:
    """Bundle of collaborators the pipeline needs to ingest one memory.

    Everything the pipeline touches arrives through this object: the
    storage adapter, the configuration, the entity linker, an optional
    metrics sink, and the side-effect callables. The pipeline therefore
    relies on no global state and holds no reference to ``Memory``.
    """

    store: StorageAdapter
    config: MemoryConfig
    entity_linker: EntityLinker
    # ``None`` turns metric reporting off; the pipeline checks before use.
    metrics: Any | None

    # Side-effect hooks. Each one defaults to a no-op so a context can be
    # built with only the four fields above; the ``Memory`` orchestrator
    # wires them to the implementations in ``kemi.operations._ops_*``.
    record_store_error: Callable[[], None] = lambda: None
    dispatch_webhook: Callable[..., None] = lambda *_args, **_kwargs: None
    track_operation: Callable[..., None] = lambda *_args, **_kwargs: None
    # The default yields ``None`` rather than a real version store.
    get_version_store: Callable[[], MemoryVersionStore] = lambda: None  # type: ignore[return-value]
    auto_prune_versions: Callable[[str], None] = lambda memory_id: None
class IngestionPipeline:
    """Ingest a fully-built ``MemoryObject`` and return the stored result.

    The pipeline mutates ``memory.metadata`` to attach the
    ``conflict_flagged`` and ``extracted_entities`` annotations, then
    stores the memory. When the input duplicates an existing memory it is
    merged into that canonical memory, and the canonical one is returned
    instead of the input.

    The pipeline does NOT fire user hooks. The caller
    (:meth:`kemi._memory_impl.Memory.remember`) fires the pre/post hooks
    around the pipeline call to preserve the historical contract:
    ``_remember_with_embedding`` (used by ``remember_many``) fires no
    hooks, while the public ``remember`` fires both.
    """

    def __init__(self, ctx: IngestionContext) -> None:
        """Bind the pipeline to its dependencies.

        Args:
            ctx: Storage adapter, configuration and side-effect callables
                used by every ``ingest`` call.
        """
        self._ctx = ctx

    def ingest(
        self,
        memory: MemoryObject,
        *,
        audit_batch: list[dict[str, Any]] | None = None,
    ) -> MemoryObject:
        """Ingest ``memory`` and return the stored ``MemoryObject``.

        Steps, in order: load the user's existing memories for the same
        namespace, short-circuit into a merge when a duplicate is found,
        flag conflicts, optionally extract entities, store, update
        metrics, dispatch webhooks, and record the audit entry.

        Args:
            memory: The candidate memory. Its ``metadata`` dict may be
                mutated in place (``conflict_flagged``,
                ``extracted_entities``).
            audit_batch: Forwarded unchanged to ``track_operation``.

        Returns:
            ``memory`` itself when it was stored as a new entry, or the
            merged canonical memory when a duplicate was found.

        Raises:
            Exception: Whatever ``store.store`` raises is re-raised after
                ``record_store_error`` has been called.
        """
        ctx = self._ctx
        existing = ctx.store.get_all_by_user(
            memory.user_id,
            lifecycle_filter=[
                LifecycleState.ACTIVE,
                LifecycleState.DECAYING,
                LifecycleState.ARCHIVED,
            ],
            namespace=memory.namespace,
        )

        duplicates = dedup.find_duplicates(
            memory, existing, ctx.config.dedup_threshold
        )
        if duplicates:
            return self._handle_duplicate(memory, duplicates, audit_batch)

        conflicts = dedup.find_conflicts(
            memory,
            existing,
            ctx.config.conflict_threshold,
            ctx.config.dedup_threshold,
        )
        conflict_detected = False
        if conflicts:
            memory.metadata["conflict_flagged"] = True
            conflict_detected = True
            # Only the first reported conflict is surfaced in the log,
            # the webhook and the audit details.
            logger.warning(
                f"Potential conflict detected for user {memory.user_id}: "
                f"new memory '{memory.content[:50]}...' conflicts with existing memory "
                f"'{conflicts[0].content[:50]}...'"
            )
            if ctx.metrics is not None:
                ctx.metrics.conflicts_detected.inc(1)

        if ctx.config.enable_entity_boost:
            memory.metadata["extracted_entities"] = list(
                ctx.entity_linker.extract(memory.content)
            )

        try:
            ctx.store.store(memory)
        except Exception:
            # Broad catch intentional: storage adapters can raise from
            # many layers (SQLite, JSON, Postgres, encryption). Record
            # the error in metrics and re-raise the original.
            ctx.record_store_error()
            raise

        if ctx.metrics is not None:
            ctx.metrics.embed_total.inc(1)
            ctx.metrics.embed_bytes_total.inc(len(memory.content))
            ctx.metrics.total_memories.set(ctx.store.count(memory.user_id))

        # One snapshot is shared by both webhook events below.
        snapshot = _memory_to_dict(memory)
        ctx.dispatch_webhook(
            WebhookEventType.REMEMBERED,
            memory_id=memory.memory_id,
            user_id=memory.user_id,
            snapshot=snapshot,
        )
        if conflict_detected:
            ctx.dispatch_webhook(
                WebhookEventType.CONFLICT,
                memory_id=memory.memory_id,
                user_id=memory.user_id,
                snapshot=snapshot,
                conflict_with=conflicts[0].memory_id,
            )

        details: dict[str, Any] = {
            "memory_id": memory.memory_id,
            "content_length": len(memory.content),
        }
        if conflict_detected:
            details["conflict"] = True
            details["conflict_with"] = conflicts[0].memory_id
        ctx.track_operation(
            "remember",
            memory.user_id,
            details,
            memory.memory_id,
            memory.namespace,
            audit_batch=audit_batch,
        )
        return memory

    def _record_merge_version(self, canonical: MemoryObject) -> None:
        """Best-effort: record a version of ``canonical`` and prune old ones.

        Version history is auxiliary, so a failure here never aborts the
        merge. Unlike a bare ``pass`` it is logged, which keeps a broken
        version store visible.
        """
        ctx = self._ctx
        try:
            version_store = ctx.get_version_store()
            if version_store is None:
                # The default ``get_version_store`` hook returns None:
                # there is nothing to record or prune.
                return
            version_store.record_version(canonical, changed_by="merge")
            ctx.auto_prune_versions(canonical.memory_id)
        except RuntimeError:
            # NOTE(review): the previous handler singled out RuntimeError,
            # presumably the "versioning unavailable" signal; it is logged
            # quietly on that assumption. Confirm against the version store.
            logger.debug(
                "Version store unavailable; skipped version record for memory %s",
                canonical.memory_id,
                exc_info=True,
            )
        except Exception:
            logger.warning(
                "Failed to record version for memory %s before merge; continuing",
                canonical.memory_id,
                exc_info=True,
            )

    def _handle_duplicate(
        self,
        memory: MemoryObject,
        duplicates: list[MemoryObject],
        audit_batch: list[dict[str, Any]] | None,
    ) -> MemoryObject:
        """Merge ``memory`` into the canonical existing one and return it.

        Args:
            memory: The incoming memory that was found to be a duplicate.
            duplicates: Non-empty list of matching memories;
                ``duplicates[0]`` is treated as the canonical entry and
                the remaining ones are deleted from the store.
            audit_batch: Forwarded unchanged to ``track_operation``.

        Returns:
            The merged memory produced by ``dedup.resolve_duplicate``.
        """
        ctx = self._ctx
        canonical = duplicates[0]
        resolved = dedup.resolve_duplicate(memory, canonical)
        # Content changed during merge — invalidate stale cached entities.
        resolved.metadata.pop("extracted_entities", None)
        # Record version BEFORE overwriting the stored entry.
        # NOTE(review): this runs after ``resolve_duplicate``; if that
        # helper mutates ``canonical`` in place, the recorded version is
        # already the merged state. Confirm.
        self._record_merge_version(canonical)
        ctx.store.update(resolved)
        # Remove the other near-duplicates so they don't re-trigger on
        # the next insert. duplicates[0] is the canonical we merged into.
        for extra in duplicates[1:]:
            if extra.memory_id != resolved.memory_id:
                ctx.store.delete_by_id(extra.memory_id)
        snapshot = _memory_to_dict(resolved)
        ctx.dispatch_webhook(
            WebhookEventType.UPDATED,
            memory_id=resolved.memory_id,
            user_id=resolved.user_id,
            snapshot=snapshot,
        )
        logger.info(
            f"Resolved duplicate for user {resolved.user_id}: {resolved.memory_id}"
        )
        if ctx.metrics is not None:
            ctx.metrics.duplicates_detected.inc(1)
        ctx.track_operation(
            "remember",
            resolved.user_id,
            {"memory_id": resolved.memory_id, "duplicate": True},
            resolved.memory_id,
            resolved.namespace,
            audit_batch=audit_batch,
        )
        return resolved
243__all__ = ["IngestionContext", "IngestionPipeline"]