Coverage for session_mgmt_mcp/memory_optimizer.py: 7.43%

246 statements  

coverage.py v7.10.6, created at 2025-09-01 05:22 -0700

#!/usr/bin/env python3
"""Memory Optimization for Session Management MCP Server.

Provides conversation consolidation, summarization, and memory compression capabilities.
"""

import hashlib
import json
import re
from datetime import datetime, timedelta
from typing import Any

from .reflection_tools import ReflectionDatabase


class ConversationSummarizer:
    """Handles conversation summarization using various strategies."""

    def __init__(self) -> None:
        self.summarization_strategies = {
            "extractive": self._extractive_summarization,
            "template_based": self._template_based_summarization,
            "keyword_based": self._keyword_based_summarization,
        }

    def _extractive_summarization(self, content: str, max_sentences: int = 3) -> str:
        """Extract most important sentences from conversation."""
        sentences = re.split(r"[.!?]+", content)
        sentences = [s.strip() for s in sentences if len(s.strip()) > 20]

        tech_keywords = [
            "function", "class", "error", "exception", "import", "def",
            "async", "await", "return", "variable", "method", "api",
            "database", "query", "test", "debug", "fix", "implement",
        ]
        question_words = ["how", "why", "what", "when", "where", "which"]
        solution_words = ["solution", "fix", "resolve", "implement", "create"]

        # Score sentences based on various factors
        scored_sentences = []
        for sentence in sentences[:20]:  # Limit to prevent performance issues
            score = 0.0

            # Length score (prefer medium-length sentences)
            length = len(sentence.split())
            if 10 <= length <= 30:
                score += 0.3

            # Technical keywords score
            for keyword in tech_keywords:
                if keyword in sentence.lower():
                    score += 0.1

            # Code presence score
            if "`" in sentence or "def " in sentence or "class " in sentence:
                score += 0.2

            # Question/problem indicators
            if any(word in sentence.lower() for word in question_words):
                score += 0.15

            # Solution indicators
            if any(word in sentence.lower() for word in solution_words):
                score += 0.2

            scored_sentences.append((score, sentence))

        # Sort by score and take top sentences
        scored_sentences.sort(key=lambda x: x[0], reverse=True)
        top_sentences = [sent for score, sent in scored_sentences[:max_sentences]]

        return ". ".join(top_sentences) + "." if top_sentences else ""
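    # Illustrative scoring walk-through (hypothetical sentence, not drawn from
    # real data): "We should fix the error in the async query function" has 10
    # words (+0.3), contains five technical keywords ("fix", "error", "async",
    # "query", "function", +0.5), and includes the solution word "fix" (+0.2),
    # for a raw score of 1.0 before ranking.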

    def _template_based_summarization(self, content: str, max_length: int = 300) -> str:
        """Create summary using templates based on content patterns."""
        summary_parts = []

        # Detect code blocks
        code_blocks = re.findall(r"```[\w]*\n(.*?)\n```", content, re.DOTALL)
        if code_blocks:
            summary_parts.append(
                f"Code discussion involving {len(code_blocks)} code block(s)",
            )

        # Detect errors/exceptions
        error_patterns = [
            r"(\w+Error): (.+)",
            r"Exception: (.+)",
            r"Traceback \(most recent call last\):",
            r"error: (.+)",
        ]
        errors_found = []
        for pattern in error_patterns:
            matches = re.findall(pattern, content, re.IGNORECASE)
            if matches:
                if isinstance(matches[0], tuple):
                    errors_found.extend([match[0] for match in matches[:2]])
                else:
                    errors_found.extend(matches[:2])

        if errors_found:
            # Sort for deterministic output; sets have no stable order
            error_summary = ", ".join(sorted(set(errors_found)))[:100]
            summary_parts.append(f"Error troubleshooting: {error_summary}")

        # Detect file/project references
        file_patterns = [
            r"(\w+\.py)",
            r"(\w+\.js)",
            r"(\w+\.ts)",
            r"(\w+\.json)",
            r"(\w+\.md)",
        ]
        files_mentioned = set()
        for pattern in file_patterns:
            matches = re.findall(pattern, content)
            files_mentioned.update(matches[:5])  # Limit to 5 files per extension

        if files_mentioned:
            files_str = ", ".join(sorted(files_mentioned))
            summary_parts.append(f"Files discussed: {files_str}")

        # Detect implementation topics
        impl_keywords = {
            "function": "function implementation",
            "class": "class design",
            "api": "API development",
            "database": "database operations",
            "test": "testing strategies",
            "deploy": "deployment process",
            "refactor": "code refactoring",
            "optimization": "performance optimization",
        }

        topics_found = []
        for keyword, topic in impl_keywords.items():
            if keyword in content.lower():
                topics_found.append(topic)

        if topics_found:
            topics_str = ", ".join(topics_found[:3])
            summary_parts.append(f"Topics: {topics_str}")

        # Combine parts and ensure length limit
        full_summary = "; ".join(summary_parts)
        if len(full_summary) > max_length:
            full_summary = full_summary[:max_length] + "..."

        return full_summary or "General development discussion"
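    # Example of the resulting shape (illustrative output, not from a real
    # conversation): "Code discussion involving 2 code block(s); Error
    # troubleshooting: ValueError; Files discussed: memory_optimizer.py;
    # Topics: testing strategies"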

    def _keyword_based_summarization(self, content: str, max_keywords: int = 10) -> str:
        """Create summary based on extracted keywords."""
        # Clean content: drop fenced code blocks and inline code spans
        content_clean = re.sub(r"```.*?```", "", content, flags=re.DOTALL)
        content_clean = re.sub(r"`[^`]+`", "", content_clean)

        # Extract potential keywords
        words = re.findall(r"\b[a-zA-Z]{3,}\b", content_clean.lower())

        # Filter common words
        stop_words = {
            "the", "and", "for", "are", "but", "not", "you", "all", "can", "had",
            "her", "was", "one", "our", "out", "day", "get", "has", "him", "his",
            "how", "its", "may", "new", "now", "old", "see", "two", "who", "boy",
            "did", "way", "use", "man", "say", "she", "too", "any", "here", "much",
            "where", "your", "them", "well", "were", "been", "have", "there",
            "what", "would", "make", "like", "into", "time", "will", "about",
            "think", "never", "after", "should", "could", "also", "just", "first",
            "over", "back", "other",
        }

        # Count word frequencies (the length check also drops the three-letter
        # words the regex admits, so only words of four or more letters survive)
        word_counts = {}
        for word in words:
            if word not in stop_words and len(word) > 3:
                word_counts[word] = word_counts.get(word, 0) + 1

        # Get top keywords
        top_keywords = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)
        keywords = [word for word, count in top_keywords[:max_keywords]]

        return f"Keywords: {', '.join(keywords)}" if keywords else "General discussion"

    def summarize_conversation(
        self,
        content: str,
        strategy: str = "template_based",
    ) -> str:
        """Summarize a conversation using the specified strategy."""
        if strategy not in self.summarization_strategies:
            strategy = "template_based"

        try:
            summary = self.summarization_strategies[strategy](content)
            return summary or "Unable to generate summary"
        except Exception as e:
            return f"Summary generation failed: {str(e)[:100]}"
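    # Usage sketch ("transcript" is a hypothetical conversation string; unknown
    # strategy names silently fall back to "template_based"):
    #
    #     summarizer = ConversationSummarizer()
    #     summarizer.summarize_conversation(transcript)
    #     summarizer.summarize_conversation(transcript, strategy="extractive")
    #     summarizer.summarize_conversation(transcript, strategy="keyword_based")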

class ConversationClusterer:
    """Groups related conversations for consolidation."""

    def __init__(self) -> None:
        self.similarity_threshold = 0.6

    def cluster_conversations(
        self,
        conversations: list[dict[str, Any]],
    ) -> list[list[dict[str, Any]]]:
        """Group conversations into clusters based on similarity."""
        if not conversations:
            return []

        clusters = []
        used_conversations = set()

        for i, conv in enumerate(conversations):
            if i in used_conversations:
                continue

            # Start new cluster
            cluster = [conv]
            used_conversations.add(i)

            # Find similar conversations
            for j, other_conv in enumerate(conversations[i + 1 :], i + 1):
                if j in used_conversations:
                    continue

                similarity = self._calculate_similarity(conv, other_conv)
                if similarity >= self.similarity_threshold:
                    cluster.append(other_conv)
                    used_conversations.add(j)

            clusters.append(cluster)

        return clusters
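    # Clustering usage sketch ("convs" is a hypothetical list of conversation
    # dicts carrying "project", "timestamp", and "content" keys). The pass is
    # greedy and single-shot: each conversation joins at most one cluster, so
    # results depend on input order.
    #
    #     clusterer = ConversationClusterer()
    #     clusters = clusterer.cluster_conversations(convs)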

    def _calculate_similarity(
        self,
        conv1: dict[str, Any],
        conv2: dict[str, Any],
    ) -> float:
        """Calculate similarity between two conversations."""
        similarity = 0.0

        # Project similarity
        if conv1.get("project") == conv2.get("project"):
            similarity += 0.3

        # Time proximity (conversations within same day)
        try:
            time1 = datetime.fromisoformat(conv1.get("timestamp", ""))
            time2 = datetime.fromisoformat(conv2.get("timestamp", ""))
            if abs((time1 - time2).days) <= 1:
                similarity += 0.2
        except (ValueError, TypeError):
            pass

        # Content similarity (simple keyword overlap)
        content1_words = set(re.findall(r"\b\w+\b", conv1.get("content", "").lower()))
        content2_words = set(re.findall(r"\b\w+\b", conv2.get("content", "").lower()))

        if content1_words and content2_words:
            overlap = len(content1_words & content2_words)
            total = len(content1_words | content2_words)
            if total > 0:
                similarity += 0.5 * (overlap / total)

        return min(similarity, 1.0)
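    # Worked example (illustrative numbers): two conversations in the same
    # project (+0.3), logged a day apart (+0.2), with a Jaccard word overlap
    # of 0.4 (+0.5 * 0.4 = 0.2) score 0.3 + 0.2 + 0.2 = 0.7, clearing the
    # default 0.6 clustering threshold.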

class RetentionPolicyManager:
    """Manages conversation retention policies."""

    def __init__(self) -> None:
        self.default_policies = {
            "max_age_days": 365,  # Keep conversations for 1 year
            "max_conversations": 10000,  # Maximum number of conversations
            "importance_threshold": 0.3,  # Minimum importance score to keep
            "consolidation_age_days": 30,  # Consolidate conversations older than 30 days
            "compression_ratio": 0.5,  # Target 50% size reduction when consolidating
        }

        self.importance_factors = {
            "has_code": 0.3,
            "has_errors": 0.2,
            "recent_access": 0.2,
            "length_score": 0.1,
            "project_relevance": 0.2,
        }

    def calculate_importance_score(self, conversation: dict[str, Any]) -> float:
        """Calculate importance score for a conversation."""
        score = 0.0
        content = conversation.get("content", "")

        # Has code blocks
        if "```" in content or "def " in content or "class " in content:
            score += self.importance_factors["has_code"]

        # Has error/exception information
        error_keywords = ["error", "exception", "traceback", "failed", "bug"]
        if any(keyword in content.lower() for keyword in error_keywords):
            score += self.importance_factors["has_errors"]

        # Recent access would need to be tracked separately;
        # for now, use recency as a proxy
        try:
            conv_time = datetime.fromisoformat(conversation.get("timestamp", ""))
            days_old = (datetime.now() - conv_time).days
            if days_old < 7:
                score += self.importance_factors["recent_access"]
            elif days_old < 30:
                score += self.importance_factors["recent_access"] * 0.5
        except (ValueError, TypeError):
            pass

        # Length score (longer conversations tend to carry more substance)
        content_length = len(content)
        if content_length > 1000:
            score += self.importance_factors["length_score"]
        elif content_length > 500:
            score += self.importance_factors["length_score"] * 0.5

        # Project relevance: without current-project context, apply half the
        # factor as a neutral default
        score += self.importance_factors["project_relevance"] * 0.5

        return min(score, 1.0)
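    # Worked example (illustrative): a five-day-old conversation under 500
    # characters that contains a fenced code block (+0.3) and the word "error"
    # (+0.2) scores 0.3 + 0.2 + 0.2 (recency) + 0.1 (default project
    # relevance) = 0.8, well above the default 0.3 importance threshold.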

    def get_conversations_for_retention(
        self,
        conversations: list[dict[str, Any]],
        policy: dict[str, Any] | None = None,
    ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
        """Determine which conversations to keep vs consolidate/delete."""
        # Merge overrides onto defaults so partial policies cannot raise KeyError
        merged_policy = self.default_policies.copy()
        if policy:
            merged_policy.update(policy)
        policy = merged_policy

        keep_conversations = []
        consolidate_conversations = []

        # Sort conversations by timestamp (newest first)
        sorted_conversations = sorted(
            conversations,
            key=lambda x: x.get("timestamp", ""),
            reverse=True,
        )

        cutoff_date = datetime.now() - timedelta(days=policy["consolidation_age_days"])
        max_conversations = policy["max_conversations"]
        importance_threshold = policy["importance_threshold"]

        for i, conv in enumerate(sorted_conversations):
            # Always keep the newest conversations, up to half the max limit
            if i < max_conversations // 2:
                keep_conversations.append(conv)
                continue

            # Check importance for older conversations
            importance = self.calculate_importance_score(conv)
            conv["importance_score"] = importance

            # Check age
            try:
                conv_time = datetime.fromisoformat(conv.get("timestamp", ""))
                is_old = conv_time < cutoff_date
            except (ValueError, TypeError):
                is_old = True  # If the timestamp is unparseable, treat it as old

            if importance >= importance_threshold:
                keep_conversations.append(conv)
            elif is_old:
                consolidate_conversations.append(conv)
            else:
                # Recent but low importance: keep for now
                keep_conversations.append(conv)

        return keep_conversations, consolidate_conversations
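    # Retention usage sketch ("convs" is a hypothetical conversation list);
    # omitted policy keys fall back to the defaults above:
    #
    #     manager = RetentionPolicyManager()
    #     keep, consolidate = manager.get_conversations_for_retention(
    #         convs, policy={"consolidation_age_days": 60}
    #     )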

class MemoryOptimizer:
    """Main class for memory optimization and compression."""

    def __init__(self, reflection_db: ReflectionDatabase) -> None:
        self.reflection_db = reflection_db
        self.summarizer = ConversationSummarizer()
        self.clusterer = ConversationClusterer()
        self.retention_manager = RetentionPolicyManager()

        # Compression statistics
        self.compression_stats = {
            "last_run": None,
            "conversations_processed": 0,
            "conversations_consolidated": 0,
            "space_saved_bytes": 0,
            "compression_ratio": 0.0,
        }

    async def compress_memory(
        self,
        policy: dict[str, Any] | None = None,
        dry_run: bool = False,
    ) -> dict[str, Any]:
        """Main method to compress conversation memory."""
        if not hasattr(self.reflection_db, "conn") or not self.reflection_db.conn:
            return {"error": "Database not available"}

        # Get all conversations
        cursor = self.reflection_db.conn.execute(
            "SELECT id, content, project, timestamp, metadata FROM conversations ORDER BY timestamp DESC",
        )
        conversations = []
        for row in cursor.fetchall():
            conv_id, content, project, timestamp, metadata = row
            conversations.append(
                {
                    "id": conv_id,
                    "content": content,
                    "project": project,
                    "timestamp": timestamp,
                    "metadata": json.loads(metadata) if metadata else {},
                    "original_size": len(content),
                },
            )

        if not conversations:
            return {
                "status": "no_conversations",
                "message": "No conversations found to compress",
            }

        # Apply retention policy
        keep_conversations, consolidate_conversations = (
            self.retention_manager.get_conversations_for_retention(
                conversations,
                policy,
            )
        )

        # Cluster conversations for consolidation
        clusters = self.clusterer.cluster_conversations(consolidate_conversations)

        results = {
            "status": "success",
            "dry_run": dry_run,
            "total_conversations": len(conversations),
            "conversations_to_keep": len(keep_conversations),
            "conversations_to_consolidate": len(consolidate_conversations),
            "clusters_created": len(clusters),
            "consolidated_summaries": [],
            "space_saved_estimate": 0,
            "compression_ratio": 0.0,
        }

        # Process clusters
        total_original_size = sum(
            conv["original_size"] for conv in consolidate_conversations
        )
        total_compressed_size = 0

        for cluster in clusters:
            if len(cluster) <= 1:
                continue  # Skip single-conversation clusters

            # Create consolidated summary
            combined_content = "\n\n---\n\n".join([conv["content"] for conv in cluster])
            summary = self.summarizer.summarize_conversation(
                combined_content,
                "template_based",
            )

            # Create consolidated metadata
            projects = list(
                {conv["project"] for conv in cluster if conv["project"]},
            )
            timestamps = [conv["timestamp"] for conv in cluster]
            min_time = min(timestamps) if timestamps else ""
            max_time = max(timestamps) if timestamps else ""

            consolidated_conv = {
                "summary": summary,
                "original_count": len(cluster),
                "projects": projects,
                "time_range": f"{min_time} to {max_time}",
                "original_conversations": [conv["id"] for conv in cluster],
                "compressed_size": len(summary),
                "original_size": sum(conv["original_size"] for conv in cluster),
            }

            total_compressed_size += len(summary)
            results["consolidated_summaries"].append(consolidated_conv)

            # Actually perform consolidation if not a dry run
            if not dry_run:
                await self._create_consolidated_conversation(consolidated_conv, cluster)

        # Calculate compression statistics
        if total_original_size > 0:
            space_saved = total_original_size - total_compressed_size
            compression_ratio = space_saved / total_original_size
            results["space_saved_estimate"] = space_saved
            results["compression_ratio"] = compression_ratio

        # Update compression stats
        self.compression_stats.update(
            {
                "last_run": datetime.now().isoformat(),
                "conversations_processed": len(consolidate_conversations),
                "conversations_consolidated": sum(
                    len(cluster) for cluster in clusters if len(cluster) > 1
                ),
                "space_saved_bytes": results["space_saved_estimate"],
                "compression_ratio": results["compression_ratio"],
            },
        )

        return results
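    # Compression usage sketch ("db" is a hypothetical, already-connected
    # ReflectionDatabase; run inside an event loop). A dry run reports what
    # would be consolidated without modifying the database:
    #
    #     optimizer = MemoryOptimizer(db)
    #     report = await optimizer.compress_memory(dry_run=True)
    #     print(report["clusters_created"], report["compression_ratio"])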

    async def _create_consolidated_conversation(
        self,
        consolidated_conv: dict[str, Any],
        original_cluster: list[dict[str, Any]],
    ) -> None:
        """Create a new consolidated conversation and remove originals."""
        # Create new consolidated conversation
        consolidated_id = hashlib.md5(
            f"consolidated_{datetime.now().isoformat()}".encode(),
        ).hexdigest()

        metadata = {
            "type": "consolidated",
            "original_count": consolidated_conv["original_count"],
            "original_conversations": consolidated_conv["original_conversations"],
            "projects": consolidated_conv["projects"],
            "compression_ratio": (
                consolidated_conv["compressed_size"] / consolidated_conv["original_size"]
                if consolidated_conv["original_size"]
                else 0.0  # Guard against empty originals
            ),
        }

        # Insert consolidated conversation
        self.reflection_db.conn.execute(
            """INSERT INTO conversations (id, content, project, timestamp, metadata)
               VALUES (?, ?, ?, ?, ?)""",
            (
                consolidated_id,
                consolidated_conv["summary"],
                ", ".join(consolidated_conv["projects"])
                if consolidated_conv["projects"]
                else "multiple",
                datetime.now().isoformat(),
                json.dumps(metadata),
            ),
        )

        # Remove original conversations
        original_ids = [conv["id"] for conv in original_cluster]
        if original_ids:
            placeholders = ",".join(["?" for _ in original_ids])
            self.reflection_db.conn.execute(
                f"DELETE FROM conversations WHERE id IN ({placeholders})",
                original_ids,
            )

        # Commit changes
        self.reflection_db.conn.commit()

    async def get_compression_stats(self) -> dict[str, Any]:
        """Get compression statistics."""
        return self.compression_stats.copy()

    async def set_retention_policy(self, policy: dict[str, Any]) -> dict[str, Any]:
        """Update retention policy settings."""
        updated_policy = self.retention_manager.default_policies.copy()
        updated_policy.update(policy)

        # Validate policy values
        if updated_policy.get("max_age_days", 0) < 1:
            return {"error": "max_age_days must be at least 1"}

        if updated_policy.get("max_conversations", 0) < 100:
            return {"error": "max_conversations must be at least 100"}

        self.retention_manager.default_policies = updated_policy

        return {"status": "success", "updated_policy": updated_policy}
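
# End-to-end sketch (hypothetical setup; the ReflectionDatabase constructor and
# connection lifecycle are assumed to follow reflection_tools' actual API):
#
#     import asyncio
#
#     async def main() -> None:
#         db = ReflectionDatabase(...)  # open a connected database
#         optimizer = MemoryOptimizer(db)
#         await optimizer.set_retention_policy({"max_age_days": 180})
#         report = await optimizer.compress_memory(dry_run=False)
#         print(await optimizer.get_compression_stats())
#
#     asyncio.run(main())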