Coverage for session_buddy / tools / search_tools.py: 16.05%
330 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
1#!/usr/bin/env python3
2"""Search and reflection tools for session-mgmt-mcp.
4Following crackerjack architecture patterns with focused, single-responsibility tools
5for conversation memory, semantic search, and knowledge retrieval.
7Refactored to use utility modules for reduced code duplication.
8"""
10from __future__ import annotations
12import operator
13from datetime import datetime, timedelta
14from typing import TYPE_CHECKING, Any
16from session_buddy.utils.database_helpers import require_reflection_database
17from session_buddy.utils.error_handlers import _get_logger, validate_required
18from session_buddy.utils.messages import ToolMessages
19from session_buddy.utils.tool_wrapper import (
20 execute_database_tool,
21 execute_simple_database_tool,
22)
24if TYPE_CHECKING:
25 from session_buddy.adapters.reflection_adapter import (
26 ReflectionDatabaseAdapter as ReflectionDatabase,
27 )
30# ============================================================================
31# Token Optimization (Standalone - No Database)
32# ============================================================================
async def _optimize_search_results_impl(
    results: list[dict[str, Any]],
    optimize_tokens: bool,
    max_tokens: int,
    query: str,
) -> dict[str, Any]:
    """Run the optional token optimizer over a list of search results.

    Args:
        results: Search results to (possibly) optimize.
        optimize_tokens: When false, the results are returned untouched.
        max_tokens: Budget forwarded to the optimizer.
        query: Accepted for interface compatibility; not used by this function.

    Returns:
        A dict that always carries ``results`` and ``optimized``. On success it
        also carries ``optimization_info``; on the pass-through paths it carries
        ``token_count`` (0); on unexpected failure it carries ``error``.
    """
    passthrough = {"results": results, "optimized": False, "token_count": 0}
    try:
        # Imported lazily so a missing optimizer degrades to a pass-through.
        from session_buddy.token_optimizer import TokenOptimizer

        if not (optimize_tokens and results):
            return passthrough

        trimmed, details = await TokenOptimizer().optimize_search_results(
            results, "truncate_old", max_tokens
        )
        return {
            "results": trimmed,
            "optimized": True,
            "optimization_info": details,
        }
    except ImportError:
        _get_logger().info("Token optimizer not available, returning results as-is")
        return passthrough
    except Exception as exc:
        # Best effort: log the failure and hand back the original results.
        _get_logger().exception(f"Search optimization failed: {exc}")
        return {"results": results, "optimized": False, "error": str(exc)}
68# ============================================================================
69# Store Reflection
70# ============================================================================
async def _store_reflection_operation(
    db: ReflectionDatabase, content: str, tags: list[str]
) -> dict[str, Any]:
    """Persist one reflection and describe what was stored.

    Args:
        db: Database adapter exposing an awaitable ``store_reflection``.
        content: Reflection text to store.
        tags: Tags to attach to the reflection.

    Returns:
        A dict with ``success`` (always True), the ``id`` returned by the
        database, and the ``content`` and ``tags`` that were passed in.
    """
    stored_id = await db.store_reflection(content, tags)
    return dict(success=True, id=stored_id, content=content, tags=tags)
def _format_store_reflection(result: dict[str, Any]) -> str:
    """Render the confirmation message for a stored reflection.

    Args:
        result: Dict with an ``id`` and a ``tags`` list.

    Returns:
        The success message; the tag list is appended only when non-empty.
    """
    tags = result["tags"]
    suffix = ""
    if tags:
        suffix = f" (tags: {', '.join(tags)})"
    return f"✅ Reflection stored successfully with ID: {result['id']}{suffix}"
async def _store_reflection_impl(content: str, tags: list[str] | None = None) -> str:
    """Validate and store a reflection, returning a user-facing message.

    Args:
        content: Reflection text; checked with ``validate_required``.
        tags: Optional tags; ``None`` is treated as an empty list.

    Returns:
        Whatever ``execute_database_tool`` produces for this operation.
    """

    def _check_input() -> None:
        validate_required(content, "content")

    async def _persist(db: ReflectionDatabase) -> dict[str, Any]:
        return await _store_reflection_operation(db, content, tags or [])

    return await execute_database_tool(
        _persist, _format_store_reflection, "Store reflection", _check_input
    )
101# ============================================================================
102# Quick Search
103# ============================================================================
async def _quick_search_operation(
    db: ReflectionDatabase,
    query: str,
    project: str | None,
    min_score: float,
    limit: int = 5,
) -> str:
    """Search conversations and report the hit count plus the top hit.

    Args:
        db: Database adapter exposing an awaitable ``search_conversations``.
        query: Search text.
        project: Optional project filter forwarded to the database.
        min_score: Minimum score forwarded to the database.
        limit: Maximum number of hits requested.

    Returns:
        A markdown string: a "no results" notice, or the count, the first
        hit's score and the first 200 characters of its content, followed by
        a pagination hint when more than one hit was returned.
    """
    hits = await db.search_conversations(
        query=query, project=project, min_score=min_score, limit=limit
    )
    if not hits:
        return f"🔍 No results found for '{query}'"

    best = hits[0]
    parts = [
        f"🔍 **{len(hits)} results** for '{query}'\n\n",
        f"**Top Result** (score: {best.get('similarity', 'N/A')}):\n",
        f"{best.get('content', '')[:200]}...",
    ]

    extra = len(hits) - 1
    if extra > 0:
        parts.append(
            f"\n\n💡 Use get_more_results to see additional {extra} results"
        )

    return "".join(parts)
async def _quick_search_impl(
    query: str,
    project: str | None = None,
    min_score: float = 0.7,
    limit: int = 5,
) -> str:
    """Run a quick search through ``execute_simple_database_tool``.

    Args:
        query: Search text.
        project: Optional project filter.
        min_score: Minimum score for a hit.
        limit: Maximum number of hits requested.

    Returns:
        The overview string built by ``_quick_search_operation`` (or whatever
        the tool wrapper returns in its place).
    """

    async def _run(db: ReflectionDatabase) -> str:
        return await _quick_search_operation(db, query, project, min_score, limit)

    return await execute_simple_database_tool(_run, "Quick search")
146# ============================================================================
147# Search Summary
148# ============================================================================
def _extract_key_terms(all_content: str) -> list[str]:
    """Return up to five of the most frequent long words in ``all_content``.

    Words are whitespace-separated tokens longer than four characters,
    counted case-insensitively. Ties keep first-seen order because the sort
    is stable.
    """
    counts: dict[str, int] = {}
    for token in all_content.split():
        if len(token) <= 4:  # Skip short words
            continue
        key = token.lower()
        counts[key] = counts.get(key, 0) + 1

    ranked = sorted(counts.items(), key=operator.itemgetter(1), reverse=True)
    return [term for term, _ in ranked[:5]]
async def _format_search_summary(query: str, results: list[dict[str, Any]]) -> str:
    """Build an aggregated, markdown summary of search results.

    Args:
        query: The search text, echoed in the heading.
        results: Result dicts; ``timestamp`` and ``content`` keys are read
            when present.

    Returns:
        A "no results" notice, or a summary with the hit count, the min/max
        of the truthy timestamps, and key terms drawn from the first 100
        characters of each result's content.
    """
    if not results:
        return f"🔍 No results found for '{query}'"

    summary = [
        f"🔍 **Search Summary for '{query}'**\n",
        f"**Found**: {len(results)} relevant conversations\n",
    ]

    # Time distribution
    stamps = [ts for ts in (entry.get("timestamp", "") for entry in results) if ts]
    if stamps:
        summary.append(f"**Time Range**: {min(stamps)} to {max(stamps)}\n")

    # Key themes
    snippets = " ".join(entry.get("content", "")[:100] for entry in results)
    terms = _extract_key_terms(snippets)
    if terms:
        summary.append(f"**Key Terms**: {', '.join(terms)}\n")

    summary.append("\n💡 Use search with same query to see individual results")
    return "".join(summary)
async def _search_summary_operation(
    db: ReflectionDatabase, query: str, project: str | None, min_score: float
) -> str:
    """Fetch up to 20 matching conversations and summarize them.

    Args:
        db: Database adapter exposing an awaitable ``search_conversations``.
        query: Search text.
        project: Optional project filter forwarded to the database.
        min_score: Minimum score forwarded to the database.

    Returns:
        The string produced by ``_format_search_summary``.
    """
    matches = await db.search_conversations(
        query=query, project=project, min_score=min_score, limit=20
    )
    summary = await _format_search_summary(query, matches)
    return summary
async def _search_summary_impl(
    query: str,
    project: str | None = None,
    min_score: float = 0.7,
) -> str:
    """Run the search-summary operation through ``execute_simple_database_tool``.

    Args:
        query: Search text.
        project: Optional project filter.
        min_score: Minimum score for a hit.

    Returns:
        The aggregated summary string (no per-result details).
    """

    async def _run(db: ReflectionDatabase) -> str:
        return await _search_summary_operation(db, query, project, min_score)

    return await execute_simple_database_tool(_run, "Search summary")
215# ============================================================================
216# Pagination - Get More Results
217# ============================================================================
def _build_pagination_output(
    query: str,
    offset: int,
    paginated_results: list[dict[str, Any]],
    total_results: int,
    limit: int,
) -> str:
    """Render one page of search results as markdown.

    Args:
        query: The search text, echoed in the heading.
        offset: Number of results skipped before this page.
        paginated_results: The result dicts belonging to this page.
        total_results: Count the page is measured against.
        limit: Page size.

    Returns:
        A "no more results" notice for an empty page; otherwise a numbered
        list (numbering starts at ``offset + 1``) with each content entry cut
        to 150 characters, plus a "more available" hint when
        ``offset + limit`` is below ``total_results``.
    """
    if not paginated_results:
        return f"🔍 No more results for '{query}' (offset: {offset})"

    first = offset + 1
    last = offset + len(paginated_results)
    chunks = [f"🔍 **Results {first}-{last}** for '{query}'\n\n"]

    for position, item in enumerate(paginated_results, first):
        stamp = item.get("timestamp")
        prefix = f"**{position}.** ({stamp}) " if stamp else f"**{position}.** "
        chunks.append(f"{prefix}{item.get('content', '')[:150]}...\n\n")

    remaining = total_results - (offset + limit)
    if remaining > 0:
        chunks.append(f"💡 {remaining} more results available")

    return "".join(chunks)
async def _get_more_results_operation(
    db: ReflectionDatabase,
    query: str,
    offset: int,
    limit: int,
    project: str | None,
) -> str:
    """Fetch ``offset + limit`` results and render the page after ``offset``.

    Args:
        db: Database adapter exposing an awaitable ``search_conversations``.
        query: Search text.
        offset: Number of leading results to skip.
        limit: Page size.
        project: Optional project filter forwarded to the database.

    Returns:
        The page rendered by ``_build_pagination_output``.
    """
    window_end = offset + limit
    fetched = await db.search_conversations(
        query=query, project=project, limit=window_end
    )
    page = fetched[offset:window_end]
    # NOTE(review): the query is capped at offset + limit rows, so if the
    # adapter honours that cap, len(fetched) can never exceed offset + limit
    # and the "more results available" hint will not appear — confirm.
    return _build_pagination_output(query, offset, page, len(fetched), limit)
async def _get_more_results_impl(
    query: str,
    offset: int = 3,
    limit: int = 3,
    project: str | None = None,
) -> str:
    """Run the pagination operation through ``execute_simple_database_tool``.

    Args:
        query: Search text of the original search.
        offset: Number of leading results to skip.
        limit: Page size.
        project: Optional project filter.

    Returns:
        The rendered page of additional results.
    """

    async def _run(db: ReflectionDatabase) -> str:
        return await _get_more_results_operation(db, query, offset, limit, project)

    return await execute_simple_database_tool(_run, "Get more results")
278# ============================================================================
279# Search by File
280# ============================================================================
def _extract_file_excerpt(content: str, file_path: str) -> str:
    """Return the text surrounding the first mention of ``file_path``.

    The window spans 50 characters before the match to 100 characters after
    it, clamped to the content bounds. When the path does not occur, the
    first 150 characters of ``content`` are returned instead.
    """
    position = content.find(file_path)
    if position == -1:
        return content[:150]

    window_start = max(0, position - 50)
    window_end = min(len(content), position + len(file_path) + 100)
    return content[window_start:window_end]
async def _format_file_search_results(
    file_path: str, results: list[dict[str, Any]]
) -> str:
    """Render conversations that mention a file as a numbered markdown list.

    Args:
        file_path: The file being searched for, echoed in the heading.
        results: Result dicts; ``timestamp`` and ``content`` are read when
            present.

    Returns:
        A "no conversations" notice, or one entry per result containing the
        optional timestamp and an excerpt from ``_extract_file_excerpt``.
    """
    if not results:
        return f"🔍 No conversations found about file: {file_path}"

    pieces = [f"🔍 **{len(results)} conversations** about `{file_path}`\n\n"]
    for idx, item in enumerate(results, 1):
        pieces.append(f"**{idx}.** ")
        stamp = item.get("timestamp")
        if stamp:
            pieces.append(f"({stamp}) ")
        snippet = _extract_file_excerpt(item.get("content", ""), file_path)
        pieces.append(f"{snippet}...\n\n")

    return "".join(pieces)
async def _search_by_file_operation(
    db: ReflectionDatabase, file_path: str, limit: int, project: str | None
) -> str:
    """Search conversations using the file path as the query text.

    Args:
        db: Database adapter exposing an awaitable ``search_conversations``.
        file_path: Path used verbatim as the search query.
        limit: Maximum number of results requested.
        project: Optional project filter forwarded to the database.

    Returns:
        The string produced by ``_format_file_search_results``.
    """
    matches = await db.search_conversations(
        query=file_path, project=project, limit=limit
    )
    rendered = await _format_file_search_results(file_path, matches)
    return rendered
async def _search_by_file_impl(
    file_path: str,
    limit: int = 10,
    project: str | None = None,
) -> str:
    """Run the file search through ``execute_simple_database_tool``.

    Args:
        file_path: File whose mentions should be found.
        limit: Maximum number of results requested.
        project: Optional project filter.

    Returns:
        The rendered list of conversations about the file.
    """

    async def _run(db: ReflectionDatabase) -> str:
        return await _search_by_file_operation(db, file_path, limit, project)

    return await execute_simple_database_tool(_run, "Search by file")
335# ============================================================================
336# Search by Concept
337# ============================================================================
def _extract_relevant_excerpt(content: str, concept: str) -> str:
    """Return up to 200 characters of ``content`` around ``concept``.

    The match is case-insensitive and the window starts 75 characters before
    the first occurrence (clamped at 0). When the concept does not occur, the
    first 150 characters of ``content`` are returned.
    """
    hit = content.lower().find(concept.lower())
    if hit < 0:
        return content[:150]

    begin = max(0, hit - 75)
    finish = min(len(content), begin + 200)
    return content[begin:finish]
def _extract_mentioned_files(results: list[dict[str, Any]]) -> list[str]:
    """Collect up to ten distinct file names mentioned across search results.

    The content of every result is joined and scanned with the
    ``python_files``, ``javascript_files``, ``config_files`` and
    ``documentation_files`` entries of ``SAFE_PATTERNS``.

    Args:
        results: Result dicts; only the ``content`` key is read.

    Returns:
        Distinct matches in first-seen order, capped at ten. An empty list is
        returned when nothing matches or when anything goes wrong (this is a
        best-effort enrichment and must not break the caller).
    """
    try:
        from session_buddy.utils.regex_patterns import SAFE_PATTERNS

        all_content = " ".join(r.get("content", "") for r in results)
        found: list[str] = []

        for pattern_name in (
            "python_files",
            "javascript_files",
            "config_files",
            "documentation_files",
        ):
            found.extend(SAFE_PATTERNS[pattern_name].findall(all_content))

        # dict.fromkeys de-duplicates while keeping first-seen order; the
        # previous list(set(...)) made the choice of ten files depend on hash
        # ordering, so the output could differ between runs.
        return list(dict.fromkeys(found))[:10]
    except Exception:
        return []
async def _format_concept_results(
    concept: str, results: list[dict[str, Any]], include_files: bool
) -> str:
    """Render concept search results as a numbered markdown list.

    Args:
        concept: The concept searched for, echoed in the heading.
        results: Result dicts; ``timestamp``, ``similarity`` and ``content``
            are read when present.
        include_files: When true, append the files found by
            ``_extract_mentioned_files`` (if any).

    Returns:
        A "no conversations" notice, or one entry per result with the
        optional timestamp, the relevance to two decimals when the
        similarity is truthy, and an excerpt around the concept.
    """
    if not results:
        return f"🔍 No conversations found about concept: {concept}"

    pieces = [f"🔍 **{len(results)} conversations** about `{concept}`\n\n"]

    for idx, item in enumerate(results, 1):
        entry = f"**{idx}.** "
        stamp = item.get("timestamp")
        if stamp:
            entry += f"({stamp}) "
        score = item.get("similarity")
        if score:
            entry += f"(relevance: {score:.2f}) "
        snippet = _extract_relevant_excerpt(item.get("content", ""), concept)
        pieces.append(f"{entry}{snippet}...\n\n")

    if include_files:
        mentioned = _extract_mentioned_files(results)
        if mentioned:
            pieces.append(f"📁 **Related Files**: {', '.join(mentioned)}")

    return "".join(pieces)
async def _search_by_concept_operation(
    db: ReflectionDatabase,
    concept: str,
    include_files: bool,
    limit: int,
    project: str | None,
) -> str:
    """Search conversations for a concept with a fixed minimum score of 0.6.

    Args:
        db: Database adapter exposing an awaitable ``search_conversations``.
        concept: Concept used as the search query.
        include_files: Forwarded to ``_format_concept_results``.
        limit: Maximum number of results requested.
        project: Optional project filter forwarded to the database.

    Returns:
        The string produced by ``_format_concept_results``.
    """
    matches = await db.search_conversations(
        query=concept, project=project, limit=limit, min_score=0.6
    )
    rendered = await _format_concept_results(concept, matches, include_files)
    return rendered
async def _search_by_concept_impl(
    concept: str,
    include_files: bool = True,
    limit: int = 10,
    project: str | None = None,
) -> str:
    """Run the concept search through ``execute_simple_database_tool``.

    Args:
        concept: Development concept to look for.
        include_files: Whether to list files mentioned in the results.
        limit: Maximum number of results requested.
        project: Optional project filter.

    Returns:
        The rendered list of conversations about the concept.
    """

    async def _run(db: ReflectionDatabase) -> str:
        return await _search_by_concept_operation(
            db, concept, include_files, limit, project
        )

    return await execute_simple_database_tool(_run, "Search by concept")
429# ============================================================================
430# Database Management
431# ============================================================================
async def _reset_reflection_database_impl() -> str:
    """Check that the reflection database can be obtained.

    Awaits ``require_reflection_database`` and reports the outcome.
    NOTE(review): this function only requests the database; whether that
    call actually resets a connection depends on the helper — confirm.

    Returns:
        A success message, or the message built by
        ``ToolMessages.operation_failed`` when the helper raises.
    """
    try:
        await require_reflection_database()
    except Exception as exc:
        return ToolMessages.operation_failed("Database reset", exc)
    else:
        return "✅ Reflection database connection verified successfully"
async def _reflection_stats_operation(db: ReflectionDatabase) -> str:
    """Render the database statistics as a markdown list.

    Args:
        db: Database adapter exposing an awaitable ``get_stats`` that returns
            a mapping.

    Returns:
        A heading followed by one ``**Title Cased Key**: value`` line per
        entry, with underscores in keys replaced by spaces.
    """
    stats = await db.get_stats()
    rows = [
        f"**{name.replace('_', ' ').title()}**: {amount}\n"
        for name, amount in stats.items()
    ]
    return "📊 **Reflection Database Statistics**\n\n" + "".join(rows)
async def _reflection_stats_impl() -> str:
    """Run the statistics operation through ``execute_simple_database_tool``.

    Returns:
        The rendered statistics for the reflection database.
    """

    async def _run(db: ReflectionDatabase) -> str:
        return await _reflection_stats_operation(db)

    return await execute_simple_database_tool(_run, "Reflection stats")
461# ============================================================================
462# Search Code
463# ============================================================================
def _extract_code_blocks_from_content(content: str) -> list[str]:
    """Find code blocks in ``content`` with the ``generic_code_block`` pattern.

    Returns:
        Whatever ``findall`` yields for the pattern, or an empty list when
        the pattern module cannot be loaded or matching fails.
    """
    try:
        from session_buddy.utils.regex_patterns import SAFE_PATTERNS

        found = SAFE_PATTERNS["generic_code_block"].findall(content)
    except Exception:
        # Best effort: treat any failure as "no code blocks".
        return []
    return [] if found is None else found
async def _format_code_search_results(
    query: str, results: list[dict[str, Any]], pattern_type: str | None
) -> str:
    """Render code search results as a numbered markdown list.

    Args:
        query: The search text, echoed in the heading and used to locate a
            fallback excerpt.
        results: Result dicts; ``timestamp`` and ``content`` are read when
            present.
        pattern_type: Optional label appended to the heading.

    Returns:
        A "no code patterns" notice, or one entry per result. Each entry shows
        the first extracted code block (cut to 200 characters) inside a fence;
        when no block is found it shows up to 150 characters starting 50
        before the query, or the first 100 characters if the query is absent.
    """
    if not results:
        return f"🔍 No code patterns found for: {query}"

    heading = f"🔍 **{len(results)} code patterns** for `{query}`"
    if pattern_type:
        heading += f" (type: {pattern_type})"
    pieces = [heading, "\n\n"]

    for idx, item in enumerate(results, 1):
        pieces.append(f"**{idx}.** ")
        stamp = item.get("timestamp")
        if stamp:
            pieces.append(f"({stamp}) ")

        body = item.get("content", "")
        blocks = _extract_code_blocks_from_content(body)
        if blocks:
            pieces.append(f"\n```\n{blocks[0][:200]}...\n```\n\n")
            continue

        # No fenced code: fall back to the text around the query, if present.
        at = body.lower().find(query.lower())
        if at >= 0:
            begin = max(0, at - 50)
            snippet = body[begin : min(len(body), begin + 150)]
        else:
            snippet = body[:100]
        pieces.append(f"{snippet}...\n\n")

    return "".join(pieces)
async def _search_code_operation(
    db: ReflectionDatabase,
    query: str,
    pattern_type: str | None,
    limit: int,
    project: str | None,
) -> str:
    """Search conversations for code related to ``query``.

    The database query is ``"code <query>"`` with ``pattern_type`` appended
    when given, and uses a fixed minimum score of 0.5.

    Args:
        db: Database adapter exposing an awaitable ``search_conversations``.
        query: Search text supplied by the caller.
        pattern_type: Optional extra term added to the database query.
        limit: Maximum number of results requested.
        project: Optional project filter forwarded to the database.

    Returns:
        The string produced by ``_format_code_search_results``.
    """
    if pattern_type:
        code_query = f"code {query} {pattern_type}"
    else:
        code_query = f"code {query}"

    matches = await db.search_conversations(
        query=code_query, project=project, limit=limit, min_score=0.5
    )
    rendered = await _format_code_search_results(query, matches, pattern_type)
    return rendered
async def _search_code_impl(
    query: str,
    pattern_type: str | None = None,
    limit: int = 10,
    project: str | None = None,
) -> str:
    """Run the code search through ``execute_simple_database_tool``.

    Args:
        query: Code-related search text.
        pattern_type: Optional extra term that narrows the search.
        limit: Maximum number of results requested.
        project: Optional project filter.

    Returns:
        The rendered list of code-related conversation excerpts.
    """

    async def _run(db: ReflectionDatabase) -> str:
        return await _search_code_operation(db, query, pattern_type, limit, project)

    return await execute_simple_database_tool(_run, "Search code")
545# ============================================================================
546# Search Errors
547# ============================================================================
def _find_best_error_excerpt(content: str) -> str:
    """Return the excerpt around the most frequent error keyword.

    The keywords ``error``, ``exception``, ``traceback``, ``failed`` and
    ``fix`` are matched case-insensitively. The keyword with the highest
    occurrence count wins (the earliest in that list on a tie), and up to 200
    characters starting 75 before its first occurrence are returned. Without
    any keyword the first 150 characters of ``content`` are returned.
    """
    lowered = content.lower()
    present = [
        keyword
        for keyword in ("error", "exception", "traceback", "failed", "fix")
        if keyword in lowered
    ]
    if not present:
        return content[:150]

    # max() keeps the first maximal item, matching a strict ">" comparison.
    winner = max(present, key=lowered.count)
    begin = max(0, lowered.find(winner) - 75)
    return content[begin : min(len(content), begin + 200)] or content[:150]
async def _format_error_search_results(
    query: str, results: list[dict[str, Any]], error_type: str | None
) -> str:
    """Render error search results as a numbered markdown list.

    Args:
        query: The search text, echoed in the heading.
        results: Result dicts; ``timestamp`` and ``content`` are read when
            present.
        error_type: Optional label appended to the heading.

    Returns:
        A "no error patterns" notice, or one entry per result with the
        optional timestamp and the excerpt chosen by
        ``_find_best_error_excerpt``.
    """
    if not results:
        return f"🔍 No error patterns found for: {query}"

    heading = f"🔍 **{len(results)} error contexts** for `{query}`"
    if error_type:
        heading += f" (type: {error_type})"
    pieces = [heading, "\n\n"]

    for idx, item in enumerate(results, 1):
        pieces.append(f"**{idx}.** ")
        stamp = item.get("timestamp")
        if stamp:
            pieces.append(f"({stamp}) ")
        snippet = _find_best_error_excerpt(item.get("content", ""))
        pieces.append(f"{snippet}...\n\n")

    return "".join(pieces)
async def _search_errors_operation(
    db: ReflectionDatabase,
    query: str,
    error_type: str | None,
    limit: int,
    project: str | None,
) -> str:
    """Search conversations for error contexts related to ``query``.

    The database query is ``"error <query>"`` with ``error_type`` appended
    when given, and uses a fixed minimum score of 0.4.

    Args:
        db: Database adapter exposing an awaitable ``search_conversations``.
        query: Search text supplied by the caller.
        error_type: Optional extra term added to the database query.
        limit: Maximum number of results requested.
        project: Optional project filter forwarded to the database.

    Returns:
        The string produced by ``_format_error_search_results``.
    """
    if error_type:
        error_query = f"error {query} {error_type}"
    else:
        error_query = f"error {query}"

    matches = await db.search_conversations(
        query=error_query, project=project, limit=limit, min_score=0.4
    )
    rendered = await _format_error_search_results(query, matches, error_type)
    return rendered
async def _search_errors_impl(
    query: str,
    error_type: str | None = None,
    limit: int = 10,
    project: str | None = None,
) -> str:
    """Run the error search through ``execute_simple_database_tool``.

    Args:
        query: Error-related search text.
        error_type: Optional extra term that narrows the search.
        limit: Maximum number of results requested.
        project: Optional project filter.

    Returns:
        The rendered list of error and debugging contexts.
    """

    async def _run(db: ReflectionDatabase) -> str:
        return await _search_errors_operation(db, query, error_type, limit, project)

    return await execute_simple_database_tool(_run, "Search errors")
624# ============================================================================
625# Temporal Search
626# ============================================================================
def _parse_time_expression(time_expression: str) -> datetime | None:
    """Translate a natural-language time expression into a start datetime.

    Recognised phrases (case-insensitive, checked in this order):
    "yesterday" (1 day ago), "last week" (7 days ago), "last month"
    (30 days ago) and "today" (24 hours ago).

    Returns:
        The naive local datetime marking the start of the period, or ``None``
        when no known phrase occurs in the expression.
    """
    expression = time_expression.lower()
    now = datetime.now()

    if "yesterday" in expression:
        return now - timedelta(days=1)
    if "last week" in expression:
        return now - timedelta(days=7)
    if "last month" in expression:
        return now - timedelta(days=30)
    if "today" in expression:
        return now - timedelta(hours=24)

    return None


async def _format_temporal_results(
    time_expression: str, query: str | None, results: list[dict[str, Any]]
) -> str:
    """Render temporal search results as a numbered markdown list.

    Args:
        time_expression: The time phrase, echoed in the heading.
        query: Optional search text, echoed in the heading when given.
        results: Result dicts; ``timestamp`` and ``content`` are read when
            present.

    Returns:
        A "no conversations" notice, or one entry per result with the
        optional timestamp and the first 150 characters of the content.
    """
    if not results:
        return f"🔍 No conversations found for time period: {time_expression}"

    output = f"🔍 **{len(results)} conversations** from `{time_expression}`"
    if query:
        output += f" matching `{query}`"
    output += "\n\n"

    for i, result in enumerate(results, 1):
        output += f"**{i}.** "
        if result.get("timestamp"):
            output += f"({result['timestamp']}) "

        content = result.get("content", "")
        output += f"{content[:150]}...\n\n"

    return output


def _coerce_result_timestamp(value: Any) -> datetime | None:
    """Convert a stored timestamp to a naive local datetime, if possible."""
    if isinstance(value, datetime):
        parsed = value
    elif isinstance(value, str) and value.strip():
        text = value.strip()
        # fromisoformat() only accepts a trailing "Z" on newer Pythons.
        if text[-1] in "Zz":
            text = f"{text[:-1]}+00:00"
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return None
    else:
        return None

    if parsed.tzinfo is not None:
        # Compare like with like: the cutoff is a naive local datetime.
        parsed = parsed.astimezone().replace(tzinfo=None)
    return parsed


def _is_within_period(result: dict[str, Any], start_time: datetime) -> bool:
    """Tell whether a result is at or after ``start_time``.

    Results whose timestamp is missing or cannot be parsed are kept, so that
    an unexpected timestamp format never silently hides conversations.
    """
    stamp = _coerce_result_timestamp(result.get("timestamp"))
    return stamp is None or stamp >= start_time


async def _search_temporal_operation(
    db: ReflectionDatabase,
    time_expression: str,
    query: str | None,
    limit: int,
    project: str | None,
) -> str:
    """Search conversations and keep those inside the requested time period.

    Twice ``limit`` rows are fetched so that filtering by time still leaves
    enough candidates; the final list is always cut to ``limit``.

    Args:
        db: Database adapter exposing an awaitable ``search_conversations``.
        time_expression: Natural-language period such as "last week".
        query: Optional search text; an empty query is used when omitted.
        limit: Maximum number of results to return.
        project: Optional project filter forwarded to the database.

    Returns:
        The string produced by ``_format_temporal_results``.
    """
    start_time = _parse_time_expression(time_expression)
    fetched = await db.search_conversations(
        query=query or "", project=project, limit=limit * 2
    )
    candidates = list(fetched or [])

    if start_time is not None:
        candidates = [
            entry for entry in candidates if _is_within_period(entry, start_time)
        ]

    # Honour the caller's limit even when the expression was not recognised.
    return await _format_temporal_results(
        time_expression, query, candidates[:limit]
    )
async def _search_temporal_impl(
    time_expression: str,
    query: str | None = None,
    limit: int = 10,
    project: str | None = None,
) -> str:
    """Run the temporal search through ``execute_simple_database_tool``.

    Args:
        time_expression: Natural-language period such as "yesterday".
        query: Optional search text.
        limit: Maximum number of results requested.
        project: Optional project filter.

    Returns:
        The rendered list of conversations for the period.
    """

    async def _run(db: ReflectionDatabase) -> str:
        return await _search_temporal_operation(
            db, time_expression, query, limit, project
        )

    return await execute_simple_database_tool(_run, "Temporal search")
706# ============================================================================
707# MCP Tool Registration
708# ============================================================================
def register_search_tools(mcp: Any) -> None:
    """Register all search-related MCP tools.

    Each tool is a thin async wrapper that forwards its arguments to the
    matching ``_*_impl`` function in this module.

    Args:
        mcp: FastMCP server instance

    """

    @mcp.tool()  # type: ignore[misc]
    async def _optimize_search_results(
        results: list[dict[str, Any]],
        optimize_tokens: bool,
        max_tokens: int,
        query: str,
    ) -> dict[str, Any]:
        """Apply token optimization to search results if available."""
        return await _optimize_search_results_impl(
            results, optimize_tokens, max_tokens, query
        )

    @mcp.tool()  # type: ignore[misc]
    async def store_reflection(content: str, tags: list[str] | None = None) -> str:
        """Store an important insight or reflection for future reference."""
        return await _store_reflection_impl(content, tags)

    @mcp.tool()  # type: ignore[misc]
    async def quick_search(
        query: str, project: str | None = None, min_score: float = 0.7, limit: int = 5
    ) -> str:
        """Return the result count and the top result for a fast overview."""
        # Forward ``limit`` as well; it used to be dropped here, so the
        # implementation always fell back to its own default of 5.
        return await _quick_search_impl(query, project, min_score, limit)

    @mcp.tool()  # type: ignore[misc]
    async def search_summary(
        query: str, project: str | None = None, min_score: float = 0.7
    ) -> str:
        """Get aggregated insights from search results without per-result details."""
        return await _search_summary_impl(query, project, min_score)

    @mcp.tool()  # type: ignore[misc]
    async def get_more_results(
        query: str, offset: int = 3, limit: int = 3, project: str | None = None
    ) -> str:
        """Get additional search results after an initial search (pagination)."""
        return await _get_more_results_impl(query, offset, limit, project)

    @mcp.tool()  # type: ignore[misc]
    async def search_by_file(
        file_path: str, limit: int = 10, project: str | None = None
    ) -> str:
        """Search for conversations that mention a specific file."""
        return await _search_by_file_impl(file_path, limit, project)

    @mcp.tool()  # type: ignore[misc]
    async def search_by_concept(
        concept: str,
        include_files: bool = True,
        limit: int = 10,
        project: str | None = None,
    ) -> str:
        """Search for conversations about a specific development concept."""
        return await _search_by_concept_impl(concept, include_files, limit, project)

    @mcp.tool()  # type: ignore[misc]
    async def reset_reflection_database() -> str:
        """Check the reflection database connection and report the outcome."""
        return await _reset_reflection_database_impl()

    @mcp.tool()  # type: ignore[misc]
    async def reflection_stats() -> str:
        """Get statistics about the reflection database."""
        return await _reflection_stats_impl()

    @mcp.tool()  # type: ignore[misc]
    async def search_code(
        query: str,
        pattern_type: str | None = None,
        limit: int = 10,
        project: str | None = None,
    ) -> str:
        """Search conversations for code related to the query."""
        return await _search_code_impl(query, pattern_type, limit, project)

    @mcp.tool()  # type: ignore[misc]
    async def search_errors(
        query: str,
        error_type: str | None = None,
        limit: int = 10,
        project: str | None = None,
    ) -> str:
        """Search for error patterns and debugging contexts in conversations."""
        return await _search_errors_impl(query, error_type, limit, project)

    @mcp.tool()  # type: ignore[misc]
    async def search_temporal(
        time_expression: str,
        query: str | None = None,
        limit: int = 10,
        project: str | None = None,
    ) -> str:
        """Search conversations within a time range given in natural language."""
        return await _search_temporal_impl(time_expression, query, limit, project)