Coverage for session_mgmt_mcp/advanced_search.py: 16.88% (234 statements)

#!/usr/bin/env python3
"""Advanced Search Engine for Session Management.

Provides enhanced search capabilities with faceted filtering, full-text search,
and intelligent result ranking.
"""

import asyncio
import hashlib
import json
import re
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from typing import Any

from .reflection_tools import ReflectionDatabase
from .search_enhanced import EnhancedSearchEngine


@dataclass
class SearchFilter:
    """Represents a search filter criterion."""

    field: str
    operator: str  # 'eq', 'ne', 'in', 'not_in', 'contains', 'starts_with', 'ends_with', 'range'
    value: str | list[str] | tuple[Any, Any]
    negate: bool = False
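

# Illustrative examples (not part of the original module; field and value
# names are hypothetical): an equality filter on project, and a negated
# substring filter on indexed content.
#
#     project_filter = SearchFilter(field="project", operator="eq", value="session-mgmt")
#     not_deprecated = SearchFilter(
#         field="content", operator="contains", value="deprecated", negate=True,
#     )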


@dataclass
class SearchFacet:
    """Represents a search facet with possible values."""

    name: str
    values: list[tuple[str, int]]  # (value, count) tuples
    facet_type: str = "terms"  # 'terms', 'range', 'date'


@dataclass
class SearchResult:
    """Enhanced search result with metadata."""

    content_id: str
    content_type: str
    title: str
    content: str
    score: float
    project: str | None = None
    timestamp: datetime | None = None
    metadata: dict[str, Any] = field(default_factory=dict)
    highlights: list[str] = field(default_factory=list)
    facets: dict[str, Any] = field(default_factory=dict)


class AdvancedSearchEngine:
    """Advanced search engine with faceted filtering and full-text search."""

    def __init__(self, reflection_db: ReflectionDatabase) -> None:
        self.reflection_db = reflection_db
        self.enhanced_search = EnhancedSearchEngine(reflection_db)
        self.index_cache: dict[str, datetime] = {}

        # Search configuration
        self.facet_configs = {
            "project": {"type": "terms", "size": 20},
            "content_type": {"type": "terms", "size": 10},
            "date_range": {
                "type": "date",
                "ranges": ["1d", "7d", "30d", "90d", "365d"],
            },
            "author": {"type": "terms", "size": 15},
            "tags": {"type": "terms", "size": 25},
            "file_type": {"type": "terms", "size": 10},
            "language": {"type": "terms", "size": 10},
            "error_type": {"type": "terms", "size": 15},
        }

    async def search(
        self,
        query: str,
        filters: list[SearchFilter] | None = None,
        facets: list[str] | None = None,
        sort_by: str = "relevance",  # 'relevance', 'date', 'project'
        limit: int = 20,
        offset: int = 0,
        include_highlights: bool = True,
    ) -> dict[str, Any]:
        """Perform advanced search with faceted filtering."""
        started = time.time()

        # Ensure search index is up to date
        await self._ensure_search_index()

        # Build search query
        search_query = self._build_search_query(query, filters)

        # Execute search
        results = await self._execute_search(search_query, sort_by, limit, offset)

        # Add highlights if requested
        if include_highlights:
            results = await self._add_highlights(results, query)

        # Calculate facets if requested
        facet_results = {}
        if facets:
            facet_results = await self._calculate_facets(query, filters, facets)

        return {
            "results": results,
            "facets": facet_results,
            "total": len(results),
            "query": query,
            "filters": [f.__dict__ for f in filters] if filters else [],
            "took": time.time() - started,
        }
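
    # Illustrative usage from an async caller (project name and query are
    # hypothetical):
    #
    #     engine = AdvancedSearchEngine(reflection_db)
    #     page = await engine.search(
    #         "connection timeout",
    #         filters=[SearchFilter(field="project", operator="eq", value="session-mgmt")],
    #         facets=["project", "content_type"],
    #         sort_by="date",
    #     )
    #     for hit in page["results"]:
    #         print(hit.title, hit.score)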

    async def suggest_completions(
        self,
        query: str,
        field: str = "content",
        limit: int = 10,
    ) -> list[dict[str, Any]]:
        """Get search completion suggestions."""
        # Simple prefix matching for now - could be enhanced with more
        # sophisticated algorithms. Note: `field` is interpolated directly
        # into the SQL, so it must come from trusted code, not user input.
        sql = f"""
            SELECT DISTINCT {field}, COUNT(*) as frequency
            FROM search_index
            WHERE {field} LIKE ?
            GROUP BY {field}
            ORDER BY frequency DESC, {field}
            LIMIT ?
        """

        results = await asyncio.get_event_loop().run_in_executor(
            None,
            lambda: self.reflection_db.conn.execute(
                sql,
                [f"{query}%", limit],
            ).fetchall(),
        )

        return [{"text": row[0], "frequency": row[1]} for row in results]

    async def get_similar_content(
        self,
        content_id: str,
        content_type: str,
        limit: int = 5,
    ) -> list[SearchResult]:
        """Find similar content using embeddings or text similarity."""
        # Get the source content
        sql = """
            SELECT indexed_content, search_metadata
            FROM search_index
            WHERE content_id = ? AND content_type = ?
        """

        result = await asyncio.get_event_loop().run_in_executor(
            None,
            lambda: self.reflection_db.conn.execute(
                sql,
                [content_id, content_type],
            ).fetchone(),
        )

        if not result:
            return []

        source_content = result[0]

        # Use enhanced search for similarity
        similar_results = await self.reflection_db.search_conversations(
            query=source_content[:500],  # Use first 500 chars as query
            limit=limit + 1,  # +1 so the source itself can be excluded below
        )

        # Convert to SearchResult format and exclude the source
        search_results = []
        for conv in similar_results:
            if conv.get("conversation_id") != content_id:
                search_results.append(
                    SearchResult(
                        content_id=conv.get("conversation_id", ""),
                        content_type="conversation",
                        title=f"Conversation from {conv.get('project', 'Unknown')}",
                        content=conv.get("content", ""),
                        score=conv.get("score", 0.0),
                        project=conv.get("project"),
                        timestamp=conv.get("timestamp"),
                        metadata=conv.get("metadata", {}),
                    ),
                )

        return search_results[:limit]

    async def search_by_timeframe(
        self,
        timeframe: str,  # '1h', '1d', '1w', '1m', '1y' or ISO date range
        query: str | None = None,
        project: str | None = None,
        limit: int = 20,
    ) -> list[SearchResult]:
        """Search within a specific timeframe."""
        # Parse timeframe
        start_time, end_time = self._parse_timeframe(timeframe)

        # Build time filter
        time_filter = SearchFilter(
            field="timestamp",
            operator="range",
            value=(start_time, end_time),
        )

        filters = [time_filter]
        if project:
            filters.append(SearchFilter(field="project", operator="eq", value=project))

        # Perform search
        search_results = await self.search(
            query=query or "*",
            filters=filters,
            limit=limit,
            sort_by="date",
        )

        return search_results["results"]
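
    # Illustrative usage (hypothetical project name): recent error traces
    # from the last week, newest first.
    #
    #     recent = await engine.search_by_timeframe(
    #         "1w", query="traceback", project="session-mgmt",
    #     )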

    async def aggregate_metrics(
        self,
        metric_type: str,  # 'activity', 'projects', 'content_types', 'errors'
        timeframe: str = "30d",
        filters: list[SearchFilter] | None = None,
    ) -> dict[str, Any]:
        """Calculate aggregate metrics from search data."""
        start_time, end_time = self._parse_timeframe(timeframe)
        base_conditions = ["last_indexed BETWEEN ? AND ?"]
        params = [start_time, end_time]

        # Add filter conditions
        if filters:
            filter_conditions, filter_params = self._build_filter_conditions(filters)
            base_conditions.extend(filter_conditions)
            params.extend(filter_params)

        where_clause = " WHERE " + " AND ".join(base_conditions)

        if metric_type == "activity":
            sql = f"""
                SELECT DATE_TRUNC('day', last_indexed) as day,
                       COUNT(*) as count,
                       COUNT(DISTINCT content_id) as unique_content
                FROM search_index
                {where_clause}
                GROUP BY day
                ORDER BY day
            """
        elif metric_type == "projects":
            sql = f"""
                SELECT JSON_EXTRACT(search_metadata, '$.project') as project,
                       COUNT(*) as count
                FROM search_index
                {where_clause}
                AND JSON_EXTRACT(search_metadata, '$.project') IS NOT NULL
                GROUP BY project
                ORDER BY count DESC
            """
        elif metric_type == "content_types":
            sql = f"""
                SELECT content_type, COUNT(*) as count
                FROM search_index
                {where_clause}
                GROUP BY content_type
                ORDER BY count DESC
            """
        elif metric_type == "errors":
            sql = f"""
                SELECT JSON_EXTRACT(search_metadata, '$.error_type') as error_type,
                       COUNT(*) as count
                FROM search_index
                {where_clause}
                AND JSON_EXTRACT(search_metadata, '$.error_type') IS NOT NULL
                GROUP BY error_type
                ORDER BY count DESC
            """
        else:
            return {"error": f"Unknown metric type: {metric_type}"}

        results = await asyncio.get_event_loop().run_in_executor(
            None,
            lambda: self.reflection_db.conn.execute(sql, params).fetchall(),
        )

        return {
            "metric_type": metric_type,
            "timeframe": timeframe,
            "data": [{"key": row[0], "value": row[1]} for row in results]
            if results
            else [],
        }
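
    # Illustrative call and result shape (counts are hypothetical):
    #
    #     stats = await engine.aggregate_metrics("content_types", timeframe="1w")
    #     # {'metric_type': 'content_types', 'timeframe': '1w',
    #     #  'data': [{'key': 'conversation', 'value': 42},
    #     #           {'key': 'reflection', 'value': 7}]}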

    async def _ensure_search_index(self) -> None:
        """Ensure search index is up to date."""
        # Check when index was last updated
        last_update = await self._get_last_index_update()

        # Update if older than 1 hour or if never updated
        if not last_update or (datetime.now(UTC) - last_update).total_seconds() > 3600:
            await self._rebuild_search_index()

    async def _get_last_index_update(self) -> datetime | None:
        """Get timestamp of last index update."""
        sql = "SELECT MAX(last_indexed) FROM search_index"

        result = await asyncio.get_event_loop().run_in_executor(
            None,
            lambda: self.reflection_db.conn.execute(sql).fetchone(),
        )

        return result[0] if result and result[0] else None

    async def _rebuild_search_index(self) -> None:
        """Rebuild the search index from conversations and reflections."""
        # Index conversations
        await self._index_conversations()

        # Index reflections
        await self._index_reflections()

        # Update facets
        await self._update_search_facets()

    async def _index_conversations(self) -> None:
        """Index all conversations for search."""
        sql = "SELECT id, content, project, timestamp, metadata FROM conversations"
        results = await asyncio.get_event_loop().run_in_executor(
            None,
            lambda: self.reflection_db.conn.execute(sql).fetchall(),
        )

        for row in results:
            conv_id, content, project, timestamp, metadata_json = row

            # Extract metadata
            metadata = json.loads(metadata_json) if metadata_json else {}

            # Create indexed content with metadata for better search
            indexed_content = content
            if project:
                indexed_content += f" project:{project}"

            # Extract technical terms and patterns
            tech_terms = self._extract_technical_terms(content)
            if tech_terms:
                indexed_content += " " + " ".join(tech_terms)

            # Create search metadata
            search_metadata = {
                "project": project,
                "timestamp": timestamp.isoformat() if timestamp else None,
                "content_length": len(content),
                "technical_terms": tech_terms,
                **metadata,
            }

            # Insert or update search index
            await asyncio.get_event_loop().run_in_executor(
                None,
                lambda: self.reflection_db.conn.execute(
                    """
                    INSERT OR REPLACE INTO search_index
                    (id, content_type, content_id, indexed_content, search_metadata, last_indexed)
                    VALUES (?, ?, ?, ?, ?, ?)
                    """,
                    [
                        f"conv_{conv_id}",
                        "conversation",
                        conv_id,
                        indexed_content,
                        json.dumps(search_metadata),
                        datetime.now(UTC),
                    ],
                ),
            )

        self.reflection_db.conn.commit()

    async def _index_reflections(self) -> None:
        """Index all reflections for search."""
        sql = "SELECT id, content, tags, timestamp, metadata FROM reflections"
        results = await asyncio.get_event_loop().run_in_executor(
            None,
            lambda: self.reflection_db.conn.execute(sql).fetchall(),
        )

        for row in results:
            refl_id, content, tags, timestamp, metadata_json = row

            # Extract metadata
            metadata = json.loads(metadata_json) if metadata_json else {}

            # Create indexed content
            indexed_content = content
            if tags:
                indexed_content += " " + " ".join(f"tag:{tag}" for tag in tags)

            # Create search metadata
            search_metadata = {
                "tags": tags or [],
                "timestamp": timestamp.isoformat() if timestamp else None,
                "content_length": len(content),
                **metadata,
            }

            # Insert or update search index
            await asyncio.get_event_loop().run_in_executor(
                None,
                lambda: self.reflection_db.conn.execute(
                    """
                    INSERT OR REPLACE INTO search_index
                    (id, content_type, content_id, indexed_content, search_metadata, last_indexed)
                    VALUES (?, ?, ?, ?, ?, ?)
                    """,
                    [
                        f"refl_{refl_id}",
                        "reflection",
                        refl_id,
                        indexed_content,
                        json.dumps(search_metadata),
                        datetime.now(UTC),
                    ],
                ),
            )

        self.reflection_db.conn.commit()

    async def _update_search_facets(self) -> None:
        """Update search facets based on indexed content."""
        # Clear existing facets
        await asyncio.get_event_loop().run_in_executor(
            None,
            lambda: self.reflection_db.conn.execute("DELETE FROM search_facets"),
        )

        # Generate facets from search metadata
        facet_queries = {
            "project": "JSON_EXTRACT(search_metadata, '$.project')",
            "content_type": "content_type",
            "tags": "JSON_EXTRACT(search_metadata, '$.tags')",
            "technical_terms": "JSON_EXTRACT(search_metadata, '$.technical_terms')",
        }

        for facet_name, facet_expr in facet_queries.items():
            sql = f"""
                SELECT {facet_expr} as facet_value, COUNT(*) as count
                FROM search_index
                WHERE {facet_expr} IS NOT NULL
                GROUP BY facet_value
                ORDER BY count DESC
            """

            results = await asyncio.get_event_loop().run_in_executor(
                None,
                lambda: self.reflection_db.conn.execute(sql).fetchall(),
            )

            for facet_value, _count in results:
                if isinstance(facet_value, str) and facet_value:
                    facet_id = hashlib.md5(
                        f"{facet_name}_{facet_value}".encode(),
                    ).hexdigest()

                    await asyncio.get_event_loop().run_in_executor(
                        None,
                        lambda: self.reflection_db.conn.execute(
                            """
                            INSERT INTO search_facets
                            (id, content_type, content_id, facet_name, facet_value, created_at)
                            VALUES (?, ?, ?, ?, ?, ?)
                            """,
                            [
                                facet_id,
                                "search_facet",
                                f"{facet_name}_{facet_value}",
                                facet_name,
                                facet_value,
                                datetime.now(UTC),
                            ],
                        ),
                    )

        self.reflection_db.conn.commit()

    def _extract_technical_terms(self, content: str) -> list[str]:
        """Extract technical terms and patterns from content."""
        terms = []

        # Programming language keywords
        lang_patterns = {
            "python": r"\b(def|class|import|from|try|except|if|else|for|while|return)\b",
            "javascript": r"\b(function|const|let|var|async|await|=>|class|export|import)\b",
            "sql": r"\b(SELECT|FROM|WHERE|JOIN|INSERT|UPDATE|DELETE|CREATE|TABLE)\b",
            "error": r"\b(Error|Exception|Traceback|Failed|TypeError|ValueError)\b",
        }

        for lang, pattern in lang_patterns.items():
            if re.search(pattern, content, re.IGNORECASE):
                terms.append(lang)

        # Extract function names (limit to 5)
        func_matches = re.findall(r"\bdef\s+(\w+)", content)
        terms.extend([f"function:{func}" for func in func_matches[:5]])

        # Extract class names (limit to 5)
        class_matches = re.findall(r"\bclass\s+(\w+)", content)
        terms.extend([f"class:{cls}" for cls in class_matches[:5]])

        # Extract file extensions (deduplicate preserving order, then limit to 10)
        file_matches = re.findall(r"\.(\w{2,4})\b", content)
        terms.extend(
            [f"filetype:{ext}" for ext in list(dict.fromkeys(file_matches))[:10]],
        )

        return terms[:20]  # Limit total terms
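
    # Illustrative input/output (hypothetical content):
    #
    #     engine._extract_technical_terms(
    #         "def parse(x): ... raised ValueError in utils.py",
    #     )
    #     # -> ['python', 'error', 'function:parse', 'filetype:py']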

    def _build_search_query(
        self,
        query: str,
        filters: list[SearchFilter] | None,
    ) -> str:
        """Build search query with filters."""
        # For now, return the query unchanged - could be enhanced with query parsing
        return query

    def _build_filter_conditions(
        self,
        filters: list[SearchFilter],
    ) -> tuple[list[str], list[Any]]:
        """Build SQL conditions from filters."""
        conditions = []
        params = []

        for filt in filters:
            if filt.field == "timestamp" and filt.operator == "range":
                start_time, end_time = filt.value
                condition = (
                    "JSON_EXTRACT(search_metadata, '$.timestamp') BETWEEN ? AND ?"
                )
                conditions.append(f"{'NOT ' if filt.negate else ''}{condition}")
                params.extend([start_time.isoformat(), end_time.isoformat()])

            elif filt.operator == "eq":
                condition = f"JSON_EXTRACT(search_metadata, '$.{filt.field}') = ?"
                conditions.append(f"{'NOT ' if filt.negate else ''}{condition}")
                params.append(filt.value)

            elif filt.operator == "contains":
                condition = "indexed_content LIKE ?"
                conditions.append(f"{'NOT ' if filt.negate else ''}{condition}")
                params.append(f"%{filt.value}%")

        return conditions, params
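
    # Illustrative output for a single 'eq' filter (hypothetical values):
    #
    #     conditions, params = engine._build_filter_conditions(
    #         [SearchFilter(field="project", operator="eq", value="session-mgmt")],
    #     )
    #     # conditions == ["JSON_EXTRACT(search_metadata, '$.project') = ?"]
    #     # params == ['session-mgmt']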

    async def _execute_search(
        self,
        query: str,
        sort_by: str,
        limit: int,
        offset: int,
    ) -> list[SearchResult]:
        """Execute the actual search."""
        # Simple text search for now - could be enhanced with full-text search
        sql = """
            SELECT content_id, content_type, indexed_content, search_metadata, last_indexed
            FROM search_index
            WHERE indexed_content LIKE ?
        """
        params = [f"%{query}%"]

        # Add sorting
        if sort_by == "date":
            sql += " ORDER BY last_indexed DESC"
        elif sort_by == "project":
            sql += " ORDER BY JSON_EXTRACT(search_metadata, '$.project')"
        else:  # relevance - simple heuristic: longer content ranks higher
            sql += " ORDER BY LENGTH(indexed_content) DESC"

        sql += " LIMIT ? OFFSET ?"
        params.extend([limit, offset])

        results = await asyncio.get_event_loop().run_in_executor(
            None,
            lambda: self.reflection_db.conn.execute(sql, params).fetchall(),
        )

        search_results = []
        for row in results:
            (
                content_id,
                content_type,
                indexed_content,
                search_metadata_json,
                last_indexed,
            ) = row

            metadata = json.loads(search_metadata_json) if search_metadata_json else {}

            search_results.append(
                SearchResult(
                    content_id=content_id,
                    content_type=content_type,
                    title=f"{content_type.title()} from {metadata.get('project', 'Unknown')}",
                    content=(
                        indexed_content[:500] + "..."
                        if len(indexed_content) > 500
                        else indexed_content
                    ),
                    score=0.8,  # Simple fixed scoring for now
                    project=metadata.get("project"),
                    timestamp=(
                        datetime.fromisoformat(metadata["timestamp"])
                        if metadata.get("timestamp")
                        else last_indexed
                    ),
                    metadata=metadata,
                ),
            )

        return search_results

    async def _add_highlights(
        self,
        results: list[SearchResult],
        query: str,
    ) -> list[SearchResult]:
        """Add search highlights to results."""
        query_terms = query.lower().split()

        for result in results:
            highlights = []
            content_lower = result.content.lower()

            for term in query_terms:
                if term in content_lower:
                    # Find context around the term; match case-insensitively
                    # but keep the original casing in the highlight
                    start_pos = content_lower.find(term)
                    context_start = max(0, start_pos - 50)
                    context_end = min(len(result.content), start_pos + len(term) + 50)

                    matched = result.content[start_pos : start_pos + len(term)]
                    highlight = result.content[context_start:context_end]
                    highlight = highlight.replace(matched, f"<mark>{matched}</mark>")
                    highlights.append(highlight)

            result.highlights = highlights[:3]  # Limit to 3 highlights

        return results

    async def _calculate_facets(
        self,
        query: str,
        filters: list[SearchFilter] | None,
        requested_facets: list[str],
    ) -> dict[str, SearchFacet]:
        """Calculate facet counts for search results."""
        facets = {}

        for facet_name in requested_facets:
            if facet_name in self.facet_configs:
                facet_config = self.facet_configs[facet_name]

                sql = """
                    SELECT facet_value, COUNT(*) as count
                    FROM search_facets sf
                    JOIN search_index si ON sf.content_id = si.id
                    WHERE sf.facet_name = ? AND si.indexed_content LIKE ?
                    GROUP BY facet_value
                    ORDER BY count DESC
                    LIMIT ?
                """

                results = await asyncio.get_event_loop().run_in_executor(
                    None,
                    lambda: self.reflection_db.conn.execute(
                        sql,
                        [facet_name, f"%{query}%", facet_config["size"]],
                    ).fetchall(),
                )

                facets[facet_name] = SearchFacet(
                    name=facet_name,
                    values=[(row[0], row[1]) for row in results],
                    facet_type=facet_config["type"],
                )

        return facets

    def _parse_timeframe(self, timeframe: str) -> tuple[datetime, datetime]:
        """Parse a timeframe string such as '1h', '7d', '1w', '1m', or '1y'."""
        now = datetime.now(UTC)

        # Accept any '<count><unit>' combination so values like '30d' and
        # '90d' (used elsewhere in this module) parse correctly.
        match = re.fullmatch(r"(\d+)([hdwmy])", timeframe)
        if match:
            count = int(match.group(1))
            unit = match.group(2)
            deltas = {
                "h": timedelta(hours=count),
                "d": timedelta(days=count),
                "w": timedelta(weeks=count),
                "m": timedelta(days=30 * count),  # Approximate a month as 30 days
                "y": timedelta(days=365 * count),  # Approximate a year as 365 days
            }
            start_time = now - deltas[unit]
        else:
            # ISO date ranges are not parsed yet - default to 30 days
            start_time = now - timedelta(days=30)

        return start_time, now
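

# Minimal end-to-end sketch (added for illustration, not part of the original
# module). It assumes ReflectionDatabase can be constructed without arguments;
# the real constructor signature may differ.
#
#     if __name__ == "__main__":
#         async def _demo() -> None:
#             db = ReflectionDatabase()  # assumed constructor
#             engine = AdvancedSearchEngine(db)
#             page = await engine.search("timeout", facets=["project"])
#             print(page["total"], "results in", f"{page['took']:.3f}s")
#
#         asyncio.run(_demo())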