Coverage for session_buddy / memory / conscious_agent.py: 73.94%
160 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
"""
Conscious Agent - Background memory optimization inspired by Memori.

Analyzes conversation patterns to promote frequently-accessed memories
from long-term to short-term storage for faster retrieval.
"""
8import asyncio
9import contextlib
10import logging
11from dataclasses import dataclass
12from datetime import datetime, timedelta
13from typing import Any
15logger = logging.getLogger(__name__)
@dataclass
class MemoryAccessPattern:
    """Snapshot of how often and how recently one memory is used.

    Built from the access log; consumed when scoring promotion
    candidates.
    """

    memory_id: str  # identifier of the tracked memory row
    access_count: int  # total recorded accesses
    last_accessed: datetime  # timestamp of the most recent access
    access_velocity: float  # accesses per hour
    semantic_importance: float  # importance score in [0.0, 1.0]
    category: str  # one of: facts, preferences, skills, rules, context
@dataclass
class PromotionCandidate:
    """A memory proposed for promotion to short-term storage.

    Created for every access pattern whose weighted score clears the
    promotion threshold.
    """

    memory_id: str  # identifier of the memory to promote
    priority_score: float  # weighted score that triggered the candidacy
    reason: str  # human-readable explanation of the promotion
    current_tier: str  # tier it currently occupies: long_term, short_term, working
class ConsciousAgent:
    """
    Background agent that analyzes memory patterns and optimizes storage.

    Inspired by Memori's Conscious Agent pattern but adapted for
    session-mgmt-mcp's development workflow context.
    """

    def __init__(
        self,
        reflection_db: Any,
        analysis_interval_hours: int = 6,
        promotion_threshold: float = 0.75,
    ) -> None:
        """
        Initialize the Conscious Agent.

        Args:
            reflection_db: ReflectionDatabase instance
            analysis_interval_hours: How often to run analysis (default: 6 hours)
            promotion_threshold: Minimum score for promotion (0.0-1.0)
        """
        self.reflection_db = reflection_db
        self.analysis_interval = timedelta(hours=analysis_interval_hours)
        self.promotion_threshold = promotion_threshold
        self._running = False
        self._task: asyncio.Task[None] | None = None

    async def start(self) -> None:
        """Start the background conscious agent (idempotent)."""
        if self._running:
            logger.warning("Conscious agent already running")
            return

        self._running = True
        self._task = asyncio.create_task(self._run_loop())
        logger.info(
            "Conscious agent started (interval: %.1fh)",
            self.analysis_interval.total_seconds() / 3600,
        )

    async def stop(self) -> None:
        """Stop the background conscious agent, cancelling its loop task."""
        if not self._running:
            return

        self._running = False
        if self._task:
            self._task.cancel()
            # Await the cancelled task so it finishes cleanly before we return
            with contextlib.suppress(asyncio.CancelledError):
                await self._task
        logger.info("Conscious agent stopped")

    async def _run_loop(self) -> None:
        """Main background loop: analyze, sleep one interval, repeat."""
        while self._running:
            try:
                await self._analyze_and_optimize()
                await asyncio.sleep(self.analysis_interval.total_seconds())
            except asyncio.CancelledError:
                break
            except Exception:
                # logger.exception records the traceback; keep running
                # despite errors, backing off before retrying.
                logger.exception("Conscious agent error")
                await asyncio.sleep(300)  # Wait 5 minutes before retry

    async def _analyze_and_optimize(self) -> dict[str, Any]:
        """
        Analyze memory patterns and optimize storage.

        Returns:
            dict: Analysis results with promotion statistics
        """
        logger.info("Running conscious agent memory analysis...")

        # 1. Analyze access patterns
        patterns = await self._analyze_access_patterns()

        # 2. Calculate priority scores
        candidates = await self._calculate_promotion_priorities(patterns)

        # 3. Promote high-priority memories
        promoted = await self._promote_memories(candidates)

        # 4. Demote stale memories
        demoted = await self._demote_stale_memories()

        results = {
            "timestamp": datetime.now().isoformat(),
            "patterns_analyzed": len(patterns),
            "promotion_candidates": len(candidates),
            "promoted_count": len(promoted),
            "demoted_count": len(demoted),
            "promoted_ids": promoted,
            "demoted_ids": demoted,
        }

        logger.info(
            "Conscious agent analysis complete: %s promoted, %s demoted",
            results["promoted_count"],
            results["demoted_count"],
        )

        return results

    async def _analyze_access_patterns(self) -> list[MemoryAccessPattern]:
        """
        Analyze memory access patterns from the DuckDB access log.

        Returns:
            list[MemoryAccessPattern]: Access patterns for all memories;
            empty when the database or expected tables are unavailable.
        """
        # Local import to avoid a hard duckdb dependency when unused
        import duckdb
        from session_buddy.settings import get_database_path

        patterns: list[MemoryAccessPattern] = []
        try:
            # str() for consistency with the other DB helpers below
            conn = duckdb.connect(
                str(get_database_path()),
                config={"allow_unsigned_extensions": True},
            )
        except Exception:
            return patterns

        try:
            rows = conn.execute(
                """
                WITH base AS (
                    SELECT
                        l.memory_id,
                        COUNT(*) AS access_count,
                        MIN(l.timestamp) AS first_access,
                        MAX(l.timestamp) AS last_accessed
                    FROM memory_access_log l
                    GROUP BY l.memory_id
                )
                SELECT
                    b.memory_id,
                    b.access_count,
                    b.first_access,
                    b.last_accessed,
                    c.category,
                    COALESCE(c.importance_score, 0.5) AS importance
                FROM base b
                JOIN conversations_v2 c ON c.id = b.memory_id
                """
            ).fetchall()

            now = datetime.now()
            for r in rows:
                memory_id = str(r[0])
                access_count = int(r[1])
                first_access = r[2]
                last_accessed = r[3]
                category = str(r[4])
                importance = float(r[5])

                try:
                    # Accesses per hour since first access; the epsilon
                    # floor avoids division by zero for brand-new rows.
                    hours = max((now - first_access).total_seconds() / 3600.0, 1e-6)
                    velocity = access_count / hours
                except Exception:
                    velocity = float(access_count)

                # Coerce last_accessed to datetime if the driver returned
                # something else (e.g. a string timestamp)
                if not isinstance(last_accessed, datetime):
                    try:
                        last_accessed = datetime.fromisoformat(str(last_accessed))
                    except Exception:
                        last_accessed = now

                patterns.append(
                    MemoryAccessPattern(
                        memory_id=memory_id,
                        access_count=access_count,
                        last_accessed=last_accessed,
                        access_velocity=velocity,
                        semantic_importance=importance,
                        category=category,
                    )
                )
        except Exception:
            # If tables missing or query fails, return empty list
            return []
        finally:
            with contextlib.suppress(Exception):
                conn.close()

        return patterns

    async def _calculate_promotion_priorities(
        self, patterns: list[MemoryAccessPattern]
    ) -> list[PromotionCandidate]:
        """
        Calculate promotion priority scores for memories.

        Priority score factors:
        - Access frequency (40%)
        - Recency (30%)
        - Semantic importance (20%)
        - Category weight (10%)

        Args:
            patterns: List of memory access patterns

        Returns:
            list[PromotionCandidate]: Sorted by priority score (highest first)
        """
        candidates: list[PromotionCandidate] = []

        for pattern in patterns:
            # Normalize each factor to 0-1 before weighting
            frequency_score = min(pattern.access_count / 10.0, 1.0)
            recency_score = self._calculate_recency_score(pattern.last_accessed)
            semantic_score = pattern.semantic_importance
            category_score = self._get_category_weight(pattern.category)

            priority_score = (
                frequency_score * 0.4
                + recency_score * 0.3
                + semantic_score * 0.2
                + category_score * 0.1
            )

            if priority_score >= self.promotion_threshold:
                candidate = PromotionCandidate(
                    memory_id=pattern.memory_id,
                    priority_score=priority_score,
                    reason=self._generate_promotion_reason(pattern, priority_score),
                    current_tier="long_term",  # Assume long-term by default
                )
                candidates.append(candidate)

        # Sort by priority score (highest first)
        candidates.sort(key=lambda c: c.priority_score, reverse=True)

        return candidates

    def _calculate_recency_score(self, last_accessed: datetime) -> float:
        """
        Calculate recency score (0.0-1.0) based on time since last access.

        Args:
            last_accessed: Timestamp of last access (naive local time,
                matching datetime.now() used throughout this module)

        Returns:
            float: Recency score (1.0 = accessed now, 0.0 = very old)
        """
        time_delta = datetime.now() - last_accessed
        hours_ago = time_delta.total_seconds() / 3600

        # Exponential decay: score = e^(-hours/24)
        # Recent (0-6h): 0.78-1.0
        # Medium (6-24h): 0.37-0.78
        # Old (24h+): 0.0-0.37
        import math

        return math.exp(-hours_ago / 24)

    def _get_category_weight(self, category: str) -> float:
        """
        Get importance weight for memory category.

        Args:
            category: Memory category (facts, preferences, skills, rules, context)

        Returns:
            float: Category weight (0.0-1.0); 0.5 for unknown categories
        """
        weights = {
            "preferences": 1.0,  # User preferences are highest priority
            "skills": 0.9,  # User skills/knowledge
            "rules": 0.8,  # Learned rules/patterns
            "facts": 0.7,  # Factual information
            "context": 0.6,  # Contextual information
        }
        return weights.get(category, 0.5)

    def _generate_promotion_reason(
        self, pattern: MemoryAccessPattern, score: float
    ) -> str:
        """Generate a human-readable promotion reason for logging/auditing."""
        reasons = []

        if pattern.access_count > 5:
            reasons.append(f"high access frequency ({pattern.access_count}x)")

        recency_hours = (datetime.now() - pattern.last_accessed).total_seconds() / 3600
        if recency_hours < 6:
            reasons.append("recently accessed")

        if pattern.semantic_importance > 0.8:
            reasons.append("high semantic importance")

        if pattern.category in ("preferences", "skills"):
            reasons.append(f"critical category ({pattern.category})")

        reason = ", ".join(reasons) if reasons else "high priority score"
        return f"{reason} (score: {score:.2f})"

    async def _promote_memories(
        self, candidates: list[PromotionCandidate]
    ) -> list[str]:
        """
        Promote high-priority memories to short-term storage.

        Opens a single database connection for the whole batch (the
        previous per-candidate connect was wasteful and leaked the
        connection when an execute raised before close).

        Args:
            candidates: Sorted list of promotion candidates

        Returns:
            list[str]: IDs of successfully promoted memories
        """
        promoted: list[str] = []
        if not candidates:
            return promoted

        import duckdb
        from session_buddy.settings import get_database_path

        try:
            conn = duckdb.connect(
                str(get_database_path()),
                config={"allow_unsigned_extensions": True},
            )
        except Exception:
            logger.exception("Failed to open database for memory promotion")
            return promoted

        try:
            for candidate in candidates:
                try:
                    conn.execute(
                        "UPDATE conversations_v2 SET memory_tier='short_term' WHERE id=?",
                        [candidate.memory_id],
                    )
                    # NOTE(review): "prom_<memory_id>" is not unique across
                    # repeated promotions of the same memory — confirm the
                    # memory_promotions schema tolerates duplicates.
                    conn.execute(
                        "INSERT INTO memory_promotions (id, memory_id, from_tier, to_tier, reason, priority_score) VALUES (?, ?, ?, ?, ?, ?)",
                        [
                            f"prom_{candidate.memory_id}",
                            candidate.memory_id,
                            candidate.current_tier,
                            "short_term",
                            candidate.reason,
                            candidate.priority_score,
                        ],
                    )
                    promoted.append(candidate.memory_id)
                    logger.debug(
                        "Promoted memory %s: %s", candidate.memory_id, candidate.reason
                    )
                except Exception:
                    # One failed candidate must not abort the batch
                    logger.exception(
                        "Failed to promote memory %s", candidate.memory_id
                    )
        finally:
            with contextlib.suppress(Exception):
                conn.close()

        return promoted

    async def _demote_stale_memories(self) -> list[str]:
        """
        Demote stale memories from short-term to long-term storage.

        A memory is stale when it has no access-log entry in the last
        7 days (or none at all). The connection is now closed in a
        finally block (previously it leaked on any query failure), and
        a missing-table failure returns the partial result instead of
        raising, matching _analyze_access_patterns.

        Returns:
            list[str]: IDs of demoted memories
        """
        demoted: list[str] = []

        import duckdb
        from session_buddy.settings import get_database_path

        try:
            conn = duckdb.connect(
                str(get_database_path()),
                config={"allow_unsigned_extensions": True},
            )
        except Exception:
            return demoted

        try:
            rows = conn.execute(
                """
                SELECT c.id
                FROM conversations_v2 c
                LEFT JOIN (
                    SELECT memory_id, MAX(timestamp) AS last_access
                    FROM memory_access_log
                    GROUP BY memory_id
                ) a ON a.memory_id = c.id
                WHERE c.memory_tier='short_term'
                  AND (a.last_access IS NULL OR a.last_access < NOW() - INTERVAL 7 DAY)
                """
            ).fetchall()
            for (mid,) in rows:
                conn.execute(
                    "UPDATE conversations_v2 SET memory_tier='long_term' WHERE id=?",
                    [mid],
                )
                demoted.append(str(mid))
        except Exception:
            # Tables may not exist yet; treat as nothing (more) to demote
            logger.exception("Stale-memory demotion failed")
        finally:
            with contextlib.suppress(Exception):
                conn.close()

        return demoted

    async def force_analysis(self) -> dict[str, Any]:
        """
        Force immediate analysis (for testing/debugging).

        Returns:
            dict: Analysis results
        """
        logger.info("Forcing conscious agent analysis...")
        return await self._analyze_and_optimize()