Coverage for src / kemi / pipeline / ingestion.py: 96%

81 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-05 15:47 +0000

1"""Ingestion pipeline: take a fully-built ``MemoryObject`` and store it. 

2 

3The pipeline is the "ingest" half of the remember flow. It is purely 

4about "what to do with a candidate memory": dedup, conflict detection, 

5entity extraction, storage, webhook dispatch, and audit tracking. 

6 

7Composition (validation, sanitization, embedding, ``MemoryObject`` 

8construction, pre/post hooks) is the caller's responsibility — see 

9:class:`kemi._memory_impl.Memory.remember`. 

10 

11The pipeline does not depend on the ``Memory`` class. It takes an 

12:class:`IngestionContext` with the storage adapter, configuration, 

13and the side-effect callables that the orchestrator wires up. This 

14keeps the pipeline independently testable. 

15""" 

16 

17from __future__ import annotations 

18 

19import logging 

20from collections.abc import Callable 

21from dataclasses import dataclass 

22from typing import TYPE_CHECKING, Any 

23 

24from kemi import dedup 

25from kemi.models import LifecycleState, MemoryConfig, MemoryObject 

26from kemi.webhooks import WebhookEventType 

27 

28if TYPE_CHECKING: 

29 from kemi.adapters.base import StorageAdapter 

30 from kemi.entities import EntityLinker 

31 from kemi.versions import MemoryVersionStore 

32 

33logger = logging.getLogger(__name__) 

34 

35 

36def _memory_to_dict(memory: MemoryObject) -> dict[str, Any]: 

37 """Convert a MemoryObject to a JSON-serialisable dict for webhook payloads.""" 

38 return { 

39 "memory_id": memory.memory_id, 

40 "content": memory.content, 

41 "importance": memory.importance, 

42 "confidence": memory.confidence, 

43 "lifecycle_state": memory.lifecycle_state.value if memory.lifecycle_state else None, 

44 "memory_type": memory.memory_type.value if memory.memory_type else None, 

45 "source": memory.source.value if memory.source else None, 

46 "tags": memory.tags, 

47 "namespace": memory.namespace, 

48 "session_id": memory.session_id, 

49 "version": memory.version, 

50 "created_at": memory.created_at.isoformat() if memory.created_at else None, 

51 "last_accessed_at": memory.last_accessed_at.isoformat() if memory.last_accessed_at else None, 

52 "metadata": memory.metadata, 

53 "agent_id": memory.agent_id, 

54 "run_id": memory.run_id, 

55 "app_id": memory.app_id, 

56 } 

57 

58 

def _skip_store_error() -> None:
    """Default ``record_store_error``: takes no arguments, does nothing."""
    return None


def _skip_webhook(*args: Any, **kwargs: Any) -> None:
    """Default ``dispatch_webhook``: accepts anything, does nothing."""
    return None


def _skip_tracking(*args: Any, **kwargs: Any) -> None:
    """Default ``track_operation``: accepts anything, does nothing."""
    return None


def _absent_version_store() -> "MemoryVersionStore":
    """Default ``get_version_store``: there is no store, so return ``None``."""
    return None  # type: ignore[return-value]


def _skip_prune(memory_id: str) -> None:
    """Default ``auto_prune_versions``: ignore ``memory_id``, do nothing."""
    return None


@dataclass
class IngestionContext:
    """Everything the pipeline needs to ingest one ``MemoryObject``.

    Bundles the storage adapter, the configuration, the entity linker,
    an optional metrics object, and the side-effect callables. Each
    callable defaults to a no-op, so a context can be built from the
    first four fields alone. Nothing here refers to ``Memory`` or to
    global state.
    """

    # Collaborators (required, in positional order).
    store: "StorageAdapter"
    config: MemoryConfig
    entity_linker: "EntityLinker"
    # Metrics sink; ``None`` means "do not record metrics".
    metrics: Any | None

    # Side-effect callables. The ``Memory`` orchestrator is expected to
    # replace these with the implementations in ``kemi.operations._ops_*``.
    record_store_error: Callable[[], None] = _skip_store_error
    dispatch_webhook: Callable[..., None] = _skip_webhook
    track_operation: Callable[..., None] = _skip_tracking
    get_version_store: Callable[[], "MemoryVersionStore"] = _absent_version_store
    auto_prune_versions: Callable[[str], None] = _skip_prune

81 

82 

class IngestionPipeline:
    """Ingest a fully-built ``MemoryObject`` and return the stored result.

    The pipeline mutates ``memory.metadata`` to attach
    ``conflict_flagged`` and ``extracted_entities`` annotations, then
    stores it. In the dedup case the input is merged into the canonical
    existing memory and that canonical is returned instead.

    The pipeline does NOT fire user hooks. The caller
    (:meth:`kemi._memory_impl.Memory.remember`) fires the pre/post
    hooks around the pipeline call to preserve the historical
    contract: ``_remember_with_embedding`` (used by ``remember_many``)
    fires no hooks, while the public ``remember`` fires both.
    """

    def __init__(self, ctx: IngestionContext) -> None:
        """Keep ``ctx``; every collaborator is reached through it."""
        self._ctx = ctx

    def ingest(
        self,
        memory: MemoryObject,
        *,
        audit_batch: list[dict[str, Any]] | None = None,
    ) -> MemoryObject:
        """Ingest ``memory`` and return the stored ``MemoryObject``.

        Steps, in order:

        1. Load the user's ACTIVE, DECAYING and ARCHIVED memories in
           ``memory.namespace``.
        2. If ``dedup.find_duplicates`` reports any, merge via
           :meth:`_handle_duplicate` and return the merged canonical.
        3. Otherwise flag conflicts (``metadata["conflict_flagged"]``),
           optionally attach ``metadata["extracted_entities"]``, store the
           memory, update metrics, dispatch webhooks and track the
           operation.

        Args:
            memory: The candidate memory. Its ``metadata`` dict is
                mutated in place.
            audit_batch: Forwarded unchanged to ``track_operation``.

        Returns:
            ``memory`` itself when it was stored as a new entry, or the
            merged canonical memory when a duplicate was found.

        Raises:
            Exception: Whatever ``store.store`` raises is re-raised
                unchanged after ``record_store_error`` has been called.
        """
        ctx = self._ctx
        existing = ctx.store.get_all_by_user(
            memory.user_id,
            lifecycle_filter=[
                LifecycleState.ACTIVE,
                LifecycleState.DECAYING,
                LifecycleState.ARCHIVED,
            ],
            namespace=memory.namespace,
        )

        duplicates = dedup.find_duplicates(
            memory, existing, ctx.config.dedup_threshold
        )
        if duplicates:
            return self._handle_duplicate(memory, duplicates, audit_batch)

        conflicts = dedup.find_conflicts(
            memory,
            existing,
            ctx.config.conflict_threshold,
            ctx.config.dedup_threshold,
        )
        conflict_detected = bool(conflicts)
        if conflict_detected:
            memory.metadata["conflict_flagged"] = True
            # Lazy %-formatting: the message is only rendered when a
            # handler actually emits the record.
            logger.warning(
                "Potential conflict detected for user %s: "
                "new memory '%s...' conflicts with existing memory "
                "'%s...'",
                memory.user_id,
                memory.content[:50],
                conflicts[0].content[:50],
            )
            if ctx.metrics is not None:
                ctx.metrics.conflicts_detected.inc(1)

        if ctx.config.enable_entity_boost:
            memory.metadata["extracted_entities"] = list(
                ctx.entity_linker.extract(memory.content)
            )

        try:
            ctx.store.store(memory)
        except Exception:
            # Broad catch intentional: storage adapters can raise from
            # many layers (SQLite, JSON, Postgres, encryption). Record
            # the error in metrics and re-raise the original.
            ctx.record_store_error()
            raise

        if ctx.metrics is not None:
            ctx.metrics.embed_total.inc(1)
            ctx.metrics.embed_bytes_total.inc(len(memory.content))
            ctx.metrics.total_memories.set(ctx.store.count(memory.user_id))

        # One snapshot is shared by both webhook events below.
        snapshot = _memory_to_dict(memory)
        ctx.dispatch_webhook(
            WebhookEventType.REMEMBERED,
            memory_id=memory.memory_id,
            user_id=memory.user_id,
            snapshot=snapshot,
        )
        if conflict_detected:
            ctx.dispatch_webhook(
                WebhookEventType.CONFLICT,
                memory_id=memory.memory_id,
                user_id=memory.user_id,
                snapshot=snapshot,
                conflict_with=conflicts[0].memory_id,
            )

        details: dict[str, Any] = {
            "memory_id": memory.memory_id,
            "content_length": len(memory.content),
        }
        if conflict_detected:
            details["conflict"] = True
            details["conflict_with"] = conflicts[0].memory_id
        ctx.track_operation(
            "remember",
            memory.user_id,
            details,
            memory.memory_id,
            memory.namespace,
            audit_batch=audit_batch,
        )
        return memory

    def _handle_duplicate(
        self,
        memory: MemoryObject,
        duplicates: list[MemoryObject],
        audit_batch: list[dict[str, Any]] | None,
    ) -> MemoryObject:
        """Merge ``memory`` into the canonical existing one and return it.

        ``duplicates[0]`` is treated as the canonical entry. Its
        pre-merge state is recorded in the version store on a
        best-effort basis, the merged result is written with
        ``store.update``, and the remaining duplicates are deleted.
        An ``UPDATED`` webhook is dispatched and the operation is
        tracked with ``{"duplicate": True}``.

        Args:
            memory: The incoming candidate memory.
            duplicates: Non-empty list of existing duplicates.
            audit_batch: Forwarded unchanged to ``track_operation``.

        Returns:
            The merged memory produced by ``dedup.resolve_duplicate``.
        """
        ctx = self._ctx
        canonical = duplicates[0]
        resolved = dedup.resolve_duplicate(memory, canonical)
        # Content changed during merge — invalidate stale cached entities.
        resolved.metadata.pop("extracted_entities", None)

        # Record version BEFORE overwriting. Versioning is best-effort:
        # a failure here must never block the merge, but it is logged
        # (with traceback, at debug level) instead of vanishing silently.
        try:
            version_store = ctx.get_version_store()
            # The default context factory returns None; nothing to record.
            if version_store is not None:
                version_store.record_version(canonical, changed_by="merge")
                ctx.auto_prune_versions(canonical.memory_id)
        except Exception:
            logger.debug(
                "Skipping pre-merge version record for memory %s",
                canonical.memory_id,
                exc_info=True,
            )

        ctx.store.update(resolved)
        # Remove the other near-duplicates so they don't re-trigger on
        # the next insert. duplicates[0] is the canonical we merged into.
        for extra in duplicates[1:]:
            if extra.memory_id != resolved.memory_id:
                ctx.store.delete_by_id(extra.memory_id)

        ctx.dispatch_webhook(
            WebhookEventType.UPDATED,
            memory_id=resolved.memory_id,
            user_id=resolved.user_id,
            snapshot=_memory_to_dict(resolved),
        )
        logger.info(
            "Resolved duplicate for user %s: %s",
            resolved.user_id,
            resolved.memory_id,
        )
        if ctx.metrics is not None:
            ctx.metrics.duplicates_detected.inc(1)
        ctx.track_operation(
            "remember",
            resolved.user_id,
            {"memory_id": resolved.memory_id, "duplicate": True},
            resolved.memory_id,
            resolved.namespace,
            audit_batch=audit_batch,
        )
        return resolved

241 

242 

243__all__ = ["IngestionContext", "IngestionPipeline"]