Coverage for session_buddy/memory_optimizer.py: 64.72% (296 statements)
#!/usr/bin/env python3
"""Memory Optimization for Session Management MCP Server.

Provides conversation consolidation, summarization, and memory compression capabilities.
"""

import hashlib
import json
import operator
from contextlib import suppress
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from typing import Any

from .reflection_tools import ReflectionDatabase
from .utils.regex_patterns import SAFE_PATTERNS


@dataclass(frozen=True)
class ConversationData:
    """Immutable conversation data structure."""

    id: str
    content: str
    project: str | None
    timestamp: str
    metadata: dict[str, Any]
    original_size: int


@dataclass(frozen=True)
class CompressionResults:
    """Results from a memory compression operation."""

    status: str
    dry_run: bool
    total_conversations: int
    conversations_to_keep: int
    conversations_to_consolidate: int
    clusters_created: int
    consolidated_summaries: list[dict[str, Any]]
    space_saved_estimate: int
    compression_ratio: float


@dataclass(frozen=True)
class ConsolidatedConversation:
    """Consolidated conversation metadata."""

    summary: str
    original_count: int
    projects: list[str]
    time_range: str
    original_conversations: list[str]
    compressed_size: int
    original_size: int


class ConversationSummarizer:
    """Handles conversation summarization using various strategies."""

    def __init__(self) -> None:
        self.summarization_strategies = {
            "extractive": self._extractive_summarization,
            "template_based": self._template_based_summarization,
            "keyword_based": self._keyword_based_summarization,
        }

    def _extractive_summarization(self, content: str, max_sentences: int = 3) -> str:
        """Extract the most important sentences from a conversation."""
        sentence_pattern = SAFE_PATTERNS["sentence_split"]
        sentences = sentence_pattern.split(content)
        sentences = [s.strip() for s in sentences if len(s.strip()) > 20]

        # Score sentences based on various factors
        scored_sentences = []
        for sentence in sentences[:20]:  # Limit to prevent performance issues
            score = 0.0

            # Length score (prefer medium-length sentences)
            length = len(sentence.split())
            if 10 <= length <= 30:
                score += 0.3

            # Technical keywords score
            tech_keywords = [
                "function",
                "class",
                "error",
                "exception",
                "import",
                "def",
                "async",
                "await",
                "return",
                "variable",
                "method",
                "api",
                "database",
                "query",
                "test",
                "debug",
                "fix",
                "implement",
            ]
            for keyword in tech_keywords:
                if keyword.lower() in sentence.lower():
                    score += 0.1

            # Code presence score
            if "`" in sentence or "def " in sentence or "class " in sentence:
                score += 0.2

            # Question/problem indicators
            question_words = ["how", "why", "what", "when", "where", "which"]
            if any(word in sentence.lower() for word in question_words):
                score += 0.15

            # Solution indicators
            solution_words = ["solution", "fix", "resolve", "implement", "create"]
            if any(word in sentence.lower() for word in solution_words):
                score += 0.2

            scored_sentences.append((score, sentence))

        # Sort by score and take top sentences
        scored_sentences.sort(key=operator.itemgetter(0), reverse=True)
        top_sentences = [sent for score, sent in scored_sentences[:max_sentences]]

        return ". ".join(top_sentences) + "."
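
    # Behavior sketch (an assumed input, not from the module): sentences that
    # mention technical keywords, contain code markers, or signal a problem or
    # solution score highest, so the joined result favors lines such as
    # "Fixed the import error in the database query function" over small talk.
    #
    #     summarizer = ConversationSummarizer()
    #     summarizer._extractive_summarization(long_transcript, max_sentences=2)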

    def _template_based_summarization(self, content: str, max_length: int = 300) -> str:
        """Create a summary using templates based on content patterns."""
        summary_parts = []

        # Detect code blocks
        code_pattern = SAFE_PATTERNS["python_code_block"]
        code_blocks = code_pattern.findall(content)
        if code_blocks:
            summary_parts.append(
                f"Code discussion involving {len(code_blocks)} code block(s)",
            )

        # Detect errors/exceptions
        errors_found = []

        # Check for Python exceptions
        exception_pattern = SAFE_PATTERNS["python_exception"]
        exc_matches = exception_pattern.findall(content)
        if exc_matches:
            errors_found.extend(
                [
                    match[0] if isinstance(match, tuple) else match
                    for match in exc_matches[:2]
                ],
            )

        # Check for Python tracebacks
        traceback_pattern = SAFE_PATTERNS["python_traceback"]
        if traceback_pattern.search(content):
            errors_found.append("Python traceback")

        if errors_found:
            error_summary = ", ".join(set(errors_found))[:100]
            summary_parts.append(f"Error troubleshooting: {error_summary}")

        # Detect file/project references
        files_mentioned = set()

        file_pattern_names = [
            "python_files",
            "javascript_files",
            "typescript_files",
            "json_files",
            "markdown_files",
        ]
        for pattern_name in file_pattern_names:
            pattern = SAFE_PATTERNS[pattern_name]
            matches = pattern.findall(content)
            files_mentioned.update(matches[:5])  # Limit to 5 files

        if files_mentioned:
            files_str = ", ".join(sorted(files_mentioned))
            summary_parts.append(f"Files discussed: {files_str}")

        # Detect implementation topics
        impl_keywords = {
            "function": "function implementation",
            "class": "class design",
            "api": "API development",
            "database": "database operations",
            "test": "testing strategies",
            "deploy": "deployment process",
            "refactor": "code refactoring",
            "optimization": "performance optimization",
        }

        topics_found = [
            topic
            for keyword, topic in impl_keywords.items()
            if keyword in content.lower()
        ]

        if topics_found:
            topics_str = ", ".join(topics_found[:3])
            summary_parts.append(f"Topics: {topics_str}")

        # Combine parts and ensure length limit
        full_summary = "; ".join(summary_parts)
        if len(full_summary) > max_length:
            full_summary = full_summary[:max_length] + "..."

        return full_summary or "General development discussion"
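
    # Output sketch (an assumed input, not a captured result): a transcript
    # mentioning utils.py, a ValueError, and test work might summarize as
    # "Error troubleshooting: ValueError; Files discussed: utils.py;
    # Topics: testing strategies", truncated to max_length if needed.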

    def _keyword_based_summarization(self, content: str, max_keywords: int = 10) -> str:
        """Create a summary based on extracted keywords."""
        # Clean content
        code_block_pattern = SAFE_PATTERNS["code_block_cleanup"]
        content_clean = code_block_pattern.sub("", content)

        inline_code_pattern = SAFE_PATTERNS["inline_code_cleanup"]
        content_clean = inline_code_pattern.sub("", content_clean)

        # Extract potential keywords
        word_pattern = SAFE_PATTERNS["word_extraction"]
        words = word_pattern.findall(content_clean.lower())

        # Filter common words
        stop_words = {
            "the",
            "and",
            "for",
            "are",
            "but",
            "not",
            "you",
            "all",
            "can",
            "had",
            "her",
            "was",
            "one",
            "our",
            "out",
            "day",
            "get",
            "has",
            "him",
            "his",
            "how",
            "its",
            "may",
            "new",
            "now",
            "old",
            "see",
            "two",
            "who",
            "boy",
            "did",
            "way",
            "use",
            "man",
            "say",
            "she",
            "too",
            "any",
            "here",
            "much",
            "where",
            "your",
            "them",
            "well",
            "were",
            "been",
            "have",
            "there",
            "what",
            "would",
            "make",
            "like",
            "into",
            "time",
            "will",
            "about",
            "think",
            "never",
            "after",
            "should",
            "could",
            "also",
            "just",
            "first",
            "over",
            "back",
            "other",
        }

        # Count word frequencies
        word_counts: dict[str, int] = {}
        for word in words:
            if word not in stop_words and len(word) > 3:
                word_counts[word] = word_counts.get(word, 0) + 1

        # Get top keywords
        top_keywords = sorted(
            word_counts.items(), key=operator.itemgetter(1), reverse=True
        )
        keywords = [word for word, count in top_keywords[:max_keywords]]

        return f"Keywords: {', '.join(keywords)}" if keywords else "General discussion"
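
    # Output sketch (an assumed input): only words longer than three letters
    # survive the stop-word filter, so repeated terms such as "database" and
    # "migration" would yield "Keywords: database, migration, ...", ordered by
    # frequency.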

    def summarize_conversation(
        self,
        content: str,
        strategy: str = "template_based",
    ) -> str:
        """Summarize a conversation using the specified strategy."""
        if strategy not in self.summarization_strategies:
            strategy = "template_based"

        try:
            summary = self.summarization_strategies[strategy](content)
            return summary or "Unable to generate summary"
        except Exception as e:
            return f"Summary generation failed: {str(e)[:100]}"
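
    # Illustrative usage sketch (not part of the module): unknown strategy
    # names fall back to "template_based", and failures are reported in the
    # returned string rather than raised.
    #
    #     summarizer = ConversationSummarizer()
    #     summary = summarizer.summarize_conversation(text, strategy="keyword_based")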


class ConversationClusterer:
    """Groups related conversations for consolidation."""

    def __init__(self) -> None:
        self.similarity_threshold = 0.6

    def cluster_conversations(
        self,
        conversations: list[dict[str, Any]],
    ) -> list[list[dict[str, Any]]]:
        """Group conversations into clusters based on similarity."""
        if not conversations:
            return []

        clusters = []
        used_conversations = set()

        for i, conv in enumerate(conversations):
            if i in used_conversations:
                continue

            # Start new cluster
            cluster = [conv]
            used_conversations.add(i)

            # Find similar conversations
            for j, other_conv in enumerate(conversations[i + 1 :], i + 1):
                if j in used_conversations:
                    continue

                similarity = self._calculate_similarity(conv, other_conv)
                if similarity >= self.similarity_threshold:
                    cluster.append(other_conv)
                    used_conversations.add(j)

            clusters.append(cluster)

        return clusters

    def _calculate_similarity(
        self,
        conv1: dict[str, Any],
        conv2: dict[str, Any],
    ) -> float:
        """Calculate similarity between two conversations."""
        similarity = 0.0

        # Project similarity
        if conv1.get("project") == conv2.get("project"):
            similarity += 0.3

        # Time proximity (conversations within same day)
        with suppress(ValueError, TypeError):
            time1 = datetime.fromisoformat(conv1.get("timestamp", ""))
            time2 = datetime.fromisoformat(conv2.get("timestamp", ""))
            if abs((time1 - time2).days) <= 1:
                similarity += 0.2

        # Content similarity (simple keyword overlap)
        word_boundary_pattern = SAFE_PATTERNS["word_boundary"]
        content1_words = set(
            word_boundary_pattern.findall(conv1.get("content", "").lower()),
        )
        content2_words = set(
            word_boundary_pattern.findall(conv2.get("content", "").lower()),
        )

        if content1_words and content2_words:
            overlap = len(content1_words & content2_words)
            total = len(content1_words | content2_words)
            if total > 0:
                similarity += 0.5 * (overlap / total)

        return min(similarity, 1.0)
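
    # Worked example (assumed values): two same-project conversations logged a
    # few hours apart with word sets {"fix", "import", "error"} and
    # {"fix", "import", "test", "error"} score 0.3 (project) + 0.2 (time)
    # + 0.5 * (3 / 4) = 0.875, which clears the 0.6 clustering threshold.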


class RetentionPolicyManager:
    """Manages conversation retention policies."""

    def __init__(self) -> None:
        self.default_policies = {
            "max_age_days": 365,  # Keep conversations for 1 year
            "max_conversations": 10000,  # Maximum number of conversations
            "importance_threshold": 0.3,  # Minimum importance score to keep
            "consolidation_age_days": 30,  # Consolidate conversations older than 30 days
            "compression_ratio": 0.5,  # Target 50% size reduction when consolidating
        }

        self.importance_factors = {
            "has_code": 0.3,
            "has_errors": 0.2,
            "recent_access": 0.2,
            "length_score": 0.1,
            "project_relevance": 0.2,
        }

    def calculate_importance_score(self, conversation: dict[str, Any]) -> float:
        """Calculate an importance score for a conversation."""
        score = 0.0
        content = conversation.get("content", "")

        # Has code blocks
        if "```" in content or "def " in content or "class " in content:
            score += self.importance_factors["has_code"]

        # Has error/exception information
        error_keywords = ["error", "exception", "traceback", "failed", "bug"]
        if any(keyword in content.lower() for keyword in error_keywords):
            score += self.importance_factors["has_errors"]

        # Recent access would need separate tracking; use recency as a proxy
        with suppress(ValueError, TypeError):
            conv_time = datetime.fromisoformat(conversation.get("timestamp", ""))
            days_old = (datetime.now() - conv_time).days
            if days_old < 7:
                score += self.importance_factors["recent_access"]
            elif days_old < 30:
                score += self.importance_factors["recent_access"] * 0.5

        # Length score (longer conversations might be more important)
        content_length = len(content)
        if content_length > 1000:
            score += self.importance_factors["length_score"]
        elif content_length > 500:
            score += self.importance_factors["length_score"] * 0.5

        # Project relevance would need the current project context;
        # use a default middle score
        score += self.importance_factors["project_relevance"] * 0.5

        return min(score, 1.0)
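
    # Worked example (assumed values): a 10-day-old, 1,200-character
    # conversation containing a code block and the word "error" scores
    # 0.3 + 0.2 + 0.1 (recency at half weight) + 0.1 (length) + 0.1
    # (default project relevance) = 0.8, well above the 0.3 retention
    # threshold.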

    def get_conversations_for_retention(
        self,
        conversations: list[dict[str, Any]],
        policy: dict[str, Any] | None = None,
    ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
        """Determine which conversations to keep vs consolidate/delete."""
        if not policy:
            policy = self.default_policies.copy()

        keep_conversations = []
        consolidate_conversations = []

        # Sort conversations by timestamp (newest first)
        sorted_conversations = sorted(
            conversations,
            key=lambda x: x.get("timestamp", ""),
            reverse=True,
        )

        cutoff_date = datetime.now() - timedelta(days=policy["consolidation_age_days"])
        max_conversations = policy["max_conversations"]
        importance_threshold = policy["importance_threshold"]

        for i, conv in enumerate(sorted_conversations):
            # Always keep the newest conversations, up to half the max limit
            if i < max_conversations // 2:
                keep_conversations.append(conv)
                continue

            # Check importance for older conversations
            importance = self.calculate_importance_score(conv)
            conv["importance_score"] = importance

            # Check age
            try:
                conv_time = datetime.fromisoformat(conv.get("timestamp", ""))
                is_old = conv_time < cutoff_date
            except (ValueError, TypeError):
                is_old = True  # If the timestamp cannot be parsed, treat it as old

            if importance >= importance_threshold:
                keep_conversations.append(conv)
            elif is_old:
                consolidate_conversations.append(conv)
            else:
                # Recent but low importance: keep for now
                keep_conversations.append(conv)

        return keep_conversations, consolidate_conversations
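
    # Behavior sketch (default policy assumed): the newest 5,000 conversations
    # (half of max_conversations) are kept unconditionally; older ones survive
    # only with an importance score of at least 0.3, and low scorers past the
    # 30-day consolidation window are queued for consolidation.
    #
    #     manager = RetentionPolicyManager()
    #     keep, consolidate = manager.get_conversations_for_retention(conv_dicts)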


class MemoryOptimizer:
    """Main class for memory optimization and compression."""

    def __init__(self, reflection_db: ReflectionDatabase) -> None:
        self.reflection_db = reflection_db
        self.summarizer = ConversationSummarizer()
        self.clusterer = ConversationClusterer()
        self.retention_manager = RetentionPolicyManager()

        # Compression statistics
        self.compression_stats: dict[str, None | int | float | str] = {
            "last_run": None,
            "conversations_processed": 0,
            "conversations_consolidated": 0,
            "space_saved_bytes": 0,
            "compression_ratio": 0.0,
        }

    async def compress_memory(
        self,
        policy: dict[str, Any] | None = None,
        dry_run: bool = False,
    ) -> dict[str, Any]:
        """Main method to compress conversation memory."""
        if not self._is_database_available():
            return {"error": "Database not available"}

        conversations = await self._load_conversations()
        if not conversations:
            return self._create_no_conversations_response()

        return await self._perform_compression(conversations, policy, dry_run)
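
    # Illustrative usage sketch (not part of the module; assumes an
    # initialized ReflectionDatabase with an open connection):
    #
    #     optimizer = MemoryOptimizer(reflection_db)
    #     preview = await optimizer.compress_memory(dry_run=True)  # no writes
    #     if preview.get("status") == "success":
    #         results = await optimizer.compress_memory()  # applies changes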

    def _is_database_available(self) -> bool:
        """Check whether a database connection is available."""
        return (
            hasattr(self.reflection_db, "conn") and self.reflection_db.conn is not None
        )

    def _create_no_conversations_response(self) -> dict[str, Any]:
        """Create the response returned when no conversations are found."""
        return {
            "status": "no_conversations",
            "message": "No conversations found to compress",
        }

    async def _load_conversations(self) -> list[ConversationData]:
        """Load all conversations from the database into a structured format."""
        if not self.reflection_db.conn:
            return []

        cursor = self.reflection_db.conn.execute(
            "SELECT id, content, project, timestamp, metadata FROM conversations "
            "ORDER BY timestamp DESC",
        )

        return [
            ConversationData(
                id=conv_id,
                content=content,
                project=project,
                timestamp=timestamp,
                metadata=json.loads(metadata) if metadata else {},
                original_size=len(content),
            )
            for conv_id, content, project, timestamp, metadata in cursor.fetchall()
        ]

    async def _perform_compression(
        self,
        conversations: list[ConversationData],
        policy: dict[str, Any] | None,
        dry_run: bool,
    ) -> dict[str, Any]:
        """Perform the actual compression workflow."""
        # Convert to dicts for existing retention manager compatibility
        conv_dicts = [self._to_dict(conv) for conv in conversations]

        keep_conversations, consolidate_conversations = (
            self.retention_manager.get_conversations_for_retention(conv_dicts, policy)
        )

        clusters = self.clusterer.cluster_conversations(consolidate_conversations)
        consolidated_summaries: list[dict[str, Any]] = []

        total_original_size, total_compressed_size = await self._process_clusters(
            clusters,
            consolidated_summaries,
            dry_run,
        )

        results = self._create_compression_results(
            conversations,
            keep_conversations,
            consolidate_conversations,
            clusters,
            consolidated_summaries,
            total_original_size,
            total_compressed_size,
            dry_run,
        )

        self._update_compression_stats(results, consolidate_conversations, clusters)

        return asdict(results)

    def _to_dict(self, conv: ConversationData) -> dict[str, Any]:
        """Convert ConversationData to a dict for backward compatibility."""
        return {
            "id": conv.id,
            "content": conv.content,
            "project": conv.project,
            "timestamp": conv.timestamp,
            "metadata": conv.metadata,
            "original_size": conv.original_size,
        }

    async def _process_clusters(
        self,
        clusters: list[list[dict[str, Any]]],
        consolidated_summaries: list[dict[str, Any]],
        dry_run: bool,
    ) -> tuple[int, int]:
        """Process conversation clusters and return size statistics."""
        total_original_size = sum(
            conv["original_size"] for cluster in clusters for conv in cluster
        )
        total_compressed_size = 0

        for cluster in (c for c in clusters if len(c) > 1):
            consolidated = self._create_consolidated_conversation(cluster)
            total_compressed_size += consolidated.compressed_size
            consolidated_summaries.append(asdict(consolidated))

            if not dry_run:
                await self._persist_consolidated_conversation(consolidated, cluster)

        return total_original_size, total_compressed_size

    def _create_consolidated_conversation(
        self,
        cluster: list[dict[str, Any]],
    ) -> ConsolidatedConversation:
        """Create a consolidated conversation from a cluster."""
        combined_content = "\n\n---\n\n".join(conv["content"] for conv in cluster)
        summary = self.summarizer.summarize_conversation(
            combined_content,
            "template_based",
        )

        projects = [conv["project"] for conv in cluster if conv.get("project")]
        timestamps = [conv["timestamp"] for conv in cluster]

        return ConsolidatedConversation(
            summary=summary,
            original_count=len(cluster),
            projects=list(set(projects)),  # Remove duplicates
            time_range=f"{min(timestamps) if timestamps else ''} to {max(timestamps) if timestamps else ''}",
            original_conversations=[conv["id"] for conv in cluster],
            compressed_size=len(summary),
            original_size=sum(conv["original_size"] for conv in cluster),
        )

    def _create_compression_results(
        self,
        conversations: list[ConversationData],
        keep_conversations: list[dict[str, Any]],
        consolidate_conversations: list[dict[str, Any]],
        clusters: list[list[dict[str, Any]]],
        consolidated_summaries: list[dict[str, Any]],
        total_original_size: int,
        total_compressed_size: int,
        dry_run: bool,
    ) -> CompressionResults:
        """Create compression results structure."""
        space_saved = max(0, total_original_size - total_compressed_size)
        compression_ratio = (
            space_saved / total_original_size if total_original_size > 0 else 0.0
        )

        return CompressionResults(
            status="success",
            dry_run=dry_run,
            total_conversations=len(conversations),
            conversations_to_keep=len(keep_conversations),
            conversations_to_consolidate=len(consolidate_conversations),
            clusters_created=len(clusters),
            consolidated_summaries=consolidated_summaries,
            space_saved_estimate=space_saved,
            compression_ratio=compression_ratio,
        )

    def _update_compression_stats(
        self,
        results: CompressionResults,
        consolidate_conversations: list[dict[str, Any]],
        clusters: list[list[dict[str, Any]]],
    ) -> None:
        """Update internal compression statistics."""
        self.compression_stats.update(
            {
                "last_run": datetime.now().isoformat(),
                "conversations_processed": len(consolidate_conversations),
                "conversations_consolidated": sum(
                    len(cluster) for cluster in clusters if len(cluster) > 1
                ),
                "space_saved_bytes": results.space_saved_estimate,
                "compression_ratio": results.compression_ratio,
            },
        )

    async def _persist_consolidated_conversation(
        self,
        consolidated_conv: ConsolidatedConversation,
        original_cluster: list[dict[str, Any]],
    ) -> None:
        """Create a new consolidated conversation and remove the originals."""
        # Generate an ID for the new consolidated conversation
        consolidated_id = hashlib.md5(
            f"consolidated_{datetime.now().isoformat()}".encode(),
            usedforsecurity=False,
        ).hexdigest()

        metadata = {
            "type": "consolidated",
            "original_count": consolidated_conv.original_count,
            "original_conversations": consolidated_conv.original_conversations,
            "projects": consolidated_conv.projects,
            "compression_ratio": (
                consolidated_conv.compressed_size / consolidated_conv.original_size
                if consolidated_conv.original_size > 0
                else 0.0
            ),
        }

        # Insert the consolidated conversation
        if self.reflection_db.conn:
            self.reflection_db.conn.execute(
                """INSERT INTO conversations (id, content, project, timestamp, metadata)
                VALUES (?, ?, ?, ?, ?)""",
                (
                    consolidated_id,
                    consolidated_conv.summary,
                    ", ".join(consolidated_conv.projects)
                    if consolidated_conv.projects
                    else "multiple",
                    datetime.now().isoformat(),
                    json.dumps(metadata),
                ),
            )

        # Remove the original conversations
        original_ids = [conv["id"] for conv in original_cluster]
        if original_ids and self.reflection_db.conn:
            placeholders = ",".join(["?" for _ in original_ids])
            # Build SQL safely - placeholders generated from list length, not user input
            query = "DELETE FROM conversations WHERE id IN (" + placeholders + ")"
            self.reflection_db.conn.execute(query, original_ids)

        # Commit changes
        if self.reflection_db.conn:
            self.reflection_db.conn.commit()
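
    # Parameterization sketch (assumed values): for a three-conversation
    # cluster the placeholder string is "?,?,?", so the delete statement is
    # "DELETE FROM conversations WHERE id IN (?,?,?)" with the original IDs
    # bound as parameters rather than interpolated into the SQL.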

    async def get_compression_stats(self) -> dict[str, Any]:
        """Get compression statistics."""
        return self.compression_stats.copy()

    async def set_retention_policy(self, policy: dict[str, Any]) -> dict[str, Any]:
        """Update retention policy settings."""
        updated_policy = self.retention_manager.default_policies.copy()
        updated_policy.update(policy)

        # Validate policy values
        if updated_policy.get("max_age_days", 0) < 1:
            return {"error": "max_age_days must be at least 1"}

        if updated_policy.get("max_conversations", 0) < 100:
            return {"error": "max_conversations must be at least 100"}

        self.retention_manager.default_policies = updated_policy

        return {"status": "success", "updated_policy": updated_policy}
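
    # Illustrative usage sketch (not part of the module): partial policies are
    # merged over the defaults, so only overridden keys need to be supplied.
    #
    #     result = await optimizer.set_retention_policy({"max_age_days": 180})
    #     # -> {"status": "success", "updated_policy": {...}}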