Coverage for src / kemi / consolidation.py: 96%
105 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-05 15:47 +0000
1"""Memory consolidation: summarize old episodic memories into semantic ones.
3Supports both local extractive summarization (no LLM required) and optional
4LLM-powered abstractive summarization via :class:`kemi.summarizer.LLMSummarizer`.
5"""
7import logging
8import uuid
9from datetime import datetime, timezone
10from typing import Any
12from kemi import lifecycle
13from kemi.models import LifecycleState, MemoryObject, MemorySource, MemoryType
14from kemi.scoring import cosine_similarity
# Module-level logger: used for the summarizer-fallback warning and for the
# consolidation skip/success messages emitted below.
logger = logging.getLogger(__name__)
19def _get_summarizer(
20 with_llm_summary: bool,
21 summarizer_llm_provider: str | None = None,
22 summarizer_llm_model: str | None = None,
23 summarizer_prompt_template: str | None = None,
24) -> Any | None:
25 """Return an LLMSummarizer instance if LLM summarization is requested.
27 Returns None if *with_llm_summary* is False or the summarizer module is
28 not available. On import failure the error is logged and None is returned
29 so caller can fall back to extractive summarization gracefully.
30 """
31 if not with_llm_summary:
32 return None
33 try:
34 from kemi.summarizer import LLMSummarizer
36 return LLMSummarizer(
37 provider=summarizer_llm_provider or "openai",
38 model=summarizer_llm_model,
39 prompt_template=summarizer_prompt_template,
40 )
41 except Exception:
42 logger.warning(
43 "LLM summarizer not available, falling back to extractive summary",
44 exc_info=True,
45 )
46 return None
def consolidate_cluster(
    store: Any,
    embed: Any,
    user_id: str,
    cluster: list[MemoryObject],
    namespace: str = "default",
    summarizer: Any | None = None,
) -> MemoryObject | None:
    """Consolidate a single cluster of related memories into a semantic summary.

    The summary's content is always the extractive summary of the cluster, and
    that text is what gets embedded. If *summarizer* is given, its
    ``summarize`` result (when truthy) is added under ``metadata["llm_summary"]``.
    If that call raises, a warning is logged and only the extractive summary
    is kept.

    Side effect: every memory in *cluster* is transitioned to ARCHIVED and
    written back via ``store.update`` before this function returns. The new
    summary memory itself is NOT stored here; the caller must persist it.

    Args:
        store: StorageAdapter instance; ``store.update`` receives each archived
            memory.
        embed: EmbeddingAdapter instance; ``embed.embed_single`` embeds the
            summary text.
        user_id: Owner of the new summary memory.
        cluster: List of related MemoryObjects to consolidate.
        namespace: Namespace assigned to the summary memory.
        summarizer: Optional object exposing ``summarize(list[str])``, e.g. an
            ``LLMSummarizer`` instance, for abstractive summaries.

    Returns:
        A new ACTIVE, SEMANTIC ``MemoryObject`` (source SYSTEM_GENERATED,
        importance 0.7) holding the consolidated summary, or None if the
        cluster is empty.
    """
    if not cluster:
        return None

    # The extractive summary is always the canonical (embedded) content.
    summary_text = _extractive_summary(cluster)

    metadata: dict[str, Any] = {
        "consolidated_from": [m.memory_id for m in cluster],
        "consolidated_count": len(cluster),
    }
    if summarizer is not None:
        try:
            llm_summary = summarizer.summarize([m.content for m in cluster])
        except Exception:
            # Best-effort: an LLM/network failure must not abort consolidation;
            # the extractive summary computed above is still usable.
            logger.warning(
                "LLM summarization failed, keeping extractive summary only",
                exc_info=True,
            )
            llm_summary = None
        if llm_summary:
            metadata["llm_summary"] = llm_summary

    summary_embedding = embed.embed_single(summary_text)

    # A single timestamp so created_at and last_accessed_at agree exactly.
    now = datetime.now(timezone.utc)

    summary_memory = MemoryObject(
        memory_id=str(uuid.uuid4()),
        user_id=user_id,
        content=summary_text,
        embedding=summary_embedding,
        score=0.0,
        created_at=now,
        last_accessed_at=now,
        source=MemorySource.SYSTEM_GENERATED,
        importance=0.7,
        lifecycle_state=LifecycleState.ACTIVE,
        metadata=metadata,
        embedding_dim=len(summary_embedding),
        memory_type=MemoryType.SEMANTIC,
        namespace=namespace,
    )

    # Archive the source memories. NOTE(review): this happens before the
    # caller stores the summary; if that later store fails, the originals stay
    # archived with no summary persisted.
    for mem in cluster:
        archived = lifecycle.transition(mem, LifecycleState.ARCHIVED)
        store.update(archived)

    return summary_memory
def consolidate(
    store: Any,
    embed: Any,
    user_id: str,
    namespace: str = "default",
    min_memories: int = 5,
    max_age_days: float = 30.0,
    with_llm_summary: bool = False,
    summarizer_llm_provider: str | None = None,
    summarizer_llm_model: str | None = None,
    summarizer_prompt_template: str | None = None,
    similarity_threshold: float = 0.75,
) -> str | None:
    """Consolidate old episodic memories into a semantic summary.

    Algorithm:
        1. Fetch the user's ACTIVE and DECAYING memories in *namespace*.
        2. Keep EPISODIC ones created more than *max_age_days* ago that have
           an embedding.
        3. Greedily cluster them by embedding similarity.
        4. Summarize the largest cluster (extractive content, plus an optional
           LLM summary in ``metadata["llm_summary"]``) and archive its members.
        5. Store the summary as a SEMANTIC memory.

    Only the single largest cluster is consolidated per call.

    Args:
        store: StorageAdapter instance.
        embed: EmbeddingAdapter instance.
        user_id: User to consolidate.
        namespace: Memory namespace.
        min_memories: Minimum number of old episodic memories, and minimum
            size of the largest cluster, required for consolidation.
        max_age_days: Only consider memories older than this many days.
        with_llm_summary: If True, additionally request an LLM-powered
            abstractive summary, stored in the summary's metadata under
            ``"llm_summary"``. The memory's content is always the extractive
            summary. If the summarizer cannot be created, only the extractive
            summary is produced.
        summarizer_llm_provider: LLM provider ("openai", "anthropic",
            "ollama", "custom"). Default "openai".
        summarizer_llm_model: Model name override.
        summarizer_prompt_template: Custom prompt template with ``{memories}``.
        similarity_threshold: Minimum similarity for a memory to join a
            cluster seed. Cosine similarity is rescaled from [-1, 1] to
            [0, 1], so the default 0.75 corresponds to a raw cosine >= 0.5.

    Returns:
        Memory ID of the consolidated summary, or None if consolidation did
        not occur.
    """
    all_memories = store.get_all_by_user(
        user_id,
        lifecycle_filter=[LifecycleState.ACTIVE, LifecycleState.DECAYING],
        namespace=namespace,
    )

    now = datetime.now(timezone.utc)
    cutoff = now.timestamp() - (max_age_days * 86400)

    old_episodic = [
        m
        for m in all_memories
        if m.memory_type == MemoryType.EPISODIC
        and m.created_at.timestamp() < cutoff
        and m.embedding is not None
    ]

    # The emptiness check guards max() below when min_memories <= 0.
    if not old_episodic or len(old_episodic) < min_memories:
        logger.info(
            "Consolidation skipped for %s: only %s old episodic memories (min=%s)",
            user_id,
            len(old_episodic),
            min_memories,
        )
        return None

    clusters = _cluster_by_similarity(old_episodic, threshold=similarity_threshold)

    best_cluster = max(clusters, key=len)
    if len(best_cluster) < min_memories:
        logger.info(
            "Consolidation skipped for %s: best cluster has %s memories (min=%s)",
            user_id,
            len(best_cluster),
            min_memories,
        )
        return None

    summarizer = _get_summarizer(
        with_llm_summary,
        summarizer_llm_provider,
        summarizer_llm_model,
        summarizer_prompt_template,
    )

    # NOTE(review): consolidate_cluster archives the cluster's memories before
    # the summary is stored below; a failing store.store() leaves them archived.
    summary_memory = consolidate_cluster(
        store=store,
        embed=embed,
        user_id=user_id,
        cluster=best_cluster,
        namespace=namespace,
        summarizer=summarizer,
    )

    if summary_memory is None:
        logger.warning(
            "Consolidation failed for %s: cluster processing returned None", user_id
        )
        return None

    store.store(summary_memory)
    # Only advertise an LLM summary if one actually made it into the metadata.
    llm_note = " (LLM summary)" if "llm_summary" in summary_memory.metadata else ""
    logger.info(
        "Consolidated %s memories for %s into semantic memory %s%s",
        len(best_cluster),
        user_id,
        summary_memory.memory_id,
        llm_note,
    )

    return summary_memory.memory_id
def _cluster_by_similarity(
    memories: list[MemoryObject],
    threshold: float = 0.75,
) -> list[list[MemoryObject]]:
    """Greedily cluster memories by embedding similarity to a seed.

    The first unassigned memory becomes a seed and absorbs every remaining
    memory whose similarity to that seed (not to the whole cluster) is at
    least *threshold*. This repeats until every memory is assigned, so each
    input memory appears in exactly one cluster and clusters are returned in
    seed order.

    Similarity is ``cosine_similarity`` rescaled from [-1, 1] to [0, 1]; a
    threshold of 0.75 therefore means a raw cosine of at least 0.5. Memories
    without an embedding never join another cluster and form singleton
    clusters of their own.

    Args:
        memories: Memories to cluster.
        threshold: Minimum rescaled similarity for joining a seed's cluster.

    Returns:
        A list of clusters, each a non-empty list of memories.
    """
    clusters: list[list[MemoryObject]] = []
    unassigned = list(memories)

    while unassigned:
        seed = unassigned.pop(0)
        if seed.embedding is None:
            # Nothing to compare against: without this guard a memory skipped
            # as a candidate would later become a seed and crash
            # cosine_similarity(None, ...).
            clusters.append([seed])
            continue

        cluster = [seed]
        remaining: list[MemoryObject] = []
        for candidate in unassigned:
            if candidate.embedding is not None:
                sim = cosine_similarity(seed.embedding, candidate.embedding)
                if (sim + 1.0) / 2.0 >= threshold:
                    cluster.append(candidate)
                    continue
            remaining.append(candidate)

        # Rebuilding the list keeps the original relative order of the
        # memories that were not absorbed.
        unassigned = remaining
        clusters.append(cluster)

    return clusters
def _extractive_summary(memories: list[MemoryObject], max_chars: int = 1024) -> str:
    """Generate an extractive summary from a cluster of memories.

    Each memory's content is split on ``"."``. Fragments of 10 characters or
    fewer are dropped, as are exact duplicate sentences (first occurrence
    kept). If more than three sentences remain, each is scored by the summed
    corpus frequency of its keywords divided by its total word count.
    Keywords are lower-cased words longer than 2 characters after stripping
    punctuation. The top three sentences are then returned in their original
    order.

    Args:
        memories: Memories whose ``content`` is summarized.
        max_chars: Maximum length of the returned string. Every return path
            is truncated to this many characters so a consolidated memory
            cannot blow up embedding or storage size.

    Returns:
        The summary text. If no sentence survives filtering, this is the
        contents of the first three memories joined with spaces (truncated).
    """
    punctuation = ".,!?;:'\"()-"

    sentences: list[str] = []
    seen: set[str] = set()
    for mem in memories:
        for fragment in mem.content.split("."):
            sent = fragment.strip()
            # Dedupe: clustered episodic memories often repeat the same
            # sentence, which would otherwise be emitted several times.
            if len(sent) > 10 and sent not in seen:
                seen.add(sent)
                sentences.append(sent)

    if not sentences:
        return " ".join(m.content for m in memories[:3])[:max_chars]

    if len(sentences) <= 3:
        return (". ".join(sentences) + ".")[:max_chars]

    # Tokenize once: (total word count, keywords) per sentence, and build the
    # corpus-wide keyword frequencies used for scoring.
    tokenized: list[tuple[int, list[str]]] = []
    word_counts: dict[str, int] = {}
    for sent in sentences:
        words = sent.lower().split()
        keywords = [c for c in (w.strip(punctuation) for w in words) if len(c) > 2]
        tokenized.append((len(words), keywords))
        for kw in keywords:
            word_counts[kw] = word_counts.get(kw, 0) + 1

    scores = [
        sum(word_counts[kw] for kw in keywords) / max(n_words, 1)
        for n_words, keywords in tokenized
    ]

    # sorted() is stable even with reverse=True, so ties keep original order.
    ranked = sorted(range(len(sentences)), key=scores.__getitem__, reverse=True)
    top_in_order = sorted(ranked[:3])

    summary = ". ".join(sentences[i] for i in top_in_order) + "."
    return summary[:max_chars]