Coverage for src / kemi / consolidation.py: 96%

105 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-05 15:47 +0000

1"""Memory consolidation: summarize old episodic memories into semantic ones. 

2 

3Supports both local extractive summarization (no LLM required) and optional 

4LLM-powered abstractive summarization via :class:`kemi.summarizer.LLMSummarizer`. 

5""" 

6 

7import logging 

8import uuid 

9from datetime import datetime, timezone 

10from typing import Any 

11 

12from kemi import lifecycle 

13from kemi.models import LifecycleState, MemoryObject, MemorySource, MemoryType 

14from kemi.scoring import cosine_similarity 

15 

16logger = logging.getLogger(__name__) 

17 

18 

19def _get_summarizer( 

20 with_llm_summary: bool, 

21 summarizer_llm_provider: str | None = None, 

22 summarizer_llm_model: str | None = None, 

23 summarizer_prompt_template: str | None = None, 

24) -> Any | None: 

25 """Return an LLMSummarizer instance if LLM summarization is requested. 

26 

27 Returns None if *with_llm_summary* is False or the summarizer module is 

28 not available. On import failure the error is logged and None is returned 

29 so caller can fall back to extractive summarization gracefully. 

30 """ 

31 if not with_llm_summary: 

32 return None 

33 try: 

34 from kemi.summarizer import LLMSummarizer 

35 

36 return LLMSummarizer( 

37 provider=summarizer_llm_provider or "openai", 

38 model=summarizer_llm_model, 

39 prompt_template=summarizer_prompt_template, 

40 ) 

41 except Exception: 

42 logger.warning( 

43 "LLM summarizer not available, falling back to extractive summary", 

44 exc_info=True, 

45 ) 

46 return None 

47 

48 

def consolidate_cluster(
    store: Any,
    embed: Any,
    user_id: str,
    cluster: list[MemoryObject],
    namespace: str = "default",
    summarizer: Any | None = None,
) -> MemoryObject | None:
    """Consolidate a single cluster of related memories into a semantic summary.

    The summary content is always the extractive summary of the cluster. When
    *summarizer* is provided, its output is attached as
    ``metadata["llm_summary"]`` in addition; a failing or empty LLM summary
    never prevents the consolidation.

    Side effects: every memory in *cluster* is transitioned to ``ARCHIVED``
    and passed to ``store.update``. The returned summary is NOT stored by this
    function; persisting it is left to the caller.

    Args:
        store: StorageAdapter instance (only ``update`` is called here).
        embed: EmbeddingAdapter instance (``embed_single`` is called once).
        user_id: User that owns the consolidated memory.
        cluster: List of related MemoryObjects to consolidate.
        namespace: Memory namespace for the new summary.
        summarizer: Optional object with a ``summarize(list[str])`` method
            (e.g. ``LLMSummarizer``) used for the abstractive summary.

    Returns:
        A ``MemoryObject`` representing the consolidated summary, or None if
        the cluster is empty.
    """
    if not cluster:
        return None

    # The extractive summary is the canonical content and what gets embedded.
    summary_text = _extractive_summary(cluster)

    metadata: dict[str, Any] = {
        "consolidated_from": [m.memory_id for m in cluster],
        "consolidated_count": len(cluster),
    }

    if summarizer is not None:
        # The abstractive summary is optional metadata, so a provider error
        # must not abort the consolidation.
        try:
            llm_summary = summarizer.summarize([m.content for m in cluster])
        except Exception:
            logger.warning(
                "LLM summarization failed, keeping extractive summary only",
                exc_info=True,
            )
            llm_summary = None
        if llm_summary:
            metadata["llm_summary"] = llm_summary

    # Embed before archiving anything: if embedding raises, the cluster is
    # left untouched.
    summary_embedding = embed.embed_single(summary_text)

    # One timestamp so created_at and last_accessed_at agree exactly.
    now = datetime.now(timezone.utc)

    summary_memory = MemoryObject(
        memory_id=str(uuid.uuid4()),
        user_id=user_id,
        content=summary_text,
        embedding=summary_embedding,
        score=0.0,
        created_at=now,
        last_accessed_at=now,
        source=MemorySource.SYSTEM_GENERATED,
        importance=0.7,
        lifecycle_state=LifecycleState.ACTIVE,
        metadata=metadata,
        embedding_dim=len(summary_embedding),
        memory_type=MemoryType.SEMANTIC,
        namespace=namespace,
    )

    # Archive the source memories now that the summary object exists.
    for mem in cluster:
        store.update(lifecycle.transition(mem, LifecycleState.ARCHIVED))

    return summary_memory

116 

117 

def consolidate(
    store: Any,
    embed: Any,
    user_id: str,
    namespace: str = "default",
    min_memories: int = 5,
    max_age_days: float = 30.0,
    with_llm_summary: bool = False,
    summarizer_llm_provider: str | None = None,
    summarizer_llm_model: str | None = None,
    summarizer_prompt_template: str | None = None,
) -> str | None:
    """Consolidate old episodic memories into a semantic summary.

    Algorithm:
    1. Fetch the user's ACTIVE/DECAYING memories and keep the EPISODIC ones
       that are older than *max_age_days* and have an embedding
    2. Cluster them greedily by embedding similarity
    3. Pick the largest cluster (only one cluster is consolidated per call)
    4. Build its summary via :func:`consolidate_cluster`, which also marks
       the cluster's memories as ARCHIVED
    5. Store the summary as a SEMANTIC memory

    Args:
        store: StorageAdapter instance.
        embed: EmbeddingAdapter instance.
        user_id: User to consolidate.
        namespace: Memory namespace.
        min_memories: Minimum memories needed to form a cluster.
        max_age_days: Only consider memories older than this many days.
        with_llm_summary: If True, try to create an LLM summarizer and attach
            its abstractive summary to the consolidated memory's metadata
            (key ``"llm_summary"``). The memory content itself remains the
            extractive summary. If the summarizer cannot be created,
            consolidation proceeds without it.
        summarizer_llm_provider: LLM provider ("openai", "anthropic",
            "ollama", "custom"). Default "openai".
        summarizer_llm_model: Model name override.
        summarizer_prompt_template: Custom prompt template with ``{memories}``.

    Returns:
        Memory ID of the consolidated summary, or None if consolidation did not occur.
    """
    all_memories = store.get_all_by_user(
        user_id,
        lifecycle_filter=[LifecycleState.ACTIVE, LifecycleState.DECAYING],
        namespace=namespace,
    )

    now = datetime.now(timezone.utc)
    # Anything created before this POSIX timestamp counts as "old".
    cutoff = now.timestamp() - (max_age_days * 86400)

    # Filter old episodic memories; ones without an embedding cannot be clustered.
    old_episodic = [
        m
        for m in all_memories
        if m.memory_type == MemoryType.EPISODIC
        and m.created_at.timestamp() < cutoff
        and m.embedding is not None
    ]

    # The explicit emptiness check matters when min_memories <= 0: without it
    # an empty candidate list would reach max() below and raise ValueError.
    if not old_episodic or len(old_episodic) < min_memories:
        logger.info(
            f"Consolidation skipped for {user_id}: only {len(old_episodic)} "
            f"old episodic memories (min={min_memories})"
        )
        return None

    # Simple greedy clustering by similarity
    clusters = _cluster_by_similarity(old_episodic, threshold=0.75)

    best_cluster = max(clusters, key=len)
    if len(best_cluster) < min_memories:
        logger.info(
            f"Consolidation skipped for {user_id}: best cluster has "
            f"{len(best_cluster)} memories (min={min_memories})"
        )
        return None

    # Initialize LLM summarizer if requested (None when unavailable)
    summarizer = _get_summarizer(
        with_llm_summary,
        summarizer_llm_provider,
        summarizer_llm_model,
        summarizer_prompt_template,
    )

    # Consolidate the best cluster. NOTE: this archives the cluster's
    # memories before the summary is persisted below.
    summary_memory = consolidate_cluster(
        store=store,
        embed=embed,
        user_id=user_id,
        cluster=best_cluster,
        namespace=namespace,
        summarizer=summarizer,
    )

    if summary_memory is None:
        logger.warning(f"Consolidation failed for {user_id}: cluster processing returned None")
        return None

    # Store the consolidated memory
    store.store(summary_memory)
    logger.info(
        f"Consolidated {len(best_cluster)} memories for {user_id} "
        f"into semantic memory {summary_memory.memory_id}"
        + (" (LLM summary)" if summarizer else "")
    )

    return summary_memory.memory_id

223 

224 

def _cluster_by_similarity(
    memories: list[MemoryObject],
    threshold: float = 0.75,
) -> list[list[MemoryObject]]:
    """Greedily cluster memories by embedding similarity.

    The first unassigned memory becomes the seed of a new cluster; every
    remaining memory whose similarity to that seed reaches *threshold* joins
    it. This repeats on what is left until every memory is assigned, so each
    memory ends up in exactly one cluster and input order is preserved within
    clusters.

    Cosine similarity is rescaled with ``(sim + 1) / 2`` before it is compared
    against *threshold*.

    Memories without an embedding are never compared: as candidates they are
    skipped, and as seeds they form a single-element cluster.

    Args:
        memories: Memories to cluster; the list itself is not modified.
        threshold: Minimum rescaled similarity to the seed for membership.

    Returns:
        The clusters, in the order their seeds appear in *memories*.
    """
    clusters: list[list[MemoryObject]] = []
    remaining = list(memories)

    while remaining:
        seed, rest = remaining[0], remaining[1:]
        cluster = [seed]

        if seed.embedding is None:
            # Nothing to compare against; keep the seed on its own instead of
            # handing None to cosine_similarity.
            clusters.append(cluster)
            remaining = rest
            continue

        leftover: list[MemoryObject] = []
        for candidate in rest:
            if candidate.embedding is None:
                leftover.append(candidate)
                continue
            sim = cosine_similarity(seed.embedding, candidate.embedding)
            normalized = (sim + 1.0) / 2.0
            if normalized >= threshold:
                cluster.append(candidate)
            else:
                leftover.append(candidate)

        clusters.append(cluster)
        remaining = leftover

    return clusters

254 

255 

def _extractive_summary(memories: list[MemoryObject]) -> str:
    """Generate an extractive summary from a cluster of memories.

    Memory contents are split on ``"."`` into sentences; fragments of ten
    characters or fewer are dropped and exact duplicates are collapsed. If at
    most three distinct sentences remain they are all used. Otherwise each
    sentence is scored by the summed corpus frequency of its words (longer
    than two characters, lower-cased, punctuation-stripped) divided by its
    word count, and the three best-scoring sentences are returned in their
    original order.

    If no usable sentence is found, the contents of the first three memories
    are joined with spaces instead.

    Args:
        memories: Objects exposing a ``content`` string.

    Returns:
        The summary text, at most 1024 characters long on every path.
    """
    max_sentences = 3
    # Cap summary length to avoid blowing up embedding or storage with a
    # giant consolidated memory.
    max_chars = 1024
    punctuation = ".,!?;:'\"()-"

    def keywords(sentence: str) -> list[str]:
        """Return the sentence's normalised words longer than two characters."""
        stripped = (token.strip(punctuation) for token in sentence.lower().split())
        return [word for word in stripped if len(word) > 2]

    # Collect all candidate sentences, in order of appearance.
    sentences = [
        sentence
        for mem in memories
        for sentence in (part.strip() for part in mem.content.split("."))
        if len(sentence) > 10
    ]

    if not sentences:
        # Nothing sentence-like: fall back to the raw content of a few memories.
        return " ".join(m.content for m in memories[:3])[:max_chars]

    # Similar memories often repeat the same sentence verbatim; keep one copy
    # (first occurrence) so the summary does not repeat itself.
    unique = list(dict.fromkeys(sentences))

    if len(unique) <= max_sentences:
        return (". ".join(unique) + ".")[:max_chars]

    # Word frequencies are counted over ALL sentences, duplicates included, so
    # repeated statements still raise the weight of their words.
    word_counts: dict[str, int] = {}
    for sentence in sentences:
        for word in keywords(sentence):
            word_counts[word] = word_counts.get(word, 0) + 1

    # Average keyword frequency per whitespace-separated word.
    scores = [
        sum(word_counts[word] for word in keywords(sentence)) / max(len(sentence.split()), 1)
        for sentence in unique
    ]

    # Stable sort: ties are resolved in favour of the earlier sentence.
    ranked = sorted(range(len(unique)), key=scores.__getitem__, reverse=True)
    # Re-sort the winning indices to restore original order.
    chosen = sorted(ranked[:max_sentences])

    summary = ". ".join(unique[i] for i in chosen) + "."
    return summary[:max_chars]