Coverage for session_buddy / advanced_search.py: 67.20%

374 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1#!/usr/bin/env python3 

2"""Advanced Search Engine for Session Management. 

3 

4Provides enhanced search capabilities with faceted filtering, full-text search, 

5and intelligent result ranking. 

6""" 

7 

8import hashlib 

9import json 

10import time 

11from datetime import UTC, datetime 

12from typing import Any 

13 

14from .reflection_tools import ReflectionDatabase 

15from .search_enhanced import EnhancedSearchEngine 

16from .session_types import SQLCondition 

17from .utils.search import ( 

18 SearchFacet, 

19 SearchFilter, 

20 SearchResult, 

21 ensure_timezone, 

22 extract_technical_terms, 

23 parse_timeframe, 

24 parse_timeframe_single, 

25 truncate_content, 

26) 

27 

# Public API of this module. SearchFilter is re-exported here (it is imported
# from .utils.search) alongside the engine class.
__all__ = [
    "AdvancedSearchEngine",
    "SearchFilter",
]

29 

30 

31class AdvancedSearchEngine: 

32 """Advanced search engine with faceted filtering and full-text search.""" 

33 

def __init__(self, reflection_db: ReflectionDatabase) -> None:
    """Bind the engine to the reflection database it indexes and queries.

    Args:
        reflection_db: Database whose ``conn`` is read and written by the
            indexing and search methods of this engine.
    """
    self.reflection_db = reflection_db
    self.enhanced_search = EnhancedSearchEngine(reflection_db)
    # Key -> datetime map.
    # NOTE(review): not read by any method visible in this chunk — confirm use.
    self.index_cache: dict[str, datetime] = {}

    # Facet definitions. "terms" facets carry a bucket size; the single
    # "date" facet lists relative range labels. Insertion order is kept.
    facet_configs: dict[str, dict[str, Any]] = {
        name: {"type": "terms", "size": size}
        for name, size in (("project", 20), ("content_type", 10))
    }
    facet_configs["date_range"] = {
        "type": "date",
        "ranges": ["1d", "7d", "30d", "90d", "365d"],
    }
    facet_configs.update(
        (name, {"type": "terms", "size": size})
        for name, size in (
            ("author", 15),
            ("tags", 25),
            ("file_type", 10),
            ("language", 10),
            ("error_type", 15),
        )
    )
    self.facet_configs = facet_configs

53 

async def search(
    self,
    query: str,
    filters: list[SearchFilter] | None = None,
    facets: list[str] | None = None,
    sort_by: str = "relevance",  # 'relevance', 'date', 'project'
    limit: int = 20,
    offset: int = 0,
    include_highlights: bool = True,
    content_type: str | None = None,
    timeframe: str | None = None,
) -> dict[str, Any]:
    """Perform advanced search with faceted filtering.

    The search index is refreshed first if it is stale. Results are then
    fetched, optionally highlighted, and facets are computed when requested.

    Args:
        query: Text matched against the indexed content.
        filters: Extra field filters applied as SQL conditions.
        facets: Facet names to compute; no facets are computed when empty.
        sort_by: Sort key forwarded to the SQL builder.
        limit: Maximum number of rows to return.
        offset: Number of rows to skip.
        include_highlights: Whether to add highlights to the results.
        content_type: Optional content-type restriction.
        timeframe: Optional timeframe restriction.

    Returns:
        The dict built by ``_format_search_response``. Its ``"took"`` entry is
        overwritten here with the elapsed time of this call in seconds.
    """
    started = time.perf_counter()

    # Ensure search index is up to date
    await self._ensure_search_index()

    # Build and execute search
    search_query = self._build_search_query(query, filters)
    results = await self._execute_search(
        search_query,
        sort_by,
        limit,
        offset,
        filters,
        content_type,
        timeframe,
    )

    # Process results with optional features
    results = await self._process_search_results(results, query, include_highlights)
    facet_results = await self._process_facets(query, filters, facets)

    response = self._format_search_response(results, facet_results, query, filters)
    # The formatter cannot know when the search began, so the real elapsed
    # time (monotonic clock) is recorded here.
    response["took"] = time.perf_counter() - started
    return response

87 

88 async def _process_search_results( 

89 self, 

90 results: list[Any], 

91 query: str, 

92 include_highlights: bool, 

93 ) -> list[Any]: 

94 """Process search results with optional highlighting.""" 

95 if include_highlights: 95 ↛ 97line 95 didn't jump to line 97 because the condition on line 95 was always true

96 return await self._add_highlights(results, query) 

97 return results 

98 

async def _process_facets(
    self,
    query: str,
    filters: list[SearchFilter] | None,
    facets: list[str] | None,
) -> dict[str, Any]:
    """Compute the requested facets, or return ``{}`` when none are requested."""
    if not facets:
        return {}
    return await self._calculate_facets(query, filters, facets)

109 

def _format_search_response(
    self,
    results: list[Any],
    facet_results: dict[str, Any],
    query: str,
    filters: list[SearchFilter] | None,
) -> dict[str, Any]:
    """Format the final search response.

    Returns:
        Dict with ``results``, ``facets``, ``total`` (the number of results in
        this response), ``query``, ``filters`` (each filter's ``__dict__``) and
        ``took``. ``took`` is a ``0.0`` placeholder, because this method does
        not know when the search started. The caller is expected to update it
        with the actual timing.
    """
    return {
        "results": results,
        "facets": facet_results,
        "total": len(results),
        "query": query,
        "filters": [f.__dict__ for f in filters] if filters else [],
        # The previous `time.time() - time.time()` evaluated the left call
        # first, so it was always <= 0 (often slightly negative) — never a
        # real duration. Use an explicit placeholder instead.
        "took": 0.0,
    }

126 

async def suggest_completions(
    self,
    query: str,
    field: str = "indexed_content",
    limit: int = 10,
) -> list[dict[str, Any]]:
    """Get search completion suggestions ranked by frequency.

    Only the predefined fields "content", "project" and "tags" have dedicated
    queries. Any other ``field`` value, including the default
    "indexed_content", falls back to the "content" query. Using fixed SQL
    text with bound parameters keeps ``field`` out of the SQL.

    Returns:
        ``[{"text": ..., "frequency": ...}, ...]``. The list is empty when
        there is no connection or the query fails (e.g. the index table does
        not exist yet).
    """
    content_sql = """
        SELECT DISTINCT indexed_content, COUNT(*) as frequency
        FROM search_index
        WHERE indexed_content LIKE ?
        GROUP BY indexed_content
        ORDER BY frequency DESC, indexed_content
        LIMIT ?
        """
    per_field_sql = {
        "content": content_sql,
        "project": """
        SELECT DISTINCT JSON_EXTRACT_STRING(search_metadata, '$.project'), COUNT(*) as frequency
        FROM search_index
        WHERE JSON_EXTRACT_STRING(search_metadata, '$.project') LIKE ?
        GROUP BY JSON_EXTRACT_STRING(search_metadata, '$.project')
        ORDER BY frequency DESC, JSON_EXTRACT_STRING(search_metadata, '$.project')
        LIMIT ?
        """,
        "tags": """
        SELECT DISTINCT JSON_EXTRACT_STRING(search_metadata, '$.tags'), COUNT(*) as frequency
        FROM search_index
        WHERE JSON_EXTRACT_STRING(search_metadata, '$.tags') LIKE ?
        GROUP BY JSON_EXTRACT_STRING(search_metadata, '$.tags')
        ORDER BY frequency DESC, JSON_EXTRACT_STRING(search_metadata, '$.tags')
        LIMIT ?
        """,
    }
    chosen_sql = per_field_sql.get(field, content_sql)

    conn = self.reflection_db.conn
    if not conn:
        return []

    like_pattern = f"%{query}%"
    try:
        rows = conn.execute(chosen_sql, [like_pattern, limit]).fetchall()
        return [{"text": row[0], "frequency": row[1]} for row in rows]
    except Exception:
        # Table doesn't exist yet, will be created during index rebuild
        return []

178 

async def get_similar_content(
    self,
    content_id: str,
    content_type: str,
    limit: int = 5,
) -> list[SearchResult]:
    """Find content similar to an already-indexed item.

    The item's indexed text (its first 500 characters) is used as the query
    for ``reflection_db.search_conversations``. The hits are converted to
    ``SearchResult`` objects.

    Args:
        content_id: Id of the source item in ``search_index``.
        content_type: Content type of the source item.
        limit: Maximum number of similar results to return.

    Returns:
        Up to ``limit`` results, excluding any hit whose ``conversation_id``
        equals ``content_id``. The list is empty when there is no connection,
        the index table is unavailable, the item is not indexed, or the item
        has no indexed text.
    """
    sql = """
        SELECT indexed_content, search_metadata
        FROM search_index
        WHERE content_id = ? AND content_type = ?
        """

    conn = self.reflection_db.conn
    if not conn:
        return []

    try:
        row = conn.execute(sql, [content_id, content_type]).fetchone()
    except Exception:
        # Table doesn't exist yet, will be created during index rebuild
        return []

    source_content = row[0] if row else None
    # Guard NULL/empty text: slicing None raised TypeError, and an empty
    # query has nothing to be "similar" to.
    if not source_content:
        return []

    similar_results = await self.reflection_db.search_conversations(
        query=source_content[:500],  # Use first 500 chars as query
        limit=limit + 1,  # +1 in case the source itself is among the hits
    )

    # Convert to SearchResult format and exclude source
    search_results = [
        SearchResult(
            content_id=conv.get("conversation_id", ""),
            content_type="conversation",
            title=f"Conversation from {conv.get('project', 'Unknown')}",
            content=conv.get("content", ""),
            score=conv.get("score", 0.0),
            project=conv.get("project"),
            timestamp=conv.get("timestamp"),
            metadata=conv.get("metadata", {}),
        )
        for conv in similar_results
        if conv.get("conversation_id") != content_id
    ]

    return search_results[:limit]

233 

async def search_by_timeframe(
    self,
    timeframe: str,  # '1h', '1d', '1w', '1m', '1y' or ISO date range
    query: str | None = None,
    project: str | None = None,
    limit: int = 20,
) -> list[dict[str, Any]]:
    """Search within a time window and return plain-dict results.

    Args:
        timeframe: Window specification handed to ``parse_timeframe``.
        query: Text to match. ``"*"`` is passed to ``search`` when omitted.
        project: When given, adds a ``project`` equality filter.
        limit: Maximum number of results requested from ``search``.

    Returns:
        One dict per result object with its public fields, in the order
        returned by ``search`` (requested with ``sort_by="date"``).
    """
    window = parse_timeframe(timeframe)
    filters = [
        SearchFilter(
            field="timestamp",
            operator="range",
            value=(window.start, window.end),
        ),
    ]
    if project:
        filters.append(SearchFilter(field="project", operator="eq", value=project))

    response = await self.search(
        query=query or "*",
        filters=filters,
        limit=limit,
        sort_by="date",
    )

    # Flatten each result into a dict for compatibility with dict consumers.
    exported_fields = (
        "content_id",
        "content_type",
        "title",
        "content",
        "score",
        "project",
        "timestamp",
        "metadata",
        "highlights",
        "facets",
    )
    return [
        {name: getattr(hit, name) for name in exported_fields}
        for hit in response["results"]
    ]

283 

async def aggregate_metrics(
    self,
    metric_type: str,  # 'activity', 'projects', 'content_types', 'errors'
    timeframe: str = "30d",
    filters: list[SearchFilter] | None = None,
) -> dict[str, Any]:
    """Calculate aggregate metrics from search data.

    Rows are restricted to ``last_indexed`` within the parsed timeframe.
    NOTE(review): ``filters`` is accepted but not applied by this method.

    Returns:
        ``{"metric_type", "timeframe", "data": [{"key", "value"}, ...]}``.
        ``data`` is empty when the query fails (e.g. the index table does not
        exist yet). An ``{"error": ...}`` dict is returned for an unknown
        metric type or a missing connection.
    """
    window = parse_timeframe(timeframe)
    start_time, end_time = window.start, window.end

    # Fixed SQL per metric; only the time bounds are bound as parameters.
    metric_queries = {
        "activity": """
            SELECT DATE_TRUNC('day', last_indexed) as day,
                   COUNT(*) as count,
                   COUNT(DISTINCT content_id) as unique_content
            FROM search_index
            WHERE last_indexed BETWEEN ? AND ?
            GROUP BY day
            ORDER BY day
            """,
        "projects": """
            SELECT JSON_EXTRACT_STRING(search_metadata, '$.project') as project,
                   COUNT(*) as count
            FROM search_index
            WHERE last_indexed BETWEEN ? AND ?
            AND JSON_EXTRACT_STRING(search_metadata, '$.project') IS NOT NULL
            GROUP BY project
            ORDER BY count DESC
            """,
        "content_types": """
            SELECT content_type, COUNT(*) as count
            FROM search_index
            WHERE last_indexed BETWEEN ? AND ?
            GROUP BY content_type
            ORDER BY count DESC
            """,
        "errors": """
            SELECT JSON_EXTRACT_STRING(search_metadata, '$.error_type') as error_type,
                   COUNT(*) as count
            FROM search_index
            WHERE last_indexed BETWEEN ? AND ?
            AND JSON_EXTRACT_STRING(search_metadata, '$.error_type') IS NOT NULL
            GROUP BY error_type
            ORDER BY count DESC
            """,
    }

    sql = metric_queries.get(metric_type)
    if sql is None:
        return {"error": f"Unknown metric type: {metric_type}"}

    conn = self.reflection_db.conn
    if not conn:
        return {
            "error": f"Database connection not available for metric type: {metric_type}",
        }

    try:
        rows = conn.execute(sql, [start_time, end_time]).fetchall()
        # Only the first two columns are reported ("activity" has a third).
        data = [{"key": row[0], "value": row[1]} for row in rows or []]
    except Exception:
        # Table doesn't exist yet, will be created during index rebuild
        data = []

    return {
        "metric_type": metric_type,
        "timeframe": timeframe,
        "data": data,
    }

361 

async def _ensure_search_index(self) -> None:
    """Rebuild the search index when it was never built or is over an hour old."""
    last_update = await self._get_last_index_update()

    if last_update:
        age_seconds = (datetime.now(UTC) - last_update).total_seconds()
        if age_seconds <= 3600:
            return  # still fresh

    await self._rebuild_search_index()

370 

371 async def _get_last_index_update(self) -> datetime | None: 

372 """Get timestamp of last index update.""" 

373 sql = "SELECT MAX(last_indexed) FROM search_index" 

374 

375 if not self.reflection_db.conn: 375 ↛ 376line 375 didn't jump to line 376 because the condition on line 375 was never true

376 return None 

377 

378 try: 

379 result = self.reflection_db.conn.execute(sql).fetchone() 

380 

381 if result and result[0]: 381 ↛ 387line 381 didn't jump to line 387 because the condition on line 381 was always true

382 dt = result[0] 

383 # Ensure datetime is timezone-aware 

384 if isinstance(dt, datetime) and dt.tzinfo is None: 384 ↛ 386line 384 didn't jump to line 386 because the condition on line 384 was always true

385 dt = dt.replace(tzinfo=UTC) 

386 return dt if isinstance(dt, datetime) else None 

387 return None 

388 except Exception: 

389 # Table doesn't exist yet, will be created during index rebuild 

390 return None 

391 

async def _rebuild_search_index(self) -> None:
    """Rebuild the search index from conversations and reflections, then facets."""
    # Make sure the backing tables exist before anything is written.
    if self.reflection_db.conn:
        await self.reflection_db._ensure_tables()

    for step in (
        self._index_conversations,
        self._index_reflections,
        self._update_search_facets,
    ):
        await step()

406 

407 async def _index_conversations(self) -> None: 

408 """Index all conversations for search.""" 

409 if not self.reflection_db.conn: 409 ↛ 410line 409 didn't jump to line 410 because the condition on line 409 was never true

410 return 

411 

412 conversations = self._fetch_conversations_for_indexing() 

413 for row in conversations: 

414 await self._process_conversation_for_index(row) 

415 

416 self._commit_conversation_index() 

417 

418 def _fetch_conversations_for_indexing(self) -> list[tuple[Any, ...]]: 

419 """Fetch conversations from database for indexing.""" 

420 if not self.reflection_db.conn: 420 ↛ 421line 420 didn't jump to line 421 because the condition on line 420 was never true

421 return [] 

422 

423 sql = "SELECT id, content, project, timestamp, metadata FROM conversations" 

424 return self.reflection_db.conn.execute(sql).fetchall() 

425 

async def _process_conversation_for_index(self, row: tuple[Any, ...]) -> None:
    """Index a single conversation row.

    Args:
        row: ``(id, content, project, timestamp, metadata_json)`` as selected
            by ``_fetch_conversations_for_indexing``.
    """
    conv_id, content, project, timestamp, metadata_json = row

    metadata = self._parse_conversation_metadata(metadata_json)
    indexed_content = self._build_indexed_content(content, project)
    search_metadata = self._build_conversation_search_metadata(
        project,
        timestamp,
        content,
        indexed_content,
    )
    # The parsed metadata used to be discarded, so metadata-only keys (e.g.
    # error_type, which aggregate_metrics reads from search_metadata) were
    # never indexed for conversations. Merge it with the same precedence as
    # _index_reflections: stored metadata wins on key collisions.
    if isinstance(metadata, dict) and metadata:
        search_metadata = search_metadata | metadata

    self._insert_conversation_into_search_index(
        conv_id,
        indexed_content,
        search_metadata,
    )

444 

445 def _parse_conversation_metadata(self, metadata_json: str | None) -> dict[str, Any]: 

446 """Parse conversation metadata JSON safely.""" 

447 return json.loads(metadata_json) if metadata_json else {} 

448 

def _build_indexed_content(self, content: str, project: str | None) -> str:
    """Return ``content`` followed by a ``project:<name>`` token and technical terms.

    Each appended piece is separated by a single space. Pieces are skipped
    when the project is missing or no technical terms are extracted.
    """
    pieces = [content]
    if project:
        pieces.append(f"project:{project}")

    terms = extract_technical_terms(content)
    if terms:
        pieces.append(" ".join(terms))

    return " ".join(pieces)

461 

def _build_conversation_search_metadata(
    self,
    project: str | None,
    timestamp: datetime | None,
    content: str,
    indexed_content: str,
) -> dict[str, Any]:
    """Build the JSON-serialisable metadata stored for a conversation.

    ``indexed_content`` is accepted for interface compatibility but unused.
    """
    terms = extract_technical_terms(content)
    iso_timestamp = timestamp.isoformat() if timestamp else None
    return {
        "project": project,
        "timestamp": iso_timestamp,
        "content_length": len(content),
        "technical_terms": terms,
    }

477 

478 def _insert_conversation_into_search_index( 

479 self, 

480 conv_id: str, 

481 indexed_content: str, 

482 search_metadata: dict[str, Any], 

483 ) -> None: 

484 """Insert conversation into search index.""" 

485 if not self.reflection_db.conn: 485 ↛ 486line 485 didn't jump to line 486 because the condition on line 485 was never true

486 return 

487 

488 self.reflection_db.conn.execute( 

489 """ 

490 INSERT INTO search_index 

491 (id, content_type, content_id, indexed_content, search_metadata, last_indexed) 

492 VALUES (?, ?, ?, ?, ?, ?) 

493 ON CONFLICT (id) DO UPDATE SET 

494 content_type = EXCLUDED.content_type, 

495 content_id = EXCLUDED.content_id, 

496 indexed_content = EXCLUDED.indexed_content, 

497 search_metadata = EXCLUDED.search_metadata, 

498 last_indexed = EXCLUDED.last_indexed 

499 """, 

500 [ 

501 f"conv_{conv_id}", 

502 "conversation", 

503 conv_id, 

504 indexed_content, 

505 json.dumps(search_metadata), 

506 datetime.now(UTC), 

507 ], 

508 ) 

509 

510 def _commit_conversation_index(self) -> None: 

511 """Commit the conversation indexing transaction.""" 

512 if self.reflection_db.conn: 512 ↛ exitline 512 didn't return from function '_commit_conversation_index' because the condition on line 512 was always true

513 self.reflection_db.conn.commit() 

514 

515 async def _index_reflections(self) -> None: 

516 """Index all reflections for search.""" 

517 if not self.reflection_db.conn: 517 ↛ 518line 517 didn't jump to line 518 because the condition on line 517 was never true

518 return 

519 

520 sql = "SELECT id, content, tags, timestamp, metadata FROM reflections" 

521 results = self.reflection_db.conn.execute(sql).fetchall() 

522 

523 for row in results: 

524 refl_id, content, tags, timestamp, metadata_json = row 

525 

526 # Extract metadata 

527 metadata: dict[str, Any] = ( 

528 json.loads(metadata_json) if metadata_json else {} 

529 ) 

530 

531 # Create indexed content 

532 indexed_content = content 

533 if tags: 533 ↛ 537line 533 didn't jump to line 537 because the condition on line 533 was always true

534 indexed_content += " " + " ".join(f"tag:{tag}" for tag in tags) 

535 

536 # Create search metadata 

537 base_metadata: dict[str, Any] = { 

538 "tags": tags or [], 

539 "timestamp": timestamp.isoformat() if timestamp else None, 

540 "content_length": len(content), 

541 } 

542 search_metadata: dict[str, Any] = base_metadata | metadata 

543 

544 # Insert or update search index 

545 if self.reflection_db.conn: 545 ↛ 523line 545 didn't jump to line 523 because the condition on line 545 was always true

546 self.reflection_db.conn.execute( 

547 """ 

548 INSERT INTO search_index 

549 (id, content_type, content_id, indexed_content, search_metadata, last_indexed) 

550 VALUES (?, ?, ?, ?, ?, ?) 

551 ON CONFLICT (id) DO UPDATE SET 

552 content_type = EXCLUDED.content_type, 

553 content_id = EXCLUDED.content_id, 

554 indexed_content = EXCLUDED.indexed_content, 

555 search_metadata = EXCLUDED.search_metadata, 

556 last_indexed = EXCLUDED.last_indexed 

557 """, 

558 [ 

559 f"refl_{refl_id}", 

560 "reflection", 

561 refl_id, 

562 indexed_content, 

563 json.dumps(search_metadata), 

564 datetime.now(UTC), 

565 ], 

566 ) 

567 

568 if self.reflection_db.conn: 568 ↛ exitline 568 didn't return from function '_index_reflections' because the condition on line 568 was always true

569 self.reflection_db.conn.commit() 

570 

571 def _get_facet_queries(self) -> dict[str, str]: 

572 """Get facet query definitions.""" 

573 return { 

574 "project": """ 

575 SELECT JSON_EXTRACT_STRING(search_metadata, '$.project') as facet_value, COUNT(*) as count 

576 FROM search_index 

577 WHERE JSON_EXTRACT_STRING(search_metadata, '$.project') IS NOT NULL 

578 GROUP BY facet_value 

579 ORDER BY count DESC 

580 """, 

581 "content_type": """ 

582 SELECT content_type as facet_value, COUNT(*) as count 

583 FROM search_index 

584 WHERE content_type IS NOT NULL 

585 GROUP BY facet_value 

586 ORDER BY count DESC 

587 """, 

588 "tags": """ 

589 SELECT JSON_EXTRACT_STRING(search_metadata, '$.tags') as facet_value, COUNT(*) as count 

590 FROM search_index 

591 WHERE JSON_EXTRACT_STRING(search_metadata, '$.tags') IS NOT NULL 

592 GROUP BY facet_value 

593 ORDER BY count DESC 

594 """, 

595 "technical_terms": """ 

596 SELECT JSON_EXTRACT_STRING(search_metadata, '$.technical_terms') as facet_value, COUNT(*) as count 

597 FROM search_index 

598 WHERE JSON_EXTRACT_STRING(search_metadata, '$.technical_terms') IS NOT NULL 

599 GROUP BY facet_value 

600 ORDER BY count DESC 

601 """, 

602 } 

603 

604 def _should_process_facet_value(self, facet_value: Any) -> bool: 

605 """Check if facet value should be processed.""" 

606 return isinstance(facet_value, str) and bool(facet_value) 

607 

608 def _insert_facet_value(self, facet_name: str, facet_value: str) -> None: 

609 """Insert a single facet value into the database.""" 

610 if not self.reflection_db.conn: 610 ↛ 611line 610 didn't jump to line 611 because the condition on line 610 was never true

611 return 

612 

613 facet_id = hashlib.md5( 

614 f"{facet_name}_{facet_value}".encode(), 

615 usedforsecurity=False, 

616 ).hexdigest() 

617 

618 self.reflection_db.conn.execute( 

619 """ 

620 INSERT INTO search_facets 

621 (id, content_type, content_id, facet_name, facet_value, created_at) 

622 VALUES (?, ?, ?, ?, ?, ?) 

623 ON CONFLICT (id) DO UPDATE SET 

624 content_type = EXCLUDED.content_type, 

625 content_id = EXCLUDED.content_id, 

626 facet_name = EXCLUDED.facet_name, 

627 facet_value = EXCLUDED.facet_value, 

628 created_at = EXCLUDED.created_at 

629 """, 

630 [ 

631 facet_id, 

632 "search_facet", 

633 f"{facet_name}_{facet_value}", 

634 facet_name, 

635 facet_value, 

636 datetime.now(UTC).isoformat(), 

637 ], 

638 ) 

639 

640 def _process_facet_query(self, facet_name: str, sql: str) -> None: 

641 """Process a single facet query.""" 

642 if not self.reflection_db.conn: 642 ↛ 643line 642 didn't jump to line 643 because the condition on line 642 was never true

643 return 

644 

645 try: 

646 results = self.reflection_db.conn.execute(sql).fetchall() 

647 

648 for facet_value, _count in results: 

649 if self._should_process_facet_value(facet_value): 649 ↛ 648line 649 didn't jump to line 648 because the condition on line 649 was always true

650 self._insert_facet_value(facet_name, facet_value) 

651 except Exception: 

652 # Table doesn't exist yet, will be created during index rebuild 

653 return 

654 

655 async def _update_search_facets(self) -> None: 

656 """Update search facets based on indexed content.""" 

657 if not self.reflection_db.conn: 657 ↛ 658line 657 didn't jump to line 658 because the condition on line 657 was never true

658 return 

659 

660 # Clear existing facets 

661 self.reflection_db.conn.execute("DELETE FROM search_facets") 

662 

663 # Process each facet query 

664 facet_queries = self._get_facet_queries() 

665 for facet_name, sql in facet_queries.items(): 

666 self._process_facet_query(facet_name, sql) 

667 

668 # Commit all changes 

669 if self.reflection_db.conn: 669 ↛ exitline 669 didn't return from function '_update_search_facets' because the condition on line 669 was always true

670 self.reflection_db.conn.commit() 

671 

def _build_search_query(
    self,
    query: str,
    filters: list[SearchFilter] | None,
) -> str:
    """Return the text used for matching.

    Currently the query is returned unchanged. ``filters`` is not folded into
    the text; ``_execute_search`` passes filters to the SQL builder
    separately. A natural place for future query parsing.
    """
    search_text = query
    return search_text

680 

def _build_filter_conditions(
    self,
    filters: list[SearchFilter],
) -> SQLCondition:
    """AND together the SQL conditions of all supported filters.

    Filters for which ``_build_single_filter_condition`` returns nothing are
    skipped. Parameters are concatenated in filter order.
    """
    clauses: list[str] = []
    bound_params: list[str | datetime] = []

    for filt in filters:
        built = self._build_single_filter_condition(filt)
        if not built:
            continue
        clauses.append(built.condition)
        bound_params.extend(built.params)

    return SQLCondition(condition=" AND ".join(clauses), params=bound_params)

696 

def _build_single_filter_condition(self, filt: SearchFilter) -> SQLCondition | None:
    """Dispatch a filter to its condition builder.

    Handles ``timestamp``/``range``, ``eq`` and ``contains``. Returns ``None``
    for any other combination.
    """
    if filt.field == "timestamp" and filt.operator == "range":
        return self._build_timestamp_range_condition(filt)

    builders = {
        "eq": self._build_equality_condition,
        "contains": self._build_contains_condition,
    }
    builder = builders.get(filt.operator)
    return builder(filt) if builder else None

706 

def _build_timestamp_range_condition(
    self,
    filt: SearchFilter,
) -> SQLCondition | None:
    """Build a ``last_indexed BETWEEN ? AND ?`` condition (optionally negated).

    Args:
        filt: Filter whose ``value`` is a 2-item tuple/list ``(start, end)``.

    Returns:
        The condition with both bounds as parameters. Returns ``None`` when the
        value is not a 2-item tuple/list or either bound is not a ``str`` or
        ``datetime``. Previously such a bound was silently dropped, leaving
        two placeholders with fewer parameters and a SQL binding error.
    """
    if not isinstance(filt.value, tuple | list) or len(filt.value) != 2:
        return None  # Skip invalid range values

    start_time, end_time = filt.value
    if not (
        isinstance(start_time, str | datetime) and isinstance(end_time, str | datetime)
    ):
        return None  # Skip ranges whose bounds cannot be bound as parameters

    negation = "NOT " if filt.negate else ""
    params: list[str | datetime] = [start_time, end_time]
    return SQLCondition(
        condition=f"{negation}last_indexed BETWEEN ? AND ?",
        params=params,
    )

727 

def _build_equality_condition(self, filt: SearchFilter) -> SQLCondition:
    """Build ``JSON_EXTRACT_STRING(search_metadata, '$.<field>') = ?``.

    For a non-empty list value, its first element is used. A value that is
    not a ``str`` or ``datetime`` is converted with ``str()``, as
    ``_apply_eq_filter`` does. Previously such values produced a ``?`` with
    no bound parameter, which fails at execution time.

    NOTE(review): ``filt.field`` is interpolated into the SQL text; only pass
    trusted field names.
    """
    condition = f"JSON_EXTRACT_STRING(search_metadata, '$.{filt.field}') = ?"
    negated_condition = f"{'NOT ' if filt.negate else ''}{condition}"

    candidate = filt.value
    if isinstance(candidate, list) and candidate:
        candidate = candidate[0]
    param = candidate if isinstance(candidate, str | datetime) else str(candidate)

    params: list[str | datetime] = [param]
    return SQLCondition(condition=negated_condition, params=params)

745 

def _build_contains_condition(self, filt: SearchFilter) -> SQLCondition:
    """Build ``indexed_content LIKE %value%`` (optionally negated).

    Non-string values are converted with ``str()``. LIKE wildcards inside the
    value are not escaped.
    """
    text = filt.value if isinstance(filt.value, str) else str(filt.value)
    prefix = "NOT " if filt.negate else ""
    params: list[str | datetime] = [f"%{text}%"]
    return SQLCondition(condition=f"{prefix}indexed_content LIKE ?", params=params)

756 

async def _execute_search(
    self,
    query: str,
    sort_by: str,
    limit: int,
    offset: int,
    filters: list[SearchFilter] | None = None,
    content_type: str | None = None,
    timeframe: str | None = None,
) -> list[SearchResult]:
    """Build the search SQL, run it, and convert rows to results.

    Returns ``[]`` when there is no connection or the query fails (e.g. the
    index table does not exist yet).
    """
    built = self._build_search_sql(
        query,
        content_type,
        timeframe,
        filters,
        sort_by,
        limit,
        offset,
    )

    conn = self.reflection_db.conn
    if not conn:
        return []

    try:
        rows = conn.execute(
            built.condition,
            self._prepare_sql_params(built.params),
        ).fetchall()
        return self._convert_sql_results_to_search_results(rows)
    except Exception:
        # Table doesn't exist yet, will be created during index rebuild
        return []

790 

def _build_search_sql(
    self,
    query: str,
    content_type: str | None,
    timeframe: str | None,
    filters: list[SearchFilter] | None,
    sort_by: str,
    limit: int,
    offset: int,
) -> SQLCondition:
    """Build the complete parameterised SQL for a search.

    The base query matches ``indexed_content LIKE %query%``. A query of
    ``"*"`` or an empty/blank query matches every row. ``search_by_timeframe``
    sends ``"*"`` when no query is given, and previously that became
    ``LIKE '%*%'``, which only matched text containing a literal asterisk.
    Content-type, timeframe and custom filter clauses are then appended,
    followed by sorting and ``LIMIT ? OFFSET ?``.

    Returns:
        ``SQLCondition`` whose ``condition`` is the SQL text and whose
        ``params`` are the bound values in placeholder order.
    """
    sql = """
        SELECT content_id, content_type, indexed_content, search_metadata, last_indexed
        FROM search_index
        WHERE indexed_content LIKE ?
        """
    match_all = query.strip() in ("", "*")
    params: list[str | datetime] = ["%" if match_all else f"%{query}%"]

    result = self._add_content_type_filter(sql, params, content_type)
    sql, params = result.condition, result.params

    result = self._add_timeframe_filter(sql, params, timeframe, content_type)
    sql, params = result.condition, result.params

    result = self._add_filter_conditions_to_sql(sql, params, filters)
    sql, params = result.condition, result.params

    sql = self._add_sorting_to_sql(sql, sort_by)
    sql += " LIMIT ? OFFSET ?"
    params.extend([str(limit), str(offset)])

    return SQLCondition(condition=sql, params=params)

823 

824 def _get_sql_field(self, field: str) -> str: 

825 """Map filter field to SQL column expression.""" 

826 field_mappings = { 

827 "content_type": "content_type", 

828 "last_indexed": "last_indexed", 

829 "project": "JSON_EXTRACT_STRING(search_metadata, '$.project')", 

830 "timestamp": "JSON_EXTRACT_STRING(search_metadata, '$.timestamp')", 

831 "author": "JSON_EXTRACT_STRING(search_metadata, '$.author')", 

832 "tags": "JSON_EXTRACT_STRING(search_metadata, '$.tags')", 

833 } 

834 return field_mappings.get( 

835 field, 

836 f"JSON_EXTRACT_STRING(search_metadata, '$.{field}')", 

837 ) 

838 

839 def _apply_eq_filter( 

840 self, 

841 sql_field: str, 

842 negation: str, 

843 value: Any, 

844 params: list[str | datetime], 

845 ) -> tuple[str, list[str | datetime]]: 

846 """Apply equality filter.""" 

847 sql_part = f" AND {negation}{sql_field} = ?" 

848 params.append(str(value)) 

849 return sql_part, params 

850 

851 def _apply_ne_filter( 

852 self, 

853 sql_field: str, 

854 negation: str, 

855 value: Any, 

856 params: list[str | datetime], 

857 ) -> tuple[str, list[str | datetime]]: 

858 """Apply not-equal filter.""" 

859 sql_part = f" AND {negation}{sql_field} != ?" 

860 params.append(str(value)) 

861 return sql_part, params 

862 

863 def _apply_in_filter( 

864 self, 

865 sql_field: str, 

866 negation: str, 

867 value: Any, 

868 params: list[str | datetime], 

869 ) -> tuple[str, list[str | datetime]]: 

870 """Apply IN filter.""" 

871 if isinstance(value, list): 

872 placeholders = ", ".join("?" * len(value)) 

873 sql_part = f" AND {negation}{sql_field} IN ({placeholders})" 

874 params.extend([str(v) for v in value]) 

875 return sql_part, params 

876 return "", params 

877 

878 def _apply_not_in_filter( 

879 self, 

880 sql_field: str, 

881 negation: str, 

882 value: Any, 

883 params: list[str | datetime], 

884 ) -> tuple[str, list[str | datetime]]: 

885 """Apply NOT IN filter.""" 

886 if isinstance(value, list): 

887 placeholders = ", ".join("?" * len(value)) 

888 sql_part = f" AND {negation}{sql_field} NOT IN ({placeholders})" 

889 params.extend([str(v) for v in value]) 

890 return sql_part, params 

891 return "", params 

892 

893 def _apply_contains_filter( 

894 self, 

895 sql_field: str, 

896 negation: str, 

897 value: Any, 

898 params: list[str | datetime], 

899 ) -> tuple[str, list[str | datetime]]: 

900 """Apply CONTAINS filter.""" 

901 sql_part = f" AND {negation}{sql_field} LIKE ?" 

902 params.append(f"%{value}%") 

903 return sql_part, params 

904 

905 def _apply_starts_with_filter( 

906 self, 

907 sql_field: str, 

908 negation: str, 

909 value: Any, 

910 params: list[str | datetime], 

911 ) -> tuple[str, list[str | datetime]]: 

912 """Apply STARTS_WITH filter.""" 

913 sql_part = f" AND {negation}{sql_field} LIKE ?" 

914 params.append(f"{value}%") 

915 return sql_part, params 

916 

917 def _apply_ends_with_filter( 

918 self, 

919 sql_field: str, 

920 negation: str, 

921 value: Any, 

922 params: list[str | datetime], 

923 ) -> tuple[str, list[str | datetime]]: 

924 """Apply ENDS_WITH filter.""" 

925 sql_part = f" AND {negation}{sql_field} LIKE ?" 

926 params.append(f"%{value}") 

927 return sql_part, params 

928 

def _apply_range_filter(
    self,
    sql_field: str,
    filter_obj: SearchFilter,
    params: list[str | datetime],
) -> tuple[str, list[str | datetime]]:
    """Build a RANGE clause from a two-element ``(low, high)`` filter value.

    Without negation the clause is ``field BETWEEN low AND high`` (inclusive
    on both ends). With ``filter_obj.negate`` it is the complement
    ``field < low OR field > high``.

    The bounds may be given as a tuple or a list. Lists are what JSON-decoded
    input produces, so they are accepted as well. Any other shape produces
    no clause. Both bounds are bound as strings.

    Args:
        sql_field: SQL expression for the filtered column.
        filter_obj: Filter whose ``value`` holds the bounds and whose
            ``negate`` flag selects the complement.
        params: Positional parameter list; extended in place.

    Returns:
        ``(sql_fragment, params)``; the fragment is ``""`` for invalid bounds.
    """
    bounds = filter_obj.value
    if not isinstance(bounds, (tuple, list)) or len(bounds) != 2:
        return "", params

    low, high = bounds
    if filter_obj.negate:
        sql_part = f" AND ({sql_field} < ? OR {sql_field} > ?)"
    else:
        sql_part = f" AND {sql_field} BETWEEN ? AND ?"
    params.extend([str(low), str(high)])
    return sql_part, params

944 

def _add_filter_conditions_to_sql(
    self,
    sql: str,
    params: list[str | datetime],
    filters: list[SearchFilter] | None,
) -> SQLCondition:
    """Append one WHERE-clause fragment per custom filter.

    Each filter's ``operator`` selects one of the ``_apply_*_filter``
    helpers. Filters with an unrecognised operator are skipped silently.

    Args:
        sql: SQL text built so far; fragments are appended to it.
        params: Positional parameter list; handlers extend it in place.
        filters: Filters to apply; ``None`` or empty leaves *sql* unchanged.

    Returns:
        SQLCondition holding the extended SQL text and the parameter list.
    """
    if not filters:
        return SQLCondition(condition=sql, params=params)

    # Operators whose handlers share the (field, negation, value, params)
    # signature. Built once per call rather than once per filter.
    value_handlers = {
        "eq": self._apply_eq_filter,
        "ne": self._apply_ne_filter,
        "in": self._apply_in_filter,
        "not_in": self._apply_not_in_filter,
        "contains": self._apply_contains_filter,
        "starts_with": self._apply_starts_with_filter,
        "ends_with": self._apply_ends_with_filter,
    }

    for filter_obj in filters:
        sql_field = self._get_sql_field(filter_obj.field)

        if filter_obj.operator == "range":
            # RANGE reads negate/value from the filter object itself.
            sql_part, params = self._apply_range_filter(
                sql_field,
                filter_obj,
                params,
            )
        else:
            handler = value_handlers.get(filter_obj.operator)
            if handler is None:
                continue
            negation = "NOT " if filter_obj.negate else ""
            sql_part, params = handler(
                sql_field,
                negation,
                filter_obj.value,
                params,
            )

        sql += sql_part

    return SQLCondition(condition=sql, params=params)

1016 

def _add_content_type_filter(
    self,
    sql: str,
    params: list[str | datetime],
    content_type: str | None,
) -> SQLCondition:
    """Restrict the query to a single ``content_type`` when one is given.

    Args:
        sql: SQL text built so far.
        params: Positional parameter list; the content type is appended in
            place when a filter is added.
        content_type: Content type to match; falsy means "no restriction".

    Returns:
        SQLCondition with the (possibly extended) SQL and parameter list.
    """
    if not content_type:
        return SQLCondition(condition=sql, params=params)

    params.append(content_type)
    return SQLCondition(condition=f"{sql} AND content_type = ?", params=params)

1028 

def _add_timeframe_filter(
    self,
    sql: str,
    params: list[str | datetime],
    timeframe: str | None,
    content_type: str | None,
) -> SQLCondition:
    """Restrict results to rows indexed at or after the timeframe's cutoff.

    The timeframe is honoured only when a *content_type* is also supplied.
    This matches the existing behaviour.
    NOTE(review): the reason for that coupling is not visible here; confirm
    it is intentional.

    Args:
        sql: SQL text built so far.
        params: Positional parameter list; the cutoff's ISO string is
            appended in place when a filter is added.
        timeframe: Timeframe expression passed to ``parse_timeframe_single``.
        content_type: Must be truthy for the timeframe to apply.

    Returns:
        SQLCondition with the (possibly extended) SQL and parameter list.
    """
    unchanged = SQLCondition(condition=sql, params=params)
    if not (timeframe and content_type):
        return unchanged

    cutoff = parse_timeframe_single(timeframe)
    if not cutoff:
        return unchanged

    params.append(cutoff.isoformat())
    return SQLCondition(condition=sql + " AND last_indexed >= ?", params=params)

1045 

1046 def _add_sorting_to_sql(self, sql: str, sort_by: str) -> str: 

1047 """Add sorting clause to SQL query.""" 

1048 if sort_by == "date": 

1049 sql += " ORDER BY last_indexed DESC" 

1050 elif sort_by == "project": 1050 ↛ 1051line 1050 didn't jump to line 1051 because the condition on line 1050 was never true

1051 sql += " ORDER BY JSON_EXTRACT_STRING(search_metadata, '$.project')" 

1052 else: # relevance - simple for now 

1053 sql += " ORDER BY LENGTH(indexed_content) DESC" # Longer content = more relevant 

1054 return sql 

1055 

1056 def _prepare_sql_params(self, params: list[str | datetime]) -> list[str]: 

1057 """Prepare parameters for SQL execution.""" 

1058 return [ 

1059 param.isoformat() if isinstance(param, datetime) else str(param) 

1060 for param in params 

1061 ] 

1062 

def _convert_sql_results_to_search_results(
    self,
    results: list[tuple[Any, ...]],
) -> list[SearchResult]:
    """Turn raw ``search_index`` rows into ``SearchResult`` objects.

    Each row is unpacked as ``(content_id, content_type, indexed_content,
    search_metadata_json, last_indexed)``. Non-empty metadata JSON is
    decoded, and its ``project`` key supplies both the title suffix and
    ``SearchResult.project``. Every result gets the fixed placeholder score
    0.8.
    """

    def build(row):
        # Converts one row; unpacking raises ValueError on a wrong column count.
        content_id, content_type, indexed_content, metadata_json, last_indexed = row
        metadata: dict[str, Any] = json.loads(metadata_json) if metadata_json else {}
        label = content_type or "unknown"
        return SearchResult(
            content_id=content_id,
            content_type=label,
            title=f"{label.title()} from {metadata.get('project', 'Unknown')}",
            content=truncate_content(indexed_content),
            score=0.8,  # placeholder until real scoring exists
            project=metadata.get("project"),
            timestamp=ensure_timezone(last_indexed),
            metadata=metadata,
        )

    return [build(row) for row in results]

1094 

async def _add_highlights(
    self,
    results: list[SearchResult],
    query: str,
) -> list[SearchResult]:
    """Attach up to three highlighted snippets to each result.

    For each distinct whitespace-separated query term, the first
    case-insensitive occurrence in ``result.content`` is located. A snippet
    is taken with up to 50 characters of context on each side. Every
    occurrence of the term inside the snippet is wrapped in
    ``<mark>...</mark>``, keeping the casing found in the content.

    Args:
        results: Results to annotate; each one's ``highlights`` attribute is
            overwritten in place.
        query: Raw search query.

    Returns:
        The same ``results`` list.
    """
    import re

    # Keep query order but drop repeated terms so they don't yield
    # duplicate snippets.
    query_terms = list(dict.fromkeys(query.lower().split()))
    # Search the original text directly, case-insensitively. This avoids
    # index drift between ``content`` and ``content.lower()`` and lets the
    # marked text keep its real casing.
    patterns = [re.compile(re.escape(term), re.IGNORECASE) for term in query_terms]

    for result in results:
        content = result.content
        highlights: list[str] = []

        for pattern in patterns:
            match = pattern.search(content)
            if match is None:
                continue

            context_start = max(0, match.start() - 50)
            context_end = min(len(content), match.end() + 50)
            snippet = content[context_start:context_end]
            highlights.append(pattern.sub(r"<mark>\g<0></mark>", snippet))

            if len(highlights) == 3:  # cap at 3 highlights per result
                break

        result.highlights = highlights

    return results

1121 

async def _calculate_facets(
    self,
    query: str,
    filters: list[SearchFilter] | None,
    requested_facets: list[str],
) -> dict[str, SearchFacet]:
    """Count facet values for indexed content that contains *query*.

    For each requested facet that has an entry in ``self.facet_configs``,
    rows of ``search_facets`` are joined to ``search_index``. They are
    grouped by value and returned most-frequent first, limited to the
    facet's configured ``size``.

    Facets whose query fails are omitted. Per the original comment, the
    table may not exist until the index is rebuilt. An empty dict is
    returned when there is no open connection.

    Args:
        query: Substring matched against ``indexed_content``. LIKE
            metacharacters in it are escaped so they match literally.
        filters: Currently NOT applied to facet counts.
            NOTE(review): counts may therefore disagree with filtered
            search results; confirm whether that is intended.
        requested_facets: Facet names to compute; unknown names are skipped.

    Returns:
        Mapping of facet name to its ``SearchFacet``.
    """
    facets: dict[str, SearchFacet] = {}

    conn = self.reflection_db.conn
    if not conn:
        return facets

    sql = """
        SELECT facet_value, COUNT(*) as count
        FROM search_facets sf
        JOIN search_index si ON sf.content_id = si.id
        WHERE sf.facet_name = ? AND si.indexed_content LIKE ? ESCAPE '\\'
        GROUP BY facet_value
        ORDER BY count DESC
        LIMIT ?
    """
    # Escape the escape character first, then the LIKE wildcards.
    escaped_query = (
        query.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )
    like_pattern = f"%{escaped_query}%"

    for facet_name in requested_facets:
        facet_config = self.facet_configs.get(facet_name)
        if facet_config is None:
            continue

        try:
            rows = conn.execute(
                sql,
                [facet_name, like_pattern, facet_config["size"]],
            ).fetchall()
        except Exception:
            # Best effort: table doesn't exist yet, will be created during
            # index rebuild.
            continue

        facets[facet_name] = SearchFacet(
            name=facet_name,
            values=[
                (str(row[0]) if row[0] is not None else "", row[1])
                for row in rows
            ],
            facet_type=str(facet_config["type"]),
        )

    return facets