Coverage for session_buddy / memory / conscious_agent.py: 73.94%

160 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

"""
Conscious Agent - Background memory optimization inspired by Memori.

Analyzes conversation patterns to promote frequently-accessed memories
from long-term to short-term storage for faster retrieval.
"""

7 

8import asyncio 

9import contextlib 

10import logging 

11from dataclasses import dataclass 

12from datetime import datetime, timedelta 

13from typing import Any 

14 

15logger = logging.getLogger(__name__) 

16 

17 

@dataclass
class MemoryAccessPattern:
    """Access statistics for a single stored memory.

    Combines how often and how recently a memory was accessed with its
    importance and category.
    """

    # Identifier of the memory these statistics describe
    memory_id: str
    # Total number of recorded accesses
    access_count: int
    # Timestamp of the most recent access
    last_accessed: datetime
    # Access rate, expressed in accesses per hour
    access_velocity: float
    # Importance rating in the range 0.0-1.0
    semantic_importance: float
    # One of: facts, preferences, skills, rules, context
    category: str

28 

29 

@dataclass
class PromotionCandidate:
    """A memory that has been selected for promotion to short-term storage."""

    # Identifier of the memory proposed for promotion
    memory_id: str
    # Weighted priority score that qualified the memory
    priority_score: float
    # Human-readable explanation of why the memory qualified
    reason: str
    # Tier the memory currently lives in: long_term, short_term, working
    current_tier: str

38 

39 

class ConsciousAgent:
    """
    Background agent that analyzes memory patterns and optimizes storage.

    Inspired by Memori's Conscious Agent pattern but adapted for
    session-mgmt-mcp's development workflow context.

    Each analysis pass reads access statistics from DuckDB, scores every
    memory, moves high scorers to the ``short_term`` tier and moves
    short-term memories that have not been accessed for 7 days back to
    ``long_term``.
    """

    def __init__(
        self,
        reflection_db: Any,
        analysis_interval_hours: int = 6,
        promotion_threshold: float = 0.75,
    ):
        """
        Initialize the Conscious Agent.

        Args:
            reflection_db: ReflectionDatabase instance
            analysis_interval_hours: How often to run analysis (default: 6 hours)
            promotion_threshold: Minimum score for promotion (0.0-1.0)

        """
        self.reflection_db = reflection_db
        self.analysis_interval = timedelta(hours=analysis_interval_hours)
        self.promotion_threshold = promotion_threshold
        self._running = False
        self._task: asyncio.Task[None] | None = None

    async def start(self) -> None:
        """Start the background conscious agent (no-op if already running)."""
        if self._running:
            logger.warning("Conscious agent already running")
            return

        self._running = True
        self._task = asyncio.create_task(self._run_loop())
        logger.info(
            f"Conscious agent started (interval: {self.analysis_interval.total_seconds() / 3600:.1f}h)"
        )

    async def stop(self) -> None:
        """Stop the background conscious agent and wait for its task to end."""
        if not self._running:
            return

        self._running = False
        if self._task:
            self._task.cancel()
            # Cancellation is the expected way for the loop to end
            with contextlib.suppress(asyncio.CancelledError):
                await self._task
        logger.info("Conscious agent stopped")

    async def _run_loop(self) -> None:
        """Main background loop: analyze, sleep for the interval, repeat."""
        while self._running:
            try:
                await self._analyze_and_optimize()
                await asyncio.sleep(self.analysis_interval.total_seconds())
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.exception(f"Conscious agent error: {e}")
                # Continue running despite errors
                await asyncio.sleep(300)  # Wait 5 minutes before retry

    async def _analyze_and_optimize(self) -> dict[str, Any]:
        """
        Analyze memory patterns and optimize storage.

        Returns:
            dict: Analysis results with promotion statistics. Keys are
                ``timestamp``, ``patterns_analyzed``, ``promotion_candidates``,
                ``promoted_count``, ``demoted_count``, ``promoted_ids`` and
                ``demoted_ids``.

        """
        logger.info("Running conscious agent memory analysis...")

        # 1. Analyze access patterns
        patterns = await self._analyze_access_patterns()

        # 2. Calculate priority scores
        candidates = await self._calculate_promotion_priorities(patterns)

        # 3. Promote high-priority memories
        promoted = await self._promote_memories(candidates)

        # 4. Demote stale memories
        demoted = await self._demote_stale_memories()

        results = {
            "timestamp": datetime.now().isoformat(),
            "patterns_analyzed": len(patterns),
            "promotion_candidates": len(candidates),
            "promoted_count": len(promoted),
            "demoted_count": len(demoted),
            "promoted_ids": promoted,
            "demoted_ids": demoted,
        }

        logger.info(
            f"Conscious agent analysis complete: "
            f"{results['promoted_count']} promoted, "
            f"{results['demoted_count']} demoted"
        )

        return results

    @staticmethod
    def _hours_since(moment: datetime) -> float:
        """
        Return the hours elapsed between *moment* and now, never negative.

        Args:
            moment: Naive or timezone-aware timestamp

        Returns:
            float: Elapsed hours; 0.0 when *moment* lies in the future

        """
        # Build "now" with the same awareness as the input: subtracting a
        # naive datetime from an aware one (or vice versa) raises TypeError.
        now = datetime.now(moment.tzinfo)
        elapsed_hours = (now - moment).total_seconds() / 3600
        # A timestamp slightly ahead of the local clock must not count as
        # "negative age", which would push recency scores above 1.0.
        return max(elapsed_hours, 0.0)

    async def _analyze_access_patterns(self) -> "list[MemoryAccessPattern]":
        """
        Analyze memory access patterns from database.

        Returns:
            list[MemoryAccessPattern]: Access patterns for all memories that
                have at least one access-log row; an empty list when the
                database cannot be opened or the query fails.

        """
        # Query DuckDB for access patterns in v2 tables
        import duckdb  # Local import to avoid hard dep when unused
        from session_buddy.settings import get_database_path

        patterns: list[MemoryAccessPattern] = []
        try:
            conn = duckdb.connect(
                str(get_database_path()),
                config={"allow_unsigned_extensions": True},
            )
        except Exception:
            # Best-effort: an unavailable database means "nothing to analyze"
            logger.debug(
                "Conscious agent could not open database for pattern analysis",
                exc_info=True,
            )
            return patterns

        try:
            rows = conn.execute(
                """
                WITH base AS (
                    SELECT
                        l.memory_id,
                        COUNT(*) AS access_count,
                        MIN(l.timestamp) AS first_access,
                        MAX(l.timestamp) AS last_accessed
                    FROM memory_access_log l
                    GROUP BY l.memory_id
                )
                SELECT
                    b.memory_id,
                    b.access_count,
                    b.first_access,
                    b.last_accessed,
                    c.category,
                    COALESCE(c.importance_score, 0.5) AS importance
                FROM base b
                JOIN conversations_v2 c ON c.id = b.memory_id
                """
            ).fetchall()

            now = datetime.now()
            for r in rows:
                memory_id = str(r[0])
                access_count = int(r[1])
                first_access = r[2]
                last_accessed = r[3]
                category = str(r[4])
                importance = float(r[5])

                try:
                    # Compute accesses per hour since first access; the floor
                    # avoids division by zero for a memory first seen just now
                    hours = max(self._hours_since(first_access), 1e-6)
                    velocity = access_count / hours
                except (AttributeError, TypeError, ValueError, OverflowError):
                    # first_access was not a usable datetime
                    velocity = float(access_count)

                # Coerce last_accessed to datetime if needed
                if not isinstance(last_accessed, datetime):
                    try:
                        last_accessed = datetime.fromisoformat(str(last_accessed))
                    except (TypeError, ValueError):
                        last_accessed = now

                patterns.append(
                    MemoryAccessPattern(
                        memory_id=memory_id,
                        access_count=access_count,
                        last_accessed=last_accessed,
                        access_velocity=velocity,
                        semantic_importance=importance,
                        category=category,
                    )
                )
        except Exception:
            # If tables missing or query fails, return empty list
            logger.debug(
                "Conscious agent access-pattern query failed",
                exc_info=True,
            )
            return []
        finally:
            with contextlib.suppress(Exception):
                conn.close()

        return patterns

    async def _calculate_promotion_priorities(
        self, patterns: "list[MemoryAccessPattern]"
    ) -> "list[PromotionCandidate]":
        """
        Calculate promotion priority scores for memories.

        Priority score factors:
        - Access frequency (40%)
        - Recency (30%)
        - Semantic importance (20%)
        - Category weight (10%)

        Args:
            patterns: List of memory access patterns

        Returns:
            list[PromotionCandidate]: Candidates whose score reaches
                ``self.promotion_threshold``, sorted by priority score
                (highest first)

        """
        candidates: list[PromotionCandidate] = []

        for pattern in patterns:
            # Calculate weighted score
            frequency_score = min(pattern.access_count / 10.0, 1.0)  # Normalize to 0-1
            recency_score = self._calculate_recency_score(pattern.last_accessed)
            semantic_score = pattern.semantic_importance
            category_score = self._get_category_weight(pattern.category)

            priority_score = (
                frequency_score * 0.4
                + recency_score * 0.3
                + semantic_score * 0.2
                + category_score * 0.1
            )

            if priority_score >= self.promotion_threshold:
                candidate = PromotionCandidate(
                    memory_id=pattern.memory_id,
                    priority_score=priority_score,
                    reason=self._generate_promotion_reason(pattern, priority_score),
                    current_tier="long_term",  # Assume long-term by default
                )
                candidates.append(candidate)

        # Sort by priority score (highest first)
        candidates.sort(key=lambda c: c.priority_score, reverse=True)

        return candidates

    def _calculate_recency_score(self, last_accessed: datetime) -> float:
        """
        Calculate recency score (0.0-1.0) based on time since last access.

        Args:
            last_accessed: Timestamp of last access (naive or timezone-aware)

        Returns:
            float: Recency score (1.0 = accessed now or in the future,
                approaching 0.0 = very old)

        """
        # Exponential decay: score = e^(-hours/24)
        # Recent (0-6h):   0.78-1.0
        # Medium (6-24h):  0.37-0.78
        # Old (24h+):      0.0-0.37
        import math

        return math.exp(-self._hours_since(last_accessed) / 24)

    def _get_category_weight(self, category: str) -> float:
        """
        Get importance weight for memory category.

        Args:
            category: Memory category (facts, preferences, skills, rules, context)

        Returns:
            float: Category weight (0.0-1.0); 0.5 for an unknown category

        """
        weights = {
            "preferences": 1.0,  # User preferences are highest priority
            "skills": 0.9,  # User skills/knowledge
            "rules": 0.8,  # Learned rules/patterns
            "facts": 0.7,  # Factual information
            "context": 0.6,  # Contextual information
        }
        return weights.get(category, 0.5)

    def _generate_promotion_reason(
        self, pattern: "MemoryAccessPattern", score: float
    ) -> str:
        """
        Generate human-readable promotion reason.

        Args:
            pattern: Access pattern of the memory being promoted
            score: Priority score computed for the memory

        Returns:
            str: Comma-separated list of contributing factors (or
                "high priority score" when none applies) followed by the
                score formatted to two decimals

        """
        reasons = []

        if pattern.access_count > 5:
            reasons.append(f"high access frequency ({pattern.access_count}x)")

        if self._hours_since(pattern.last_accessed) < 6:
            reasons.append("recently accessed")

        if pattern.semantic_importance > 0.8:
            reasons.append("high semantic importance")

        if pattern.category in ("preferences", "skills"):
            reasons.append(f"critical category ({pattern.category})")

        reason = ", ".join(reasons) if reasons else "high priority score"
        return f"{reason} (score: {score:.2f})"

    async def _promote_memories(
        self, candidates: "list[PromotionCandidate]"
    ) -> list[str]:
        """
        Promote high-priority memories to short-term storage.

        For each candidate the memory's tier is set to ``short_term`` and a
        row is written to ``memory_promotions``. A failure for one candidate
        is logged and does not stop the remaining promotions.

        Args:
            candidates: Sorted list of promotion candidates

        Returns:
            list[str]: IDs of promoted memories

        """
        promoted: list[str] = []
        if not candidates:
            # Nothing to do, so the database is not needed at all
            return promoted

        import duckdb
        from session_buddy.settings import get_database_path

        try:
            conn = duckdb.connect(
                str(get_database_path()),
                config={"allow_unsigned_extensions": True},
            )
        except Exception as e:
            logger.exception(f"Failed to open database for memory promotion: {e}")
            return promoted

        try:
            for candidate in candidates:
                try:
                    conn.execute(
                        "UPDATE conversations_v2 SET memory_tier='short_term' WHERE id=?",
                        [candidate.memory_id],
                    )
                    # NOTE(review): the audit id is derived only from the
                    # memory id, so a repeated promotion of the same memory
                    # reuses it; confirm against the memory_promotions schema.
                    conn.execute(
                        "INSERT INTO memory_promotions (id, memory_id, from_tier, to_tier, reason, priority_score) VALUES (?, ?, ?, ?, ?, ?)",
                        [
                            f"prom_{candidate.memory_id}",
                            candidate.memory_id,
                            candidate.current_tier,
                            "short_term",
                            candidate.reason,
                            candidate.priority_score,
                        ],
                    )
                except Exception as e:
                    logger.exception(f"Failed to promote memory {candidate.memory_id}: {e}")
                    continue

                promoted.append(candidate.memory_id)
                logger.debug(
                    f"Promoted memory {candidate.memory_id}: {candidate.reason}"
                )
        finally:
            # Always release the connection, even if a statement raised
            with contextlib.suppress(Exception):
                conn.close()

        return promoted

    async def _demote_stale_memories(self) -> list[str]:
        """
        Demote stale memories from short-term to long-term storage.

        A short-term memory is stale when it has no access-log entry at all
        or its latest access is more than 7 days old.

        Returns:
            list[str]: IDs of demoted memories

        Raises:
            Exception: Any error raised by DuckDB while connecting, querying
                or updating is propagated to the caller.

        """
        demoted: list[str] = []

        import duckdb
        from session_buddy.settings import get_database_path

        conn = duckdb.connect(
            str(get_database_path()), config={"allow_unsigned_extensions": True}
        )
        try:
            rows = conn.execute(
                """
                SELECT c.id
                FROM conversations_v2 c
                LEFT JOIN (
                    SELECT memory_id, MAX(timestamp) AS last_access
                    FROM memory_access_log
                    GROUP BY memory_id
                ) a ON a.memory_id = c.id
                WHERE c.memory_tier='short_term'
                  AND (a.last_access IS NULL OR a.last_access < NOW() - INTERVAL 7 DAY)
                """
            ).fetchall()
            for (mid,) in rows:
                conn.execute(
                    "UPDATE conversations_v2 SET memory_tier='long_term' WHERE id=?",
                    [mid],
                )
                demoted.append(str(mid))
        finally:
            # Release the connection whether or not the statements succeeded
            with contextlib.suppress(Exception):
                conn.close()
        return demoted

    async def force_analysis(self) -> dict[str, Any]:
        """
        Force immediate analysis (for testing/debugging).

        Returns:
            dict: Analysis results

        """
        logger.info("Forcing conscious agent analysis...")
        return await self._analyze_and_optimize()