Coverage for session_mgmt_mcp/advanced_search.py: 16.88%

234 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-01 05:22 -0700

1#!/usr/bin/env python3 

2"""Advanced Search Engine for Session Management. 

3 

4Provides enhanced search capabilities with faceted filtering, full-text search, 

5and intelligent result ranking. 

6""" 

7 

8import asyncio 

9import hashlib 

10import json 

11import re 

12import time 

13from dataclasses import dataclass, field 

14from datetime import UTC, datetime, timedelta 

15from typing import Any 

16 

17from .reflection_tools import ReflectionDatabase 

18from .search_enhanced import EnhancedSearchEngine 

19 

20 

@dataclass
class SearchFilter:
    """Represents a search filter criterion.

    Used by AdvancedSearchEngine.search() to narrow results; converted to
    SQL conditions by _build_filter_conditions().
    """

    # Metadata field the filter applies to (e.g. "timestamp", "project").
    field: str
    # Comparison operator: 'eq', 'ne', 'in', 'not_in', 'contains',
    # 'starts_with', 'ends_with', 'range'.
    operator: str
    # Operand: a scalar, a list (for 'in' / 'not_in'), or a
    # (start, end) tuple for 'range'.
    value: str | list[str] | tuple[Any, Any]
    # When True the condition is inverted (SQL NOT).
    negate: bool = False

29 

30 

@dataclass
class SearchFacet:
    """Represents a search facet with possible values.

    Returned by AdvancedSearchEngine._calculate_facets() as part of a
    faceted search response.
    """

    # Facet name, e.g. "project" or "content_type".
    name: str
    # Distinct facet values with their match counts, as (value, count) tuples.
    values: list[tuple[str, int]]
    # Facet kind: 'terms', 'range', 'date'.
    facet_type: str = "terms"

38 

39 

@dataclass
class SearchResult:
    """Enhanced search result with metadata.

    One hit produced by AdvancedSearchEngine; carries the matched content
    plus scoring, highlighting and facet information.
    """

    # Identifier of the indexed item (conversation or reflection id).
    content_id: str
    # Kind of the indexed item, e.g. "conversation" or "reflection".
    content_type: str
    # Human-readable title synthesized for display.
    title: str
    # Matched (possibly truncated) text of the item.
    content: str
    # Relevance score; higher is more relevant.
    score: float
    # Originating project, when known.
    project: str | None = None
    # Timestamp of the underlying content, when known.
    timestamp: datetime | None = None
    # Arbitrary metadata carried from the search index row.
    metadata: dict[str, Any] = field(default_factory=dict)
    # Context snippets with <mark> tags, filled by _add_highlights().
    highlights: list[str] = field(default_factory=list)
    # Facet values attached to this specific result, if any.
    facets: dict[str, Any] = field(default_factory=dict)

54 

55 

56class AdvancedSearchEngine: 

57 """Advanced search engine with faceted filtering and full-text search.""" 

58 

59 def __init__(self, reflection_db: ReflectionDatabase) -> None: 

60 self.reflection_db = reflection_db 

61 self.enhanced_search = EnhancedSearchEngine(reflection_db) 

62 self.index_cache: dict[str, datetime] = {} 

63 

64 # Search configuration 

65 self.facet_configs = { 

66 "project": {"type": "terms", "size": 20}, 

67 "content_type": {"type": "terms", "size": 10}, 

68 "date_range": { 

69 "type": "date", 

70 "ranges": ["1d", "7d", "30d", "90d", "365d"], 

71 }, 

72 "author": {"type": "terms", "size": 15}, 

73 "tags": {"type": "terms", "size": 25}, 

74 "file_type": {"type": "terms", "size": 10}, 

75 "language": {"type": "terms", "size": 10}, 

76 "error_type": {"type": "terms", "size": 15}, 

77 } 

78 

79 async def search( 

80 self, 

81 query: str, 

82 filters: list[SearchFilter] | None = None, 

83 facets: list[str] | None = None, 

84 sort_by: str = "relevance", # 'relevance', 'date', 'project' 

85 limit: int = 20, 

86 offset: int = 0, 

87 include_highlights: bool = True, 

88 ) -> dict[str, Any]: 

89 """Perform advanced search with faceted filtering.""" 

90 # Ensure search index is up to date 

91 await self._ensure_search_index() 

92 

93 # Build search query 

94 search_query = self._build_search_query(query, filters) 

95 

96 # Execute search 

97 results = await self._execute_search(search_query, sort_by, limit, offset) 

98 

99 # Add highlights if requested 

100 if include_highlights: 

101 results = await self._add_highlights(results, query) 

102 

103 # Calculate facets if requested 

104 facet_results = {} 

105 if facets: 

106 facet_results = await self._calculate_facets(query, filters, facets) 

107 

108 return { 

109 "results": results, 

110 "facets": facet_results, 

111 "total": len(results), 

112 "query": query, 

113 "filters": [f.__dict__ for f in filters] if filters else [], 

114 "took": time.time() - time.time(), # Will be updated with actual timing 

115 } 

116 

117 async def suggest_completions( 

118 self, 

119 query: str, 

120 field: str = "content", 

121 limit: int = 10, 

122 ) -> list[dict[str, Any]]: 

123 """Get search completion suggestions.""" 

124 # Simple prefix matching for now - could be enhanced with more sophisticated algorithms 

125 sql = f""" 

126 SELECT DISTINCT {field}, COUNT(*) as frequency 

127 FROM search_index 

128 WHERE {field} LIKE ? 

129 GROUP BY {field} 

130 ORDER BY frequency DESC, {field} 

131 LIMIT ? 

132 """ 

133 

134 results = await asyncio.get_event_loop().run_in_executor( 

135 None, 

136 lambda: self.reflection_db.conn.execute( 

137 sql, 

138 [f"%{query}%", limit], 

139 ).fetchall(), 

140 ) 

141 

142 suggestions = [] 

143 for row in results: 

144 suggestions.append({"text": row[0], "frequency": row[1]}) 

145 

146 return suggestions 

147 

148 async def get_similar_content( 

149 self, 

150 content_id: str, 

151 content_type: str, 

152 limit: int = 5, 

153 ) -> list[SearchResult]: 

154 """Find similar content using embeddings or text similarity.""" 

155 # Get the source content 

156 sql = """ 

157 SELECT indexed_content, search_metadata 

158 FROM search_index 

159 WHERE content_id = ? AND content_type = ? 

160 """ 

161 

162 result = await asyncio.get_event_loop().run_in_executor( 

163 None, 

164 lambda: self.reflection_db.conn.execute( 

165 sql, 

166 [content_id, content_type], 

167 ).fetchone(), 

168 ) 

169 

170 if not result: 

171 return [] 

172 

173 source_content = result[0] 

174 

175 # Use enhanced search for similarity 

176 similar_results = await self.reflection_db.search_conversations( 

177 query=source_content[:500], # Use first 500 chars as query 

178 limit=limit + 1, # +1 to exclude the source itself 

179 ) 

180 

181 # Convert to SearchResult format and exclude source 

182 search_results = [] 

183 for conv in similar_results: 

184 if conv.get("conversation_id") != content_id: 

185 search_results.append( 

186 SearchResult( 

187 content_id=conv.get("conversation_id", ""), 

188 content_type="conversation", 

189 title=f"Conversation from {conv.get('project', 'Unknown')}", 

190 content=conv.get("content", ""), 

191 score=conv.get("score", 0.0), 

192 project=conv.get("project"), 

193 timestamp=conv.get("timestamp"), 

194 metadata=conv.get("metadata", {}), 

195 ), 

196 ) 

197 

198 return search_results[:limit] 

199 

200 async def search_by_timeframe( 

201 self, 

202 timeframe: str, # '1h', '1d', '1w', '1m', '1y' or ISO date range 

203 query: str | None = None, 

204 project: str | None = None, 

205 limit: int = 20, 

206 ) -> list[SearchResult]: 

207 """Search within a specific timeframe.""" 

208 # Parse timeframe 

209 start_time, end_time = self._parse_timeframe(timeframe) 

210 

211 # Build time filter 

212 time_filter = SearchFilter( 

213 field="timestamp", 

214 operator="range", 

215 value=(start_time, end_time), 

216 ) 

217 

218 filters = [time_filter] 

219 if project: 

220 filters.append(SearchFilter(field="project", operator="eq", value=project)) 

221 

222 # Perform search 

223 search_results = await self.search( 

224 query=query or "*", 

225 filters=filters, 

226 limit=limit, 

227 sort_by="date", 

228 ) 

229 

230 return search_results["results"] 

231 

232 async def aggregate_metrics( 

233 self, 

234 metric_type: str, # 'activity', 'projects', 'content_types', 'errors' 

235 timeframe: str = "30d", 

236 filters: list[SearchFilter] | None = None, 

237 ) -> dict[str, Any]: 

238 """Calculate aggregate metrics from search data.""" 

239 start_time, end_time = self._parse_timeframe(timeframe) 

240 base_conditions = ["last_indexed BETWEEN ? AND ?"] 

241 params = [start_time, end_time] 

242 

243 # Add filter conditions 

244 if filters: 

245 filter_conditions, filter_params = self._build_filter_conditions(filters) 

246 base_conditions.extend(filter_conditions) 

247 params.extend(filter_params) 

248 

249 where_clause = " WHERE " + " AND ".join(base_conditions) 

250 

251 if metric_type == "activity": 

252 sql = f""" 

253 SELECT DATE_TRUNC('day', last_indexed) as day, 

254 COUNT(*) as count, 

255 COUNT(DISTINCT content_id) as unique_content 

256 FROM search_index 

257 {where_clause} 

258 GROUP BY day 

259 ORDER BY day 

260 """ 

261 

262 elif metric_type == "projects": 

263 sql = f""" 

264 SELECT JSON_EXTRACT(search_metadata, '$.project') as project, 

265 COUNT(*) as count 

266 FROM search_index 

267 {where_clause} 

268 AND JSON_EXTRACT(search_metadata, '$.project') IS NOT NULL 

269 GROUP BY project 

270 ORDER BY count DESC 

271 """ 

272 

273 elif metric_type == "content_types": 

274 sql = f""" 

275 SELECT content_type, COUNT(*) as count 

276 FROM search_index 

277 {where_clause} 

278 GROUP BY content_type 

279 ORDER BY count DESC 

280 """ 

281 

282 elif metric_type == "errors": 

283 sql = f""" 

284 SELECT JSON_EXTRACT(search_metadata, '$.error_type') as error_type, 

285 COUNT(*) as count 

286 FROM search_index 

287 {where_clause} 

288 AND JSON_EXTRACT(search_metadata, '$.error_type') IS NOT NULL 

289 GROUP BY error_type 

290 ORDER BY count DESC 

291 """ 

292 else: 

293 return {"error": f"Unknown metric type: {metric_type}"} 

294 

295 results = await asyncio.get_event_loop().run_in_executor( 

296 None, 

297 lambda: self.reflection_db.conn.execute(sql, params).fetchall(), 

298 ) 

299 

300 return { 

301 "metric_type": metric_type, 

302 "timeframe": timeframe, 

303 "data": [{"key": row[0], "value": row[1]} for row in results] 

304 if results 

305 else [], 

306 } 

307 

308 async def _ensure_search_index(self) -> None: 

309 """Ensure search index is up to date.""" 

310 # Check when index was last updated 

311 last_update = await self._get_last_index_update() 

312 

313 # Update if older than 1 hour or if never updated 

314 if not last_update or (datetime.now(UTC) - last_update).total_seconds() > 3600: 

315 await self._rebuild_search_index() 

316 

317 async def _get_last_index_update(self) -> datetime | None: 

318 """Get timestamp of last index update.""" 

319 sql = "SELECT MAX(last_indexed) FROM search_index" 

320 

321 result = await asyncio.get_event_loop().run_in_executor( 

322 None, 

323 lambda: self.reflection_db.conn.execute(sql).fetchone(), 

324 ) 

325 

326 return result[0] if result and result[0] else None 

327 

328 async def _rebuild_search_index(self) -> None: 

329 """Rebuild the search index from conversations and reflections.""" 

330 # Index conversations 

331 await self._index_conversations() 

332 

333 # Index reflections 

334 await self._index_reflections() 

335 

336 # Update facets 

337 await self._update_search_facets() 

338 

339 async def _index_conversations(self) -> None: 

340 """Index all conversations for search.""" 

341 sql = "SELECT id, content, project, timestamp, metadata FROM conversations" 

342 results = await asyncio.get_event_loop().run_in_executor( 

343 None, 

344 lambda: self.reflection_db.conn.execute(sql).fetchall(), 

345 ) 

346 

347 for row in results: 

348 conv_id, content, project, timestamp, metadata_json = row 

349 

350 # Extract metadata 

351 metadata = json.loads(metadata_json) if metadata_json else {} 

352 

353 # Create indexed content with metadata for better search 

354 indexed_content = content 

355 if project: 

356 indexed_content += f" project:{project}" 

357 

358 # Extract technical terms and patterns 

359 tech_terms = self._extract_technical_terms(content) 

360 if tech_terms: 

361 indexed_content += " " + " ".join(tech_terms) 

362 

363 # Create search metadata 

364 search_metadata = { 

365 "project": project, 

366 "timestamp": timestamp.isoformat() if timestamp else None, 

367 "content_length": len(content), 

368 "technical_terms": tech_terms, 

369 **metadata, 

370 } 

371 

372 # Insert or update search index 

373 await asyncio.get_event_loop().run_in_executor( 

374 None, 

375 lambda: self.reflection_db.conn.execute( 

376 """ 

377 INSERT OR REPLACE INTO search_index 

378 (id, content_type, content_id, indexed_content, search_metadata, last_indexed) 

379 VALUES (?, ?, ?, ?, ?, ?) 

380 """, 

381 [ 

382 f"conv_{conv_id}", 

383 "conversation", 

384 conv_id, 

385 indexed_content, 

386 json.dumps(search_metadata), 

387 datetime.now(UTC), 

388 ], 

389 ), 

390 ) 

391 

392 self.reflection_db.conn.commit() 

393 

394 async def _index_reflections(self) -> None: 

395 """Index all reflections for search.""" 

396 sql = "SELECT id, content, tags, timestamp, metadata FROM reflections" 

397 results = await asyncio.get_event_loop().run_in_executor( 

398 None, 

399 lambda: self.reflection_db.conn.execute(sql).fetchall(), 

400 ) 

401 

402 for row in results: 

403 refl_id, content, tags, timestamp, metadata_json = row 

404 

405 # Extract metadata 

406 metadata = json.loads(metadata_json) if metadata_json else {} 

407 

408 # Create indexed content 

409 indexed_content = content 

410 if tags: 

411 indexed_content += " " + " ".join(f"tag:{tag}" for tag in tags) 

412 

413 # Create search metadata 

414 search_metadata = { 

415 "tags": tags or [], 

416 "timestamp": timestamp.isoformat() if timestamp else None, 

417 "content_length": len(content), 

418 **metadata, 

419 } 

420 

421 # Insert or update search index 

422 await asyncio.get_event_loop().run_in_executor( 

423 None, 

424 lambda: self.reflection_db.conn.execute( 

425 """ 

426 INSERT OR REPLACE INTO search_index 

427 (id, content_type, content_id, indexed_content, search_metadata, last_indexed) 

428 VALUES (?, ?, ?, ?, ?, ?) 

429 """, 

430 [ 

431 f"refl_{refl_id}", 

432 "reflection", 

433 refl_id, 

434 indexed_content, 

435 json.dumps(search_metadata), 

436 datetime.now(UTC), 

437 ], 

438 ), 

439 ) 

440 

441 self.reflection_db.conn.commit() 

442 

443 async def _update_search_facets(self) -> None: 

444 """Update search facets based on indexed content.""" 

445 # Clear existing facets 

446 await asyncio.get_event_loop().run_in_executor( 

447 None, 

448 lambda: self.reflection_db.conn.execute("DELETE FROM search_facets"), 

449 ) 

450 

451 # Generate facets from search metadata 

452 facet_queries = { 

453 "project": "JSON_EXTRACT(search_metadata, '$.project')", 

454 "content_type": "content_type", 

455 "tags": "JSON_EXTRACT(search_metadata, '$.tags')", 

456 "technical_terms": "JSON_EXTRACT(search_metadata, '$.technical_terms')", 

457 } 

458 

459 for facet_name, facet_expr in facet_queries.items(): 

460 sql = f""" 

461 SELECT {facet_expr} as facet_value, COUNT(*) as count 

462 FROM search_index 

463 WHERE {facet_expr} IS NOT NULL 

464 GROUP BY facet_value 

465 ORDER BY count DESC 

466 """ 

467 

468 results = await asyncio.get_event_loop().run_in_executor( 

469 None, 

470 lambda: self.reflection_db.conn.execute(sql).fetchall(), 

471 ) 

472 

473 for facet_value, _count in results: 

474 if isinstance(facet_value, str) and facet_value: 

475 facet_id = hashlib.md5( 

476 f"{facet_name}_{facet_value}".encode(), 

477 ).hexdigest() 

478 

479 await asyncio.get_event_loop().run_in_executor( 

480 None, 

481 lambda: self.reflection_db.conn.execute( 

482 """ 

483 INSERT INTO search_facets 

484 (id, content_type, content_id, facet_name, facet_value, created_at) 

485 VALUES (?, ?, ?, ?, ?, ?) 

486 """, 

487 [ 

488 facet_id, 

489 "search_facet", 

490 f"{facet_name}_{facet_value}", 

491 facet_name, 

492 facet_value, 

493 datetime.now(UTC), 

494 ], 

495 ), 

496 ) 

497 

498 self.reflection_db.conn.commit() 

499 

500 def _extract_technical_terms(self, content: str) -> list[str]: 

501 """Extract technical terms and patterns from content.""" 

502 terms = [] 

503 

504 # Programming language keywords 

505 lang_patterns = { 

506 "python": r"\b(def|class|import|from|try|except|if|else|for|while|return)\b", 

507 "javascript": r"\b(function|const|let|var|async|await|=>|class|export|import)\b", 

508 "sql": r"\b(SELECT|FROM|WHERE|JOIN|INSERT|UPDATE|DELETE|CREATE|TABLE)\b", 

509 "error": r"\b(Error|Exception|Traceback|Failed|TypeError|ValueError)\b", 

510 } 

511 

512 for lang, pattern in lang_patterns.items(): 

513 if re.search(pattern, content, re.IGNORECASE): 

514 terms.append(lang) 

515 

516 # Extract function names 

517 func_matches = re.findall(r"\bdef\s+(\w+)", content) 

518 terms.extend([f"function:{func}" for func in func_matches[:5]]) # Limit to 5 

519 

520 # Extract class names 

521 class_matches = re.findall(r"\bclass\s+(\w+)", content) 

522 terms.extend([f"class:{cls}" for cls in class_matches[:5]]) 

523 

524 # Extract file extensions 

525 file_matches = re.findall(r"\.(\w{2,4})\b", content) 

526 terms.extend([f"filetype:{ext}" for ext in set(file_matches[:10])]) 

527 

528 return terms[:20] # Limit total terms 

529 

530 def _build_search_query( 

531 self, 

532 query: str, 

533 filters: list[SearchFilter] | None, 

534 ) -> str: 

535 """Build search query with filters.""" 

536 # For now, return simple query - could be enhanced with query parsing 

537 return query 

538 

539 def _build_filter_conditions( 

540 self, 

541 filters: list[SearchFilter], 

542 ) -> tuple[list[str], list[Any]]: 

543 """Build SQL conditions from filters.""" 

544 conditions = [] 

545 params = [] 

546 

547 for filt in filters: 

548 if filt.field == "timestamp" and filt.operator == "range": 

549 start_time, end_time = filt.value 

550 condition = ( 

551 "JSON_EXTRACT(search_metadata, '$.timestamp') BETWEEN ? AND ?" 

552 ) 

553 conditions.append(f"{'NOT ' if filt.negate else ''}{condition}") 

554 params.extend([start_time.isoformat(), end_time.isoformat()]) 

555 

556 elif filt.operator == "eq": 

557 condition = f"JSON_EXTRACT(search_metadata, '$.{filt.field}') = ?" 

558 conditions.append(f"{'NOT ' if filt.negate else ''}{condition}") 

559 params.append(filt.value) 

560 

561 elif filt.operator == "contains": 

562 condition = "indexed_content LIKE ?" 

563 conditions.append(f"{'NOT ' if filt.negate else ''}{condition}") 

564 params.append(f"%{filt.value}%") 

565 

566 return conditions, params 

567 

568 async def _execute_search( 

569 self, 

570 query: str, 

571 sort_by: str, 

572 limit: int, 

573 offset: int, 

574 ) -> list[SearchResult]: 

575 """Execute the actual search.""" 

576 # Simple text search for now - could be enhanced with full-text search 

577 sql = """ 

578 SELECT content_id, content_type, indexed_content, search_metadata, last_indexed 

579 FROM search_index 

580 WHERE indexed_content LIKE ? 

581 """ 

582 params = [f"%{query}%"] 

583 

584 # Add sorting 

585 if sort_by == "date": 

586 sql += " ORDER BY last_indexed DESC" 

587 elif sort_by == "project": 

588 sql += " ORDER BY JSON_EXTRACT(search_metadata, '$.project')" 

589 else: # relevance - simple for now 

590 sql += " ORDER BY LENGTH(indexed_content) DESC" # Longer content = more relevant 

591 

592 sql += " LIMIT ? OFFSET ?" 

593 params.extend([limit, offset]) 

594 

595 results = await asyncio.get_event_loop().run_in_executor( 

596 None, 

597 lambda: self.reflection_db.conn.execute(sql, params).fetchall(), 

598 ) 

599 

600 search_results = [] 

601 for row in results: 

602 ( 

603 content_id, 

604 content_type, 

605 indexed_content, 

606 search_metadata_json, 

607 last_indexed, 

608 ) = row 

609 

610 metadata = json.loads(search_metadata_json) if search_metadata_json else {} 

611 

612 search_results.append( 

613 SearchResult( 

614 content_id=content_id, 

615 content_type=content_type, 

616 title=f"{content_type.title()} from {metadata.get('project', 'Unknown')}", 

617 content=indexed_content[:500] + "..." 

618 if len(indexed_content) > 500 

619 else indexed_content, 

620 score=0.8, # Simple scoring for now 

621 project=metadata.get("project"), 

622 timestamp=datetime.fromisoformat(metadata["timestamp"]) 

623 if metadata.get("timestamp") 

624 else last_indexed, 

625 metadata=metadata, 

626 ), 

627 ) 

628 

629 return search_results 

630 

631 async def _add_highlights( 

632 self, 

633 results: list[SearchResult], 

634 query: str, 

635 ) -> list[SearchResult]: 

636 """Add search highlights to results.""" 

637 query_terms = query.lower().split() 

638 

639 for result in results: 

640 highlights = [] 

641 content_lower = result.content.lower() 

642 

643 for term in query_terms: 

644 if term in content_lower: 

645 # Find context around the term 

646 start_pos = content_lower.find(term) 

647 context_start = max(0, start_pos - 50) 

648 context_end = min(len(result.content), start_pos + len(term) + 50) 

649 

650 highlight = result.content[context_start:context_end] 

651 highlight = highlight.replace(term, f"<mark>{term}</mark>") 

652 highlights.append(highlight) 

653 

654 result.highlights = highlights[:3] # Limit to 3 highlights 

655 

656 return results 

657 

658 async def _calculate_facets( 

659 self, 

660 query: str, 

661 filters: list[SearchFilter] | None, 

662 requested_facets: list[str], 

663 ) -> dict[str, SearchFacet]: 

664 """Calculate facet counts for search results.""" 

665 facets = {} 

666 

667 for facet_name in requested_facets: 

668 if facet_name in self.facet_configs: 

669 facet_config = self.facet_configs[facet_name] 

670 

671 sql = """ 

672 SELECT facet_value, COUNT(*) as count 

673 FROM search_facets sf 

674 JOIN search_index si ON sf.content_id = si.id 

675 WHERE sf.facet_name = ? AND si.indexed_content LIKE ? 

676 GROUP BY facet_value 

677 ORDER BY count DESC 

678 LIMIT ? 

679 """ 

680 

681 results = await asyncio.get_event_loop().run_in_executor( 

682 None, 

683 lambda: self.reflection_db.conn.execute( 

684 sql, 

685 [facet_name, f"%{query}%", facet_config["size"]], 

686 ).fetchall(), 

687 ) 

688 

689 facets[facet_name] = SearchFacet( 

690 name=facet_name, 

691 values=[(row[0], row[1]) for row in results], 

692 facet_type=facet_config["type"], 

693 ) 

694 

695 return facets 

696 

697 def _parse_timeframe(self, timeframe: str) -> tuple[datetime, datetime]: 

698 """Parse timeframe string into start and end times.""" 

699 now = datetime.now(UTC) 

700 

701 if timeframe == "1h": 

702 start_time = now - timedelta(hours=1) 

703 elif timeframe == "1d": 

704 start_time = now - timedelta(days=1) 

705 elif timeframe == "1w": 

706 start_time = now - timedelta(weeks=1) 

707 elif timeframe == "1m": 

708 start_time = now - timedelta(days=30) 

709 elif timeframe == "1y": 

710 start_time = now - timedelta(days=365) 

711 else: 

712 # Try to parse as ISO date range or default to 30 days 

713 start_time = now - timedelta(days=30) 

714 

715 return start_time, now