Coverage for session_buddy/memory_optimizer.py: 64.72%

296 statements  

coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1 #!/usr/bin/env python3

2 """Memory Optimization for Session Management MCP Server.

3

4 Provides conversation consolidation, summarization, and memory compression capabilities.

5 """

6 

7 import hashlib

8 import json

9 import operator

10 from dataclasses import asdict, dataclass

11 from datetime import datetime, timedelta

12 from typing import Any

13

14 from .reflection_tools import ReflectionDatabase

15 from .utils.regex_patterns import SAFE_PATTERNS

16 

17 

18 @dataclass(frozen=True)

19 class ConversationData:

20 """Immutable conversation data structure.""" 

21 

22 id: str 

23 content: str 

24 project: str | None 

25 timestamp: str 

26 metadata: dict[str, Any] 

27 original_size: int 

28 

29 

30 @dataclass(frozen=True)

31 class CompressionResults:

32 """Results from memory compression operation.""" 

33 

34 status: str 

35 dry_run: bool 

36 total_conversations: int 

37 conversations_to_keep: int 

38 conversations_to_consolidate: int 

39 clusters_created: int 

40 consolidated_summaries: list[dict[str, Any]] 

41 space_saved_estimate: int 

42 compression_ratio: float 

43 

44 

45 @dataclass(frozen=True)

46 class ConsolidatedConversation:

47 """Consolidated conversation metadata.""" 

48 

49 summary: str 

50 original_count: int 

51 projects: list[str] 

52 time_range: str 

53 original_conversations: list[str] 

54 compressed_size: int 

55 original_size: int 

56 

57 

58 class ConversationSummarizer:

59 """Handles conversation summarization using various strategies.""" 

60 

61 def __init__(self) -> None: 

62 self.summarization_strategies = { 

63 "extractive": self._extractive_summarization, 

64 "template_based": self._template_based_summarization, 

65 "keyword_based": self._keyword_based_summarization, 

66 } 

67 

68 def _extractive_summarization(self, content: str, max_sentences: int = 3) -> str: 

69 """Extract most important sentences from conversation.""" 

70 sentence_pattern = SAFE_PATTERNS["sentence_split"] 

71 sentences = sentence_pattern.split(content) 

72 sentences = [s.strip() for s in sentences if len(s.strip()) > 20] 

73 

74 # Score sentences based on various factors 

75 scored_sentences = [] 

76 for sentence in sentences[:20]: # Limit to prevent performance issues 

77 score = 0.0 

78 

79 # Length score (prefer medium-length sentences) 

80 length = len(sentence.split()) 

81 if 10 <= length <= 30: 

82 score += 0.3 

83 

84 # Technical keywords score 

85 tech_keywords = [ 

86 "function", 

87 "class", 

88 "error", 

89 "exception", 

90 "import", 

91 "def", 

92 "async", 

93 "await", 

94 "return", 

95 "variable", 

96 "method", 

97 "api", 

98 "database", 

99 "query", 

100 "test", 

101 "debug", 

102 "fix", 

103 "implement", 

104 ] 

105 for keyword in tech_keywords: 

106 if keyword.lower() in sentence.lower(): 

107 score += 0.1 

108 

109 # Code presence score 

110 if "`" in sentence or "def " in sentence or "class " in sentence: 

111 score += 0.2 

112 

113 # Question/problem indicators 

114 question_words = ["how", "why", "what", "when", "where", "which"] 

115 if any(word in sentence.lower() for word in question_words): 

116 score += 0.15 

117 

118 # Solution indicators 

119 solution_words = ["solution", "fix", "resolve", "implement", "create"] 

120 if any(word in sentence.lower() for word in solution_words): 

121 score += 0.2 

122 

123 scored_sentences.append((score, sentence)) 

124 

125 # Sort by score and take top sentences 

126 scored_sentences.sort(key=operator.itemgetter(0), reverse=True) 

127 top_sentences = [sent for score, sent in scored_sentences[:max_sentences]] 

128 

129 return ". ".join(top_sentences) + "." 

130 

131 def _template_based_summarization(self, content: str, max_length: int = 300) -> str: 

132 """Create summary using templates based on content patterns.""" 

133 summary_parts = [] 

134 

135 # Detect code blocks 

136 code_pattern = SAFE_PATTERNS["python_code_block"] 

137 code_blocks = code_pattern.findall(content) 

138 if code_blocks:  138 ↛ 139  line 138 didn't jump to line 139 because the condition on line 138 was never true

139 summary_parts.append( 

140 f"Code discussion involving {len(code_blocks)} code block(s)", 

141 ) 

142 

143 # Detect errors/exceptions 

144 errors_found = [] 

145 

146 # Check for Python exceptions 

147 exception_pattern = SAFE_PATTERNS["python_exception"] 

148 exc_matches = exception_pattern.findall(content) 

149 if exc_matches:  149 ↛ 150  line 149 didn't jump to line 150 because the condition on line 149 was never true

150 errors_found.extend( 

151 [ 

152 match[0] if isinstance(match, tuple) else match 

153 for match in exc_matches[:2] 

154 ], 

155 ) 

156 

157 # Check for Python tracebacks 

158 traceback_pattern = SAFE_PATTERNS["python_traceback"] 

159 if traceback_pattern.search(content):  159 ↛ 160  line 159 didn't jump to line 160 because the condition on line 159 was never true

160 errors_found.append("Python traceback") 

161 

162 if errors_found:  162 ↛ 163  line 162 didn't jump to line 163 because the condition on line 162 was never true

163 error_summary = ", ".join(set(errors_found))[:100] 

164 summary_parts.append(f"Error troubleshooting: {error_summary}") 

165 

166 # Detect file/project references 

167 files_mentioned = set() 

168 

169 file_pattern_names = [ 

170 "python_files", 

171 "javascript_files", 

172 "typescript_files", 

173 "json_files", 

174 "markdown_files", 

175 ] 

176 for pattern_name in file_pattern_names: 

177 pattern = SAFE_PATTERNS[pattern_name] 

178 matches = pattern.findall(content) 

179 files_mentioned.update(matches[:5]) # Limit to 5 files 

180 

181 if files_mentioned: 

182 files_str = ", ".join(sorted(files_mentioned)) 

183 summary_parts.append(f"Files discussed: {files_str}") 

184 

185 # Detect implementation topics 

186 impl_keywords = { 

187 "function": "function implementation", 

188 "class": "class design", 

189 "api": "API development", 

190 "database": "database operations", 

191 "test": "testing strategies", 

192 "deploy": "deployment process", 

193 "refactor": "code refactoring", 

194 "optimization": "performance optimization", 

195 } 

196 

197 topics_found = [ 

198 topic 

199 for keyword, topic in impl_keywords.items() 

200 if keyword in content.lower() 

201 ] 

202 

203 if topics_found:  203 ↛ 208  line 203 didn't jump to line 208 because the condition on line 203 was always true

204 topics_str = ", ".join(topics_found[:3]) 

205 summary_parts.append(f"Topics: {topics_str}") 

206 

207 # Combine parts and ensure length limit 

208 full_summary = "; ".join(summary_parts) 

209 if len(full_summary) > max_length:  209 ↛ 210  line 209 didn't jump to line 210 because the condition on line 209 was never true

210 full_summary = full_summary[:max_length] + "..." 

211 

212 return full_summary or "General development discussion" 

213 

214 def _keyword_based_summarization(self, content: str, max_keywords: int = 10) -> str: 

215 """Create summary based on extracted keywords.""" 

216 # Clean content 

217 code_block_pattern = SAFE_PATTERNS["code_block_cleanup"] 

218 content_clean = code_block_pattern.sub("", content) 

219 

220 inline_code_pattern = SAFE_PATTERNS["inline_code_cleanup"] 

221 content_clean = inline_code_pattern.sub("", content_clean) 

222 

223 # Extract potential keywords 

224 word_pattern = SAFE_PATTERNS["word_extraction"] 

225 words = word_pattern.findall(content_clean.lower()) 

226 

227 # Filter common words 

228 stop_words = { 

229 "the", 

230 "and", 

231 "for", 

232 "are", 

233 "but", 

234 "not", 

235 "you", 

236 "all", 

237 "can", 

238 "had", 

239 "her", 

240 "was", 

241 "one", 

242 "our", 

243 "out", 

244 "day", 

245 "get", 

246 "has", 

247 "him", 

248 "his", 

249 "how", 

250 "its", 

251 "may", 

252 "new", 

253 "now", 

254 "old", 

255 "see", 

256 "two", 

257 "who", 

258 "boy", 

259 "did", 

260 "way", 

261 "use", 

262 "man", 

263 "say", 

264 "she", 

265 "too", 

266 "any", 

267 "here", 

268 "much", 

269 "where", 

270 "your", 

271 "them", 

272 "well", 

273 "were", 

274 "been", 

275 "have", 

276 "there", 

277 "what", 

278 "would", 

279 "make", 

280 "like", 

281 "into", 

282 "time", 

283 "will", 

284 "about", 

285 "think", 

286 "never", 

287 "after", 

288 "should", 

289 "could", 

290 "also", 

291 "just", 

292 "first", 

293 "over", 

294 "back", 

295 "other", 

296 } 

297 

298 # Count word frequencies 

299 word_counts: dict[str, int] = {} 

300 for word in words: 

301 if word not in stop_words and len(word) > 3: 

302 word_counts[word] = word_counts.get(word, 0) + 1 

303 

304 # Get top keywords 

305 top_keywords = sorted( 

306 word_counts.items(), key=operator.itemgetter(1), reverse=True 

307 ) 

308 keywords = [word for word, count in top_keywords[:max_keywords]] 

309 

310 return f"Keywords: {', '.join(keywords)}" if keywords else "General discussion" 

311 

312 def summarize_conversation( 

313 self, 

314 content: str, 

315 strategy: str = "template_based", 

316 ) -> str: 

317 """Summarize a conversation using the specified strategy.""" 

318 if strategy not in self.summarization_strategies: 

319 strategy = "template_based" 

320 

321 try: 

322 summary = self.summarization_strategies[strategy](content) 

323 return summary or "Unable to generate summary" 

324 except Exception as e: 

325 return f"Summary generation failed: {str(e)[:100]}" 

326 

327 

328 class ConversationClusterer:

329 """Groups related conversations for consolidation.""" 

330 

331 def __init__(self) -> None: 

332 self.similarity_threshold = 0.6 

333 

334 def cluster_conversations( 

335 self, 

336 conversations: list[dict[str, Any]], 

337 ) -> list[list[dict[str, Any]]]: 

338 """Group conversations into clusters based on similarity.""" 

339 if not conversations: 

340 return [] 

341 

342 clusters = [] 

343 used_conversations = set() 

344 

345 for i, conv in enumerate(conversations): 

346 if i in used_conversations:  346 ↛ 347  line 346 didn't jump to line 347 because the condition on line 346 was never true

347 continue 

348 

349 # Start new cluster 

350 cluster = [conv] 

351 used_conversations.add(i) 

352 

353 # Find similar conversations 

354 for j, other_conv in enumerate(conversations[i + 1 :], i + 1): 

355 if j in used_conversations:  355 ↛ 356  line 355 didn't jump to line 356 because the condition on line 355 was never true

356 continue 

357 

358 similarity = self._calculate_similarity(conv, other_conv) 

359 if similarity >= self.similarity_threshold:  359 ↛ 360  line 359 didn't jump to line 360 because the condition on line 359 was never true

360 cluster.append(other_conv) 

361 used_conversations.add(j) 

362 

363 clusters.append(cluster) 

364 

365 return clusters 

366 

367 def _calculate_similarity( 

368 self, 

369 conv1: dict[str, Any], 

370 conv2: dict[str, Any], 

371 ) -> float: 

372 """Calculate similarity between two conversations.""" 

373 similarity = 0.0 

374 

375 # Project similarity 

376 if conv1.get("project") == conv2.get("project"): 

377 similarity += 0.3 

378 

379 # Time proximity (conversations within same day) 

380 from contextlib import suppress 

381 

382 with suppress(ValueError, TypeError): 

383 time1 = datetime.fromisoformat(conv1.get("timestamp", "")) 

384 time2 = datetime.fromisoformat(conv2.get("timestamp", "")) 

385 if abs((time1 - time2).days) <= 1:  385 ↛ 389  line 385 didn't jump to line 389

386 similarity += 0.2 

387 

388 # Content similarity (simple keyword overlap) 

389 word_boundary_pattern = SAFE_PATTERNS["word_boundary"] 

390 content1_words = set( 

391 word_boundary_pattern.findall(conv1.get("content", "").lower()), 

392 ) 

393 content2_words = set( 

394 word_boundary_pattern.findall(conv2.get("content", "").lower()), 

395 ) 

396 

397 if content1_words and content2_words:  397 ↛ 398  line 397 didn't jump to line 398 because the condition on line 397 was never true

398 overlap = len(content1_words & content2_words) 

399 total = len(content1_words | content2_words) 

400 if total > 0: 

401 similarity += 0.5 * (overlap / total) 

402 

403 return min(similarity, 1.0) 

404 

405 

406 class RetentionPolicyManager:

407 """Manages conversation retention policies.""" 

408 

409 def __init__(self) -> None: 

410 self.default_policies = { 

411 "max_age_days": 365, # Keep conversations for 1 year 

412 "max_conversations": 10000, # Maximum number of conversations 

413 "importance_threshold": 0.3, # Minimum importance score to keep 

414 "consolidation_age_days": 30, # Consolidate conversations older than 30 days 

415 "compression_ratio": 0.5, # Target 50% size reduction when consolidating 

416 } 

417 

418 self.importance_factors = { 

419 "has_code": 0.3, 

420 "has_errors": 0.2, 

421 "recent_access": 0.2, 

422 "length_score": 0.1, 

423 "project_relevance": 0.2, 

424 } 

425 

426 def calculate_importance_score(self, conversation: dict[str, Any]) -> float: 

427 """Calculate importance score for a conversation.""" 

428 score = 0.0 

429 content = conversation.get("content", "") 

430 

431 # Has code blocks 

432 if "```" in content or "def " in content or "class " in content: 

433 score += self.importance_factors["has_code"] 

434 

435 # Has error/exception information 

436 error_keywords = ["error", "exception", "traceback", "failed", "bug"] 

437 if any(keyword in content.lower() for keyword in error_keywords): 

438 score += self.importance_factors["has_errors"] 

439 

440 # Recent access (would need to track this separately) 

441 # For now, use recency as proxy 

442 from contextlib import suppress 

443 

444 with suppress(ValueError, TypeError): 

445 conv_time = datetime.fromisoformat(conversation.get("timestamp", "")) 

446 days_old = (datetime.now() - conv_time).days 

447 if days_old < 7:  447 ↛ 449  line 447 didn't jump to line 449 because the condition on line 447 was always true

448 score += self.importance_factors["recent_access"] 

449 elif days_old < 30: 

450 score += self.importance_factors["recent_access"] * 0.5 

451 

452 # Length score (longer conversations might be more important) 

453 content_length = len(content) 

454 if content_length > 1000:  454 ↛ 455  line 454 didn't jump to line 455 because the condition on line 454 was never true

455 score += self.importance_factors["length_score"] 

456 elif content_length > 500:  456 ↛ 457  line 456 didn't jump to line 457 because the condition on line 456 was never true

457 score += self.importance_factors["length_score"] * 0.5 

458 

459 # Project relevance (current project gets boost) 

460 # This would need current project context 

461 score += ( 

462 self.importance_factors["project_relevance"] * 0.5 

463 ) # Default middle score 

464 

465 return min(score, 1.0) 

466 

467 def get_conversations_for_retention( 

468 self, 

469 conversations: list[dict[str, Any]], 

470 policy: dict[str, Any] | None = None, 

471 ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: 

472 """Determine which conversations to keep vs consolidate/delete.""" 

473 if not policy:  473 ↛ 476  line 473 didn't jump to line 476 because the condition on line 473 was always true

474 policy = self.default_policies.copy() 

475 

476 keep_conversations = [] 

477 consolidate_conversations = [] 

478 

479 # Sort conversations by timestamp (newest first) 

480 sorted_conversations = sorted( 

481 conversations, 

482 key=lambda x: x.get("timestamp", ""), 

483 reverse=True, 

484 ) 

485 

486 cutoff_date = datetime.now() - timedelta(days=policy["consolidation_age_days"]) 

487 max_conversations = policy["max_conversations"] 

488 importance_threshold = policy["importance_threshold"] 

489 

490 for i, conv in enumerate(sorted_conversations): 

491 # Always keep recent conversations up to max limit 

492 if i < max_conversations // 2:  # Keep newest 50% of max limit  492 ↛ 497  line 492 didn't jump to line 497 because the condition on line 492 was always true

493 keep_conversations.append(conv) 

494 continue 

495 

496 # Check importance for older conversations 

497 importance = self.calculate_importance_score(conv) 

498 conv["importance_score"] = importance 

499 

500 # Check age 

501 try: 

502 conv_time = datetime.fromisoformat(conv.get("timestamp", "")) 

503 is_old = conv_time < cutoff_date 

504 except (ValueError, TypeError): 

505 is_old = True # If we can't parse timestamp, consider it old 

506 

507 if importance >= importance_threshold: 

508 keep_conversations.append(conv) 

509 elif is_old: 

510 consolidate_conversations.append(conv) 

511 else: 

512 # Recent but low importance - keep for now 

513 keep_conversations.append(conv) 

514 

515 return keep_conversations, consolidate_conversations 

516 

517 

518 class MemoryOptimizer:

519 """Main class for memory optimization and compression.""" 

520 

521 def __init__(self, reflection_db: ReflectionDatabase) -> None: 

522 self.reflection_db = reflection_db 

523 self.summarizer = ConversationSummarizer() 

524 self.clusterer = ConversationClusterer() 

525 self.retention_manager = RetentionPolicyManager() 

526 

527 # Compression statistics 

528 self.compression_stats: dict[str, None | int | float | str] = { 

529 "last_run": None, 

530 "conversations_processed": 0, 

531 "conversations_consolidated": 0, 

532 "space_saved_bytes": 0, 

533 "compression_ratio": 0.0, 

534 } 

535 

536 async def compress_memory( 

537 self, 

538 policy: dict[str, Any] | None = None, 

539 dry_run: bool = False, 

540 ) -> dict[str, Any]: 

541 """Main method to compress conversation memory.""" 

542 if not self._is_database_available(): 

543 return {"error": "Database not available"} 

544 

545 conversations = await self._load_conversations() 

546 if not conversations: 

547 return self._create_no_conversations_response() 

548 

549 return await self._perform_compression(conversations, policy, dry_run) 

550 

551 def _is_database_available(self) -> bool: 

552 """Check if database connection is available.""" 

553 return ( 

554 hasattr(self.reflection_db, "conn") and self.reflection_db.conn is not None 

555 ) 

556 

557 def _create_no_conversations_response(self) -> dict[str, Any]: 

558 """Create response for when no conversations are found.""" 

559 return { 

560 "status": "no_conversations", 

561 "message": "No conversations found to compress", 

562 } 

563 

564 async def _load_conversations(self) -> list[ConversationData]: 

565 """Load all conversations from database into structured format.""" 

566 if not self.reflection_db.conn:  566 ↛ 567  line 566 didn't jump to line 567 because the condition on line 566 was never true

567 return [] 

568 

569 cursor = self.reflection_db.conn.execute( 

570 "SELECT id, content, project, timestamp, metadata FROM conversations " 

571 "ORDER BY timestamp DESC", 

572 ) 

573 

574 return [ 

575 ConversationData( 

576 id=conv_id, 

577 content=content, 

578 project=project, 

579 timestamp=timestamp, 

580 metadata=json.loads(metadata) if metadata else {}, 

581 original_size=len(content), 

582 ) 

583 for conv_id, content, project, timestamp, metadata in cursor.fetchall() 

584 ] 

585 

586 async def _perform_compression( 

587 self, 

588 conversations: list[ConversationData], 

589 policy: dict[str, Any] | None, 

590 dry_run: bool, 

591 ) -> dict[str, Any]: 

592 """Perform the actual compression workflow.""" 

593 # Convert to dicts for existing retention manager compatibility 

594 conv_dicts = [self._to_dict(conv) for conv in conversations] 

595 

596 keep_conversations, consolidate_conversations = ( 

597 self.retention_manager.get_conversations_for_retention(conv_dicts, policy) 

598 ) 

599 

600 clusters = self.clusterer.cluster_conversations(consolidate_conversations) 

601 consolidated_summaries: list[dict[str, Any]] = [] 

602 

603 total_original_size, total_compressed_size = await self._process_clusters( 

604 clusters, 

605 consolidated_summaries, 

606 dry_run, 

607 ) 

608 

609 results = self._create_compression_results( 

610 conversations, 

611 keep_conversations, 

612 consolidate_conversations, 

613 clusters, 

614 consolidated_summaries, 

615 total_original_size, 

616 total_compressed_size, 

617 dry_run, 

618 ) 

619 

620 self._update_compression_stats(results, consolidate_conversations, clusters) 

621 

622 return asdict(results) 

623 

624 def _to_dict(self, conv: ConversationData) -> dict[str, Any]: 

625 """Convert ConversationData to dict for backward compatibility.""" 

626 return { 

627 "id": conv.id, 

628 "content": conv.content, 

629 "project": conv.project, 

630 "timestamp": conv.timestamp, 

631 "metadata": conv.metadata, 

632 "original_size": conv.original_size, 

633 } 

634 

635 async def _process_clusters( 

636 self, 

637 clusters: list[list[dict[str, Any]]], 

638 consolidated_summaries: list[dict[str, Any]], 

639 dry_run: bool, 

640 ) -> tuple[int, int]: 

641 """Process conversation clusters and return size statistics.""" 

642 total_original_size = sum( 

643 conv["original_size"] for cluster in clusters for conv in cluster 

644 ) 

645 total_compressed_size = 0 

646 

647 for cluster in (c for c in clusters if len(c) > 1):  647 ↛ 648  line 647 didn't jump to line 648 because the loop on line 647 never started

648 consolidated = self._create_consolidated_conversation(cluster) 

649 total_compressed_size += consolidated.compressed_size 

650 consolidated_summaries.append(asdict(consolidated)) 

651 

652 if not dry_run: 

653 await self._persist_consolidated_conversation(consolidated, cluster) 

654 

655 return total_original_size, total_compressed_size 

656 

657 def _create_consolidated_conversation( 

658 self, 

659 cluster: list[dict[str, Any]], 

660 ) -> ConsolidatedConversation: 

661 """Create a consolidated conversation from a cluster.""" 

662 combined_content = "\n\n---\n\n".join(conv["content"] for conv in cluster) 

663 summary = self.summarizer.summarize_conversation( 

664 combined_content, 

665 "template_based", 

666 ) 

667 

668 projects = [conv["project"] for conv in cluster if conv.get("project")] 

669 timestamps = [conv["timestamp"] for conv in cluster] 

670 

671 return ConsolidatedConversation( 

672 summary=summary, 

673 original_count=len(cluster), 

674 projects=list(set(projects)), # Remove duplicates 

675 time_range=f"{min(timestamps) if timestamps else ''} to {max(timestamps) if timestamps else ''}", 

676 original_conversations=[conv["id"] for conv in cluster], 

677 compressed_size=len(summary), 

678 original_size=sum(conv["original_size"] for conv in cluster), 

679 ) 

680 

681 def _create_compression_results( 

682 self, 

683 conversations: list[ConversationData], 

684 keep_conversations: list[dict[str, Any]], 

685 consolidate_conversations: list[dict[str, Any]], 

686 clusters: list[list[dict[str, Any]]], 

687 consolidated_summaries: list[dict[str, Any]], 

688 total_original_size: int, 

689 total_compressed_size: int, 

690 dry_run: bool, 

691 ) -> CompressionResults: 

692 """Create compression results structure.""" 

693 space_saved = max(0, total_original_size - total_compressed_size) 

694 compression_ratio = ( 

695 space_saved / total_original_size if total_original_size > 0 else 0.0 

696 ) 

697 

698 return CompressionResults( 

699 status="success", 

700 dry_run=dry_run, 

701 total_conversations=len(conversations), 

702 conversations_to_keep=len(keep_conversations), 

703 conversations_to_consolidate=len(consolidate_conversations), 

704 clusters_created=len(clusters), 

705 consolidated_summaries=consolidated_summaries, 

706 space_saved_estimate=space_saved, 

707 compression_ratio=compression_ratio, 

708 ) 

709 

710 def _update_compression_stats( 

711 self, 

712 results: CompressionResults, 

713 consolidate_conversations: list[dict[str, Any]], 

714 clusters: list[list[dict[str, Any]]], 

715 ) -> None: 

716 """Update internal compression statistics.""" 

717 self.compression_stats.update( 

718 { 

719 "last_run": datetime.now().isoformat(), 

720 "conversations_processed": len(consolidate_conversations), 

721 "conversations_consolidated": sum( 

722 len(cluster) for cluster in clusters if len(cluster) > 1 

723 ), 

724 "space_saved_bytes": results.space_saved_estimate, 

725 "compression_ratio": results.compression_ratio, 

726 }, 

727 ) 

728 

729 async def _persist_consolidated_conversation( 

730 self, 

731 consolidated_conv: ConsolidatedConversation, 

732 original_cluster: list[dict[str, Any]], 

733 ) -> None: 

734 """Create a new consolidated conversation and remove originals.""" 

735 # Create new consolidated conversation 

736 consolidated_id = hashlib.md5( 

737 f"consolidated_{datetime.now().isoformat()}".encode(), 

738 usedforsecurity=False, 

739 ).hexdigest() 

740 

741 metadata = { 

742 "type": "consolidated", 

743 "original_count": consolidated_conv.original_count, 

744 "original_conversations": consolidated_conv.original_conversations, 

745 "projects": consolidated_conv.projects, 

746 "compression_ratio": ( 

747 consolidated_conv.compressed_size / consolidated_conv.original_size 

748 if consolidated_conv.original_size > 0 

749 else 0.0 

750 ), 

751 } 

752 

753 # Insert consolidated conversation 

754 if self.reflection_db.conn: 

755 self.reflection_db.conn.execute( 

756 """INSERT INTO conversations (id, content, project, timestamp, metadata) 

757 VALUES (?, ?, ?, ?, ?)""", 

758 ( 

759 consolidated_id, 

760 consolidated_conv.summary, 

761 ", ".join(consolidated_conv.projects) 

762 if consolidated_conv.projects 

763 else "multiple", 

764 datetime.now().isoformat(), 

765 json.dumps(metadata), 

766 ), 

767 ) 

768 

769 # Remove original conversations 

770 original_ids = [conv["id"] for conv in original_cluster] 

771 if original_ids and self.reflection_db.conn: 

772 placeholders = ",".join(["?" for _ in original_ids]) 

773 # Build SQL safely - placeholders generated from list length, not user input 

774 query = "DELETE FROM conversations WHERE id IN (" + placeholders + ")" 

775 self.reflection_db.conn.execute(query, original_ids) 

776 

777 # Commit changes 

778 if self.reflection_db.conn: 

779 self.reflection_db.conn.commit() 

780 

781 async def get_compression_stats(self) -> dict[str, Any]: 

782 """Get compression statistics.""" 

783 return self.compression_stats.copy() 

784 

785 async def set_retention_policy(self, policy: dict[str, Any]) -> dict[str, Any]: 

786 """Update retention policy settings.""" 

787 updated_policy = self.retention_manager.default_policies.copy() 

788 updated_policy.update(policy) 

789 

790 # Validate policy values 

791 if updated_policy.get("max_age_days", 0) < 1: 

792 return {"error": "max_age_days must be at least 1"} 

793 

794 if updated_policy.get("max_conversations", 0) < 100: 

795 return {"error": "max_conversations must be at least 100"} 

796 

797 self.retention_manager.default_policies = updated_policy 

798 

799 return {"status": "success", "updated_policy": updated_policy}
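A note on the never-taken branches flagged above: this run exercises neither the consolidation loop at line 647 nor the similarity match at line 359, so the compression path is largely unmeasured. Below is a minimal sketch of a script that could drive those branches, assuming the module is importable as session_buddy.memory_optimizer and that the conversations table has the (id, content, project, timestamp, metadata) columns implied by the SQL in the listing. FakeReflectionDB is a hypothetical stand-in that only satisfies the .conn contract the optimizer actually uses, and whether the two old conversations land in one cluster depends on the word-overlap term in _calculate_similarity pushing their score past the 0.6 threshold.

import asyncio
import sqlite3
from datetime import datetime, timedelta

from session_buddy.memory_optimizer import MemoryOptimizer  # assumed import path


class FakeReflectionDB:
    """Hypothetical stand-in exposing only the .conn attribute the optimizer uses."""

    def __init__(self) -> None:
        self.conn = sqlite3.connect(":memory:")
        # Schema inferred from the SELECT/INSERT statements in the listing above.
        self.conn.execute(
            "CREATE TABLE conversations "
            "(id TEXT PRIMARY KEY, content TEXT, project TEXT, timestamp TEXT, metadata TEXT)"
        )


async def main() -> None:
    db = FakeReflectionDB()
    old_ts = (datetime.now() - timedelta(days=60)).isoformat()
    # Three old, low-importance conversations in the same project: with the policy
    # below, the newest one is kept and the other two become consolidation candidates
    # that should cluster together (same project, same day, identical wording).
    for i in range(3):
        db.conn.execute(
            "INSERT INTO conversations VALUES (?, ?, ?, ?, ?)",
            (f"conv-{i}", "short note about the api database query", "demo", old_ts, None),
        )
    db.conn.commit()

    optimizer = MemoryOptimizer(db)  # type: ignore[arg-type]  # fake DB, sketch only
    policy = {
        "consolidation_age_days": 30,
        "max_conversations": 2,       # keep only the newest half (one conversation)
        "importance_threshold": 0.9,  # push the short notes below the keep bar
    }
    results = await optimizer.compress_memory(policy=policy, dry_run=True)
    print(results["status"], results["clusters_created"], results["conversations_to_consolidate"])


if __name__ == "__main__":
    asyncio.run(main())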