#!/usr/bin/env python3
"""Memory Optimization for Session Management MCP Server.

Provides conversation consolidation, summarization, and memory compression capabilities.
"""

import hashlib
import json
import re
from datetime import datetime, timedelta
from typing import Any

from .reflection_tools import ReflectionDatabase


class ConversationSummarizer:
    """Handles conversation summarization using various strategies."""

    def __init__(self) -> None:
        self.summarization_strategies = {
            "extractive": self._extractive_summarization,
            "template_based": self._template_based_summarization,
            "keyword_based": self._keyword_based_summarization,
        }

    def _extractive_summarization(self, content: str, max_sentences: int = 3) -> str:
        """Extract the most important sentences from a conversation."""
        sentences = re.split(r"[.!?]+", content)
        sentences = [s.strip() for s in sentences if len(s.strip()) > 20]

        # Score sentences based on various factors
        scored_sentences = []
        for sentence in sentences[:20]:  # Limit to prevent performance issues
            score = 0.0

            # Length score (prefer medium-length sentences)
            length = len(sentence.split())
            if 10 <= length <= 30:
                score += 0.3

            # Technical keywords score
            tech_keywords = [
                "function", "class", "error", "exception", "import", "def",
                "async", "await", "return", "variable", "method", "api",
                "database", "query", "test", "debug", "fix", "implement",
            ]
            for keyword in tech_keywords:
                if keyword.lower() in sentence.lower():
                    score += 0.1

            # Code presence score
            if "`" in sentence or "def " in sentence or "class " in sentence:
                score += 0.2

            # Question/problem indicators
            question_words = ["how", "why", "what", "when", "where", "which"]
            if any(word in sentence.lower() for word in question_words):
                score += 0.15

            # Solution indicators
            solution_words = ["solution", "fix", "resolve", "implement", "create"]
            if any(word in sentence.lower() for word in solution_words):
                score += 0.2

            scored_sentences.append((score, sentence))

        # Sort by score and take the top sentences
        scored_sentences.sort(key=lambda x: x[0], reverse=True)
        top_sentences = [sent for _score, sent in scored_sentences[:max_sentences]]

        if not top_sentences:
            return ""
        return ". ".join(top_sentences) + "."

    def _template_based_summarization(self, content: str, max_length: int = 300) -> str:
        """Create summary using templates based on content patterns."""
        summary_parts = []

        # Detect code blocks
        code_blocks = re.findall(r"```[\w]*\n(.*?)\n```", content, re.DOTALL)
        if code_blocks:
            summary_parts.append(
                f"Code discussion involving {len(code_blocks)} code block(s)",
            )

        # Detect errors/exceptions
        error_patterns = [
            r"(\w+Error): (.+)",
            r"Exception: (.+)",
            r"Traceback \(most recent call last\):",
            r"error: (.+)",
        ]
        errors_found = []
        for pattern in error_patterns:
            matches = re.findall(pattern, content, re.IGNORECASE)
            if matches:
                if isinstance(matches[0], tuple):
                    errors_found.extend([match[0] for match in matches[:2]])
                else:
                    errors_found.extend(matches[:2])

        if errors_found:
            error_summary = ", ".join(set(errors_found))[:100]
            summary_parts.append(f"Error troubleshooting: {error_summary}")

        # Detect file/project references
        file_patterns = [
            r"(\w+\.py)",
            r"(\w+\.js)",
            r"(\w+\.ts)",
            r"(\w+\.json)",
            r"(\w+\.md)",
        ]
        files_mentioned = set()
        for pattern in file_patterns:
            matches = re.findall(pattern, content)
            files_mentioned.update(matches[:5])  # Limit to 5 files per pattern

        if files_mentioned:
            files_str = ", ".join(sorted(files_mentioned))
            summary_parts.append(f"Files discussed: {files_str}")

        # Detect implementation topics
        impl_keywords = {
            "function": "function implementation",
            "class": "class design",
            "api": "API development",
            "database": "database operations",
            "test": "testing strategies",
            "deploy": "deployment process",
            "refactor": "code refactoring",
            "optimization": "performance optimization",
        }

        topics_found = []
        for keyword, topic in impl_keywords.items():
            if keyword in content.lower():
                topics_found.append(topic)

        if topics_found:
            topics_str = ", ".join(topics_found[:3])
            summary_parts.append(f"Topics: {topics_str}")

        # Combine parts and ensure length limit
        full_summary = "; ".join(summary_parts)
        if len(full_summary) > max_length:
            full_summary = full_summary[:max_length] + "..."

        return full_summary or "General development discussion"

    def _keyword_based_summarization(self, content: str, max_keywords: int = 10) -> str:
        """Create summary based on extracted keywords."""
        # Strip fenced and inline code before extracting keywords
        content_clean = re.sub(r"```.*?```", "", content, flags=re.DOTALL)
        content_clean = re.sub(r"`[^`]+`", "", content_clean)

        # Extract potential keywords
        words = re.findall(r"\b[a-zA-Z]{3,}\b", content_clean.lower())

        # Filter common words
        stop_words = {
            "the", "and", "for", "are", "but", "not", "you", "all", "can",
            "had", "her", "was", "one", "our", "out", "day", "get", "has",
            "him", "his", "how", "its", "may", "new", "now", "old", "see",
            "two", "who", "boy", "did", "way", "use", "man", "say", "she",
            "too", "any", "here", "much", "where", "your", "them", "well",
            "were", "been", "have", "there", "what", "would", "make", "like",
            "into", "time", "will", "about", "think", "never", "after",
            "should", "could", "also", "just", "first", "over", "back",
            "other",
        }

        # Count word frequencies
        word_counts: dict[str, int] = {}
        for word in words:
            if word not in stop_words and len(word) > 3:
                word_counts[word] = word_counts.get(word, 0) + 1

        # Get top keywords
        top_keywords = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)
        keywords = [word for word, _count in top_keywords[:max_keywords]]

        return f"Keywords: {', '.join(keywords)}" if keywords else "General discussion"

    def summarize_conversation(
        self,
        content: str,
        strategy: str = "template_based",
    ) -> str:
        """Summarize a conversation using the specified strategy."""
        if strategy not in self.summarization_strategies:
            strategy = "template_based"

        try:
            summary = self.summarization_strategies[strategy](content)
            return summary or "Unable to generate summary"
        except Exception as e:
            return f"Summary generation failed: {str(e)[:100]}"
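

# A minimal usage sketch for ConversationSummarizer (illustrative only; the
# transcript string is hypothetical):
#
#     summarizer = ConversationSummarizer()
#     transcript = "We hit a ValueError in parser.py, then implemented a fix."
#     print(summarizer.summarize_conversation(transcript))
#     print(summarizer.summarize_conversation(transcript, strategy="keyword_based"))
#
# Unknown strategy names fall back to "template_based" rather than raising.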


class ConversationClusterer:
    """Groups related conversations for consolidation."""

    def __init__(self) -> None:
        self.similarity_threshold = 0.6

    def cluster_conversations(
        self,
        conversations: list[dict[str, Any]],
    ) -> list[list[dict[str, Any]]]:
        """Group conversations into clusters based on similarity."""
        if not conversations:
            return []

        clusters = []
        used_conversations = set()

        for i, conv in enumerate(conversations):
            if i in used_conversations:
                continue

            # Start new cluster
            cluster = [conv]
            used_conversations.add(i)

            # Find similar conversations
            for j, other_conv in enumerate(conversations[i + 1 :], i + 1):
                if j in used_conversations:
                    continue

                similarity = self._calculate_similarity(conv, other_conv)
                if similarity >= self.similarity_threshold:
                    cluster.append(other_conv)
                    used_conversations.add(j)

            clusters.append(cluster)

        return clusters

    def _calculate_similarity(
        self,
        conv1: dict[str, Any],
        conv2: dict[str, Any],
    ) -> float:
        """Calculate similarity between two conversations."""
        similarity = 0.0

        # Project similarity
        if conv1.get("project") == conv2.get("project"):
            similarity += 0.3

        # Time proximity (conversations within the same day)
        try:
            time1 = datetime.fromisoformat(conv1.get("timestamp", ""))
            time2 = datetime.fromisoformat(conv2.get("timestamp", ""))
            if abs((time1 - time2).days) <= 1:
                similarity += 0.2
        except (ValueError, TypeError):
            pass

        # Content similarity (simple keyword overlap)
        content1_words = set(re.findall(r"\b\w+\b", conv1.get("content", "").lower()))
        content2_words = set(re.findall(r"\b\w+\b", conv2.get("content", "").lower()))

        if content1_words and content2_words:
            overlap = len(content1_words & content2_words)
            total = len(content1_words | content2_words)
            if total > 0:
                similarity += 0.5 * (overlap / total)

        return min(similarity, 1.0)
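

# Clustering sketch (hypothetical records). _calculate_similarity awards 0.3
# for a shared project, 0.2 for timestamps within a day, and up to 0.5 from
# word overlap, so the two records below score well above the 0.6 threshold
# and land in a single cluster:
#
#     clusterer = ConversationClusterer()
#     convs = [
#         {"project": "api", "timestamp": "2025-01-01T10:00:00",
#          "content": "fix auth token refresh bug"},
#         {"project": "api", "timestamp": "2025-01-01T12:00:00",
#          "content": "fix auth token refresh bug in middleware"},
#     ]
#     assert len(clusterer.cluster_conversations(convs)) == 1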


class RetentionPolicyManager:
    """Manages conversation retention policies."""

    def __init__(self) -> None:
        self.default_policies = {
            "max_age_days": 365,  # Keep conversations for 1 year
            "max_conversations": 10000,  # Maximum number of conversations
            "importance_threshold": 0.3,  # Minimum importance score to keep
            "consolidation_age_days": 30,  # Consolidate conversations older than 30 days
            "compression_ratio": 0.5,  # Target 50% size reduction when consolidating
        }

        self.importance_factors = {
            "has_code": 0.3,
            "has_errors": 0.2,
            "recent_access": 0.2,
            "length_score": 0.1,
            "project_relevance": 0.2,
        }

    def calculate_importance_score(self, conversation: dict[str, Any]) -> float:
        """Calculate importance score for a conversation."""
        score = 0.0
        content = conversation.get("content", "")

        # Has code blocks
        if "```" in content or "def " in content or "class " in content:
            score += self.importance_factors["has_code"]

        # Has error/exception information
        error_keywords = ["error", "exception", "traceback", "failed", "bug"]
        if any(keyword in content.lower() for keyword in error_keywords):
            score += self.importance_factors["has_errors"]

        # Recent access would need separate tracking; for now, use recency as a proxy
        try:
            conv_time = datetime.fromisoformat(conversation.get("timestamp", ""))
            days_old = (datetime.now() - conv_time).days
            if days_old < 7:
                score += self.importance_factors["recent_access"]
            elif days_old < 30:
                score += self.importance_factors["recent_access"] * 0.5
        except (ValueError, TypeError):
            pass

        # Length score (longer conversations might be more important)
        content_length = len(content)
        if content_length > 1000:
            score += self.importance_factors["length_score"]
        elif content_length > 500:
            score += self.importance_factors["length_score"] * 0.5

        # Project relevance would need the current project context; until that
        # is wired in, grant a default middle score
        score += self.importance_factors["project_relevance"] * 0.5

        return min(score, 1.0)

    def get_conversations_for_retention(
        self,
        conversations: list[dict[str, Any]],
        policy: dict[str, Any] | None = None,
    ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
        """Determine which conversations to keep vs consolidate/delete."""
        if not policy:
            policy = self.default_policies.copy()

        keep_conversations = []
        consolidate_conversations = []

        # Sort conversations by timestamp (newest first)
        sorted_conversations = sorted(
            conversations,
            key=lambda x: x.get("timestamp", ""),
            reverse=True,
        )

        cutoff_date = datetime.now() - timedelta(days=policy["consolidation_age_days"])
        max_conversations = policy["max_conversations"]
        importance_threshold = policy["importance_threshold"]

        for i, conv in enumerate(sorted_conversations):
            # Always keep the newest conversations, up to 50% of the max limit
            if i < max_conversations // 2:
                keep_conversations.append(conv)
                continue

            # Check importance for older conversations
            importance = self.calculate_importance_score(conv)
            conv["importance_score"] = importance

            # Check age
            try:
                conv_time = datetime.fromisoformat(conv.get("timestamp", ""))
                is_old = conv_time < cutoff_date
            except (ValueError, TypeError):
                is_old = True  # Unparseable timestamps are treated as old

            if importance >= importance_threshold:
                keep_conversations.append(conv)
            elif is_old:
                consolidate_conversations.append(conv)
            else:
                # Recent but low importance - keep for now
                keep_conversations.append(conv)

        return keep_conversations, consolidate_conversations
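

# Retention sketch (hypothetical conversation dict). A fresh record containing
# code scores has_code (0.3) + recent_access (0.2) + the default
# project_relevance credit (0.1), comfortably clearing the 0.3 importance
# threshold, so it stays in the keep list:
#
#     manager = RetentionPolicyManager()
#     conv = {"content": "def retry(): ...", "timestamp": datetime.now().isoformat()}
#     assert manager.calculate_importance_score(conv) >= 0.3
#     keep, consolidate = manager.get_conversations_for_retention([conv])
#     assert keep == [conv] and consolidate == []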


class MemoryOptimizer:
    """Main class for memory optimization and compression."""

    def __init__(self, reflection_db: ReflectionDatabase) -> None:
        self.reflection_db = reflection_db
        self.summarizer = ConversationSummarizer()
        self.clusterer = ConversationClusterer()
        self.retention_manager = RetentionPolicyManager()

        # Compression statistics
        self.compression_stats = {
            "last_run": None,
            "conversations_processed": 0,
            "conversations_consolidated": 0,
            "space_saved_bytes": 0,
            "compression_ratio": 0.0,
        }

    async def compress_memory(
        self,
        policy: dict[str, Any] | None = None,
        dry_run: bool = False,
    ) -> dict[str, Any]:
        """Main method to compress conversation memory."""
        if not hasattr(self.reflection_db, "conn") or not self.reflection_db.conn:
            return {"error": "Database not available"}

        # Get all conversations
        cursor = self.reflection_db.conn.execute(
            "SELECT id, content, project, timestamp, metadata FROM conversations ORDER BY timestamp DESC",
        )
        conversations = []
        for row in cursor.fetchall():
            conv_id, content, project, timestamp, metadata = row
            conversations.append(
                {
                    "id": conv_id,
                    "content": content,
                    "project": project,
                    "timestamp": timestamp,
                    "metadata": json.loads(metadata) if metadata else {},
                    "original_size": len(content),
                },
            )

        if not conversations:
            return {
                "status": "no_conversations",
                "message": "No conversations found to compress",
            }

        # Apply retention policy
        keep_conversations, consolidate_conversations = (
            self.retention_manager.get_conversations_for_retention(
                conversations,
                policy,
            )
        )

        # Cluster conversations for consolidation
        clusters = self.clusterer.cluster_conversations(consolidate_conversations)

        results = {
            "status": "success",
            "dry_run": dry_run,
            "total_conversations": len(conversations),
            "conversations_to_keep": len(keep_conversations),
            "conversations_to_consolidate": len(consolidate_conversations),
            "clusters_created": len(clusters),
            "consolidated_summaries": [],
            "space_saved_estimate": 0,
            "compression_ratio": 0.0,
        }

        # Process clusters
        total_original_size = sum(
            conv["original_size"] for conv in consolidate_conversations
        )
        total_compressed_size = 0

        for cluster in clusters:
            if len(cluster) <= 1:
                continue  # Skip single-conversation clusters

            # Create consolidated summary
            combined_content = "\n\n---\n\n".join([conv["content"] for conv in cluster])
            summary = self.summarizer.summarize_conversation(
                combined_content,
                "template_based",
            )

            # Create consolidated metadata
            projects = list({conv["project"] for conv in cluster if conv["project"]})
            timestamps = [conv["timestamp"] for conv in cluster]
            min_time = min(timestamps) if timestamps else ""
            max_time = max(timestamps) if timestamps else ""

            consolidated_conv = {
                "summary": summary,
                "original_count": len(cluster),
                "projects": projects,
                "time_range": f"{min_time} to {max_time}",
                "original_conversations": [conv["id"] for conv in cluster],
                "compressed_size": len(summary),
                "original_size": sum(conv["original_size"] for conv in cluster),
            }

            total_compressed_size += len(summary)
            results["consolidated_summaries"].append(consolidated_conv)

            # Actually perform consolidation if not a dry run
            if not dry_run:
                await self._create_consolidated_conversation(consolidated_conv, cluster)

        # Calculate compression statistics
        if total_original_size > 0:
            space_saved = total_original_size - total_compressed_size
            compression_ratio = space_saved / total_original_size
            results["space_saved_estimate"] = space_saved
            results["compression_ratio"] = compression_ratio

        # Update compression stats
        self.compression_stats.update(
            {
                "last_run": datetime.now().isoformat(),
                "conversations_processed": len(consolidate_conversations),
                "conversations_consolidated": sum(
                    len(cluster) for cluster in clusters if len(cluster) > 1
                ),
                "space_saved_bytes": results["space_saved_estimate"],
                "compression_ratio": results["compression_ratio"],
            },
        )

        return results

    async def _create_consolidated_conversation(
        self,
        consolidated_conv: dict[str, Any],
        original_cluster: list[dict[str, Any]],
    ) -> None:
        """Create a new consolidated conversation and remove the originals."""
        # Derive a unique id for the consolidated conversation
        consolidated_id = hashlib.md5(
            f"consolidated_{datetime.now().isoformat()}".encode(),
        ).hexdigest()

        metadata = {
            "type": "consolidated",
            "original_count": consolidated_conv["original_count"],
            "original_conversations": consolidated_conv["original_conversations"],
            "projects": consolidated_conv["projects"],
            "compression_ratio": consolidated_conv["compressed_size"]
            / consolidated_conv["original_size"],
        }

        # Insert consolidated conversation
        self.reflection_db.conn.execute(
            """INSERT INTO conversations (id, content, project, timestamp, metadata)
               VALUES (?, ?, ?, ?, ?)""",
            (
                consolidated_id,
                consolidated_conv["summary"],
                ", ".join(consolidated_conv["projects"])
                if consolidated_conv["projects"]
                else "multiple",
                datetime.now().isoformat(),
                json.dumps(metadata),
            ),
        )

        # Remove original conversations
        original_ids = [conv["id"] for conv in original_cluster]
        if original_ids:
            placeholders = ",".join(["?" for _ in original_ids])
            self.reflection_db.conn.execute(
                f"DELETE FROM conversations WHERE id IN ({placeholders})",
                original_ids,
            )

        # Commit changes
        self.reflection_db.conn.commit()

    async def get_compression_stats(self) -> dict[str, Any]:
        """Get compression statistics."""
        return self.compression_stats.copy()

    async def set_retention_policy(self, policy: dict[str, Any]) -> dict[str, Any]:
        """Update retention policy settings."""
        updated_policy = self.retention_manager.default_policies.copy()
        updated_policy.update(policy)

        # Validate policy values
        if updated_policy.get("max_age_days", 0) < 1:
            return {"error": "max_age_days must be at least 1"}

        if updated_policy.get("max_conversations", 0) < 100:
            return {"error": "max_conversations must be at least 100"}

        self.retention_manager.default_policies = updated_policy

        return {"status": "success", "updated_policy": updated_policy}
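

# End-to-end sketch (assumes ReflectionDatabase can be constructed with no
# arguments and arrives with an open `conn`; adjust to the real setup):
#
#     import asyncio
#
#     async def _demo() -> None:
#         db = ReflectionDatabase()
#         optimizer = MemoryOptimizer(db)
#         # dry_run=True reports what would be consolidated without writing
#         report = await optimizer.compress_memory(dry_run=True)
#         print(report.get("status"), report.get("clusters_created"))
#
#     asyncio.run(_demo())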