Coverage for session_buddy/advanced_search.py: 67.20%
374 statements
coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
1#!/usr/bin/env python3
2"""Advanced Search Engine for Session Management.
4Provides enhanced search capabilities with faceted filtering, full-text search,
5and intelligent result ranking.
6"""
8import hashlib
9import json
10import time
11from datetime import UTC, datetime
12from typing import Any
14from .reflection_tools import ReflectionDatabase
15from .search_enhanced import EnhancedSearchEngine
16from .session_types import SQLCondition
17from .utils.search import (
18 SearchFacet,
19 SearchFilter,
20 SearchResult,
21 ensure_timezone,
22 extract_technical_terms,
23 parse_timeframe,
24 parse_timeframe_single,
25 truncate_content,
26)
28__all__ = ["AdvancedSearchEngine", "SearchFilter"]
31class AdvancedSearchEngine:
32 """Advanced search engine with faceted filtering and full-text search."""
34 def __init__(self, reflection_db: ReflectionDatabase) -> None:
35 self.reflection_db = reflection_db
36 self.enhanced_search = EnhancedSearchEngine(reflection_db)
37 self.index_cache: dict[str, datetime] = {}
39 # Search configuration
40 self.facet_configs = {
41 "project": {"type": "terms", "size": 20},
42 "content_type": {"type": "terms", "size": 10},
43 "date_range": {
44 "type": "date",
45 "ranges": ["1d", "7d", "30d", "90d", "365d"],
46 },
47 "author": {"type": "terms", "size": 15},
48 "tags": {"type": "terms", "size": 25},
49 "file_type": {"type": "terms", "size": 10},
50 "language": {"type": "terms", "size": 10},
51 "error_type": {"type": "terms", "size": 15},
52 }
54 async def search(
55 self,
56 query: str,
57 filters: list[SearchFilter] | None = None,
58 facets: list[str] | None = None,
59 sort_by: str = "relevance", # 'relevance', 'date', 'project'
60 limit: int = 20,
61 offset: int = 0,
62 include_highlights: bool = True,
63 content_type: str | None = None,
64 timeframe: str | None = None,
65 ) -> dict[str, Any]:
66 """Perform advanced search with faceted filtering."""
67 # Ensure search index is up to date
68 await self._ensure_search_index()
70 # Build and execute search
71 search_query = self._build_search_query(query, filters)
72 results = await self._execute_search(
73 search_query,
74 sort_by,
75 limit,
76 offset,
77 filters,
78 content_type,
79 timeframe,
80 )
82 # Process results with optional features
83 results = await self._process_search_results(results, query, include_highlights)
84 facet_results = await self._process_facets(query, filters, facets)
86 return self._format_search_response(results, facet_results, query, filters)
88 async def _process_search_results(
89 self,
90 results: list[Any],
91 query: str,
92 include_highlights: bool,
93 ) -> list[Any]:
94 """Process search results with optional highlighting."""
95 if include_highlights: 95 ↛ 97: condition on line 95 was always true
96 return await self._add_highlights(results, query)
97 return results
99 async def _process_facets(
100 self,
101 query: str,
102 filters: list[SearchFilter] | None,
103 facets: list[str] | None,
104 ) -> dict[str, Any]:
105 """Process facets if requested."""
106 if facets:
107 return await self._calculate_facets(query, filters, facets)
108 return {}
110 def _format_search_response(
111 self,
112 results: list[Any],
113 facet_results: dict[str, Any],
114 query: str,
115 filters: list[SearchFilter] | None,
116 ) -> dict[str, Any]:
117 """Format the final search response."""
118 return {
119 "results": results,
120 "facets": facet_results,
121 "total": len(results),
122 "query": query,
123 "filters": [f.__dict__ for f in filters] if filters else [],
124 "took": time.time() - time.time(), # Will be updated with actual timing
125 }
127 async def suggest_completions(
128 self,
129 query: str,
130 field: str = "indexed_content",
131 limit: int = 10,
132 ) -> list[dict[str, Any]]:
133 """Get search completion suggestions."""
134 # Use parameterized queries with predefined SQL patterns to prevent injection
135 field_queries = {
136 "content": """
137 SELECT DISTINCT indexed_content, COUNT(*) as frequency
138 FROM search_index
139 WHERE indexed_content LIKE ?
140 GROUP BY indexed_content
141 ORDER BY frequency DESC, indexed_content
142 LIMIT ?
143 """,
144 "project": """
145 SELECT DISTINCT JSON_EXTRACT_STRING(search_metadata, '$.project'), COUNT(*) as frequency
146 FROM search_index
147 WHERE JSON_EXTRACT_STRING(search_metadata, '$.project') LIKE ?
148 GROUP BY JSON_EXTRACT_STRING(search_metadata, '$.project')
149 ORDER BY frequency DESC, JSON_EXTRACT_STRING(search_metadata, '$.project')
150 LIMIT ?
151 """,
152 "tags": """
153 SELECT DISTINCT JSON_EXTRACT_STRING(search_metadata, '$.tags'), COUNT(*) as frequency
154 FROM search_index
155 WHERE JSON_EXTRACT_STRING(search_metadata, '$.tags') LIKE ?
156 GROUP BY JSON_EXTRACT_STRING(search_metadata, '$.tags')
157 ORDER BY frequency DESC, JSON_EXTRACT_STRING(search_metadata, '$.tags')
158 LIMIT ?
159 """,
160 }
162 # Use predefined query or default to content search
163 sql = field_queries.get(field, field_queries["content"])
165 if not self.reflection_db.conn: 165 ↛ 166: condition on line 165 was never true
166 return []
168 try:
169 results = self.reflection_db.conn.execute(
170 sql,
171 [f"%{query}%", limit],
172 ).fetchall()
174 return [{"text": row[0], "frequency": row[1]} for row in results]
175 except Exception:
176 # Table doesn't exist yet, will be created during index rebuild
177 return []
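# Illustrative call shape for suggest_completions (sketch only; "engine" and the
# returned values below are hypothetical):
#     suggestions = await engine.suggest_completions("sess", field="project", limit=5)
#     # -> [{"text": "session-buddy", "frequency": 12}, ...]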
179 async def get_similar_content(
180 self,
181 content_id: str,
182 content_type: str,
183 limit: int = 5,
184 ) -> list[SearchResult]:
185 """Find similar content using embeddings or text similarity."""
186 # Get the source content
187 sql = """
188 SELECT indexed_content, search_metadata
189 FROM search_index
190 WHERE content_id = ? AND content_type = ?
191 """
193 if not self.reflection_db.conn: 193 ↛ 194: condition on line 193 was never true
194 return []
196 try:
197 result = self.reflection_db.conn.execute(
198 sql,
199 [content_id, content_type],
200 ).fetchone()
202 if not result:
203 return []
204 except Exception:
205 # Table doesn't exist yet, will be created during index rebuild
206 return []
208 source_content = result[0]
210 # Use enhanced search for similarity
211 similar_results = await self.reflection_db.search_conversations(
212 query=source_content[:500], # Use first 500 chars as query
213 limit=limit + 1, # +1 to exclude the source itself
214 )
216 # Convert to SearchResult format and exclude source
217 search_results = [
218 SearchResult(
219 content_id=conv.get("conversation_id", ""),
220 content_type="conversation",
221 title=f"Conversation from {conv.get('project', 'Unknown')}",
222 content=conv.get("content", ""),
223 score=conv.get("score", 0.0),
224 project=conv.get("project"),
225 timestamp=conv.get("timestamp"),
226 metadata=conv.get("metadata", {}),
227 )
228 for conv in similar_results
229 if conv.get("conversation_id") != content_id
230 ]
232 return search_results[:limit]
234 async def search_by_timeframe(
235 self,
236 timeframe: str, # '1h', '1d', '1w', '1m', '1y' or ISO date range
237 query: str | None = None,
238 project: str | None = None,
239 limit: int = 20,
240 ) -> list[dict[str, Any]]:
241 """Search within a specific timeframe."""
242 # Parse timeframe
243 time_range = parse_timeframe(timeframe)
244 start_time, end_time = time_range.start, time_range.end
246 # Build time filter
247 time_filter = SearchFilter(
248 field="timestamp",
249 operator="range",
250 value=(start_time, end_time),
251 )
253 filters = [time_filter]
254 if project: 254 ↛ 255: condition on line 254 was never true
255 filters.append(SearchFilter(field="project", operator="eq", value=project))
257 # Perform search
258 search_results = await self.search(
259 query=query or "*",
260 filters=filters,
261 limit=limit,
262 sort_by="date",
263 )
265 # Convert SearchResult objects to dictionaries for compatibility
266 result_dicts = []
267 for result in search_results["results"]: 267 ↛ 268: loop on line 267 never started
268 result_dict = {
269 "content_id": result.content_id,
270 "content_type": result.content_type,
271 "title": result.title,
272 "content": result.content,
273 "score": result.score,
274 "project": result.project,
275 "timestamp": result.timestamp,
276 "metadata": result.metadata,
277 "highlights": result.highlights,
278 "facets": result.facets,
279 }
280 result_dicts.append(result_dict)
282 return result_dicts
284 async def aggregate_metrics(
285 self,
286 metric_type: str, # 'activity', 'projects', 'content_types', 'errors'
287 timeframe: str = "30d",
288 filters: list[SearchFilter] | None = None,  # accepted but not currently applied
289 ) -> dict[str, Any]:
290 """Calculate aggregate metrics from search data."""
291 time_range = parse_timeframe(timeframe)
292 start_time, end_time = time_range.start, time_range.end
294 # Use predefined parameterized queries to prevent SQL injection
295 metric_queries = {
296 "activity": """
297 SELECT DATE_TRUNC('day', last_indexed) as day,
298 COUNT(*) as count,
299 COUNT(DISTINCT content_id) as unique_content
300 FROM search_index
301 WHERE last_indexed BETWEEN ? AND ?
302 GROUP BY day
303 ORDER BY day
304 """,
305 "projects": """
306 SELECT JSON_EXTRACT_STRING(search_metadata, '$.project') as project,
307 COUNT(*) as count
308 FROM search_index
309 WHERE last_indexed BETWEEN ? AND ?
310 AND JSON_EXTRACT_STRING(search_metadata, '$.project') IS NOT NULL
311 GROUP BY project
312 ORDER BY count DESC
313 """,
314 "content_types": """
315 SELECT content_type, COUNT(*) as count
316 FROM search_index
317 WHERE last_indexed BETWEEN ? AND ?
318 GROUP BY content_type
319 ORDER BY count DESC
320 """,
321 "errors": """
322 SELECT JSON_EXTRACT_STRING(search_metadata, '$.error_type') as error_type,
323 COUNT(*) as count
324 FROM search_index
325 WHERE last_indexed BETWEEN ? AND ?
326 AND JSON_EXTRACT_STRING(search_metadata, '$.error_type') IS NOT NULL
327 GROUP BY error_type
328 ORDER BY count DESC
329 """,
330 }
332 if metric_type not in metric_queries:
333 return {"error": f"Unknown metric type: {metric_type}"}
335 sql = metric_queries[metric_type]
337 if not self.reflection_db.conn: 337 ↛ 338: condition on line 337 was never true
338 return {
339 "error": f"Database connection not available for metric type: {metric_type}",
340 }
342 try:
343 # Use simplified parameters for the base time range
344 base_params = [start_time, end_time]
345 results = self.reflection_db.conn.execute(sql, base_params).fetchall()
347 return {
348 "metric_type": metric_type,
349 "timeframe": timeframe,
350 "data": [{"key": row[0], "value": row[1]} for row in results]
351 if results
352 else [],
353 }
354 except Exception:
355 # Table doesn't exist yet, will be created during index rebuild
356 return {
357 "metric_type": metric_type,
358 "timeframe": timeframe,
359 "data": [],
360 }
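# Illustrative result shape for aggregate_metrics (sketch only; the data values
# are made up):
#     metrics = await engine.aggregate_metrics("projects", timeframe="7d")
#     # -> {"metric_type": "projects", "timeframe": "7d",
#     #     "data": [{"key": "session-buddy", "value": 42}, ...]}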
362 async def _ensure_search_index(self) -> None:
363 """Ensure search index is up to date."""
364 # Check when index was last updated
365 last_update = await self._get_last_index_update()
367 # Update if older than 1 hour or if never updated
368 if not last_update or (datetime.now(UTC) - last_update).total_seconds() > 3600: 368 ↛ exit: didn't return from '_ensure_search_index' because the condition on line 368 was always true
369 await self._rebuild_search_index()
371 async def _get_last_index_update(self) -> datetime | None:
372 """Get timestamp of last index update."""
373 sql = "SELECT MAX(last_indexed) FROM search_index"
375 if not self.reflection_db.conn: 375 ↛ 376: condition on line 375 was never true
376 return None
378 try:
379 result = self.reflection_db.conn.execute(sql).fetchone()
381 if result and result[0]: 381 ↛ 387: condition on line 381 was always true
382 dt = result[0]
383 # Ensure datetime is timezone-aware
384 if isinstance(dt, datetime) and dt.tzinfo is None: 384 ↛ 386: condition on line 384 was always true
385 dt = dt.replace(tzinfo=UTC)
386 return dt if isinstance(dt, datetime) else None
387 return None
388 except Exception:
389 # Table doesn't exist yet, will be created during index rebuild
390 return None
392 async def _rebuild_search_index(self) -> None:
393 """Rebuild the search index from conversations and reflections."""
394 # Ensure database tables exist before indexing
395 if self.reflection_db.conn: 395 ↛ 399: condition on line 395 was always true
396 await self.reflection_db._ensure_tables()
398 # Index conversations
399 await self._index_conversations()
401 # Index reflections
402 await self._index_reflections()
404 # Update facets
405 await self._update_search_facets()
407 async def _index_conversations(self) -> None:
408 """Index all conversations for search."""
409 if not self.reflection_db.conn: 409 ↛ 410: condition on line 409 was never true
410 return
412 conversations = self._fetch_conversations_for_indexing()
413 for row in conversations:
414 await self._process_conversation_for_index(row)
416 self._commit_conversation_index()
418 def _fetch_conversations_for_indexing(self) -> list[tuple[Any, ...]]:
419 """Fetch conversations from database for indexing."""
420 if not self.reflection_db.conn: 420 ↛ 421: condition on line 420 was never true
421 return []
423 sql = "SELECT id, content, project, timestamp, metadata FROM conversations"
424 return self.reflection_db.conn.execute(sql).fetchall()
426 async def _process_conversation_for_index(self, row: tuple[Any, ...]) -> None:
427 """Process a single conversation row for search indexing."""
428 conv_id, content, project, timestamp, metadata_json = row
430 self._parse_conversation_metadata(metadata_json)
431 indexed_content = self._build_indexed_content(content, project)
432 search_metadata = self._build_conversation_search_metadata(
433 project,
434 timestamp,
435 content,
436 indexed_content,
437 )
439 self._insert_conversation_into_search_index(
440 conv_id,
441 indexed_content,
442 search_metadata,
443 )
445 def _parse_conversation_metadata(self, metadata_json: str | None) -> dict[str, Any]:
446 """Parse conversation metadata JSON safely."""
447 return json.loads(metadata_json) if metadata_json else {}
449 def _build_indexed_content(self, content: str, project: str | None) -> str:
450 """Build indexed content with project and technical terms."""
451 indexed_content = content
453 if project: 453 ↛ 456: condition on line 453 was always true
454 indexed_content += f" project:{project}"
456 tech_terms = extract_technical_terms(content)
457 if tech_terms:
458 indexed_content += " " + " ".join(tech_terms)
460 return indexed_content
462 def _build_conversation_search_metadata(
463 self,
464 project: str | None,
465 timestamp: datetime | None,
466 content: str,
467 indexed_content: str,
468 ) -> dict[str, Any]:
469 """Build search metadata for conversation."""
470 tech_terms = extract_technical_terms(content)
471 return {
472 "project": project,
473 "timestamp": timestamp.isoformat() if timestamp else None,
474 "content_length": len(content),
475 "technical_terms": tech_terms,
476 }
478 def _insert_conversation_into_search_index(
479 self,
480 conv_id: str,
481 indexed_content: str,
482 search_metadata: dict[str, Any],
483 ) -> None:
484 """Insert conversation into search index."""
485 if not self.reflection_db.conn: 485 ↛ 486: condition on line 485 was never true
486 return
488 self.reflection_db.conn.execute(
489 """
490 INSERT INTO search_index
491 (id, content_type, content_id, indexed_content, search_metadata, last_indexed)
492 VALUES (?, ?, ?, ?, ?, ?)
493 ON CONFLICT (id) DO UPDATE SET
494 content_type = EXCLUDED.content_type,
495 content_id = EXCLUDED.content_id,
496 indexed_content = EXCLUDED.indexed_content,
497 search_metadata = EXCLUDED.search_metadata,
498 last_indexed = EXCLUDED.last_indexed
499 """,
500 [
501 f"conv_{conv_id}",
502 "conversation",
503 conv_id,
504 indexed_content,
505 json.dumps(search_metadata),
506 datetime.now(UTC),
507 ],
508 )
510 def _commit_conversation_index(self) -> None:
511 """Commit the conversation indexing transaction."""
512 if self.reflection_db.conn: 512 ↛ exit: didn't return from '_commit_conversation_index' because the condition on line 512 was always true
513 self.reflection_db.conn.commit()
515 async def _index_reflections(self) -> None:
516 """Index all reflections for search."""
517 if not self.reflection_db.conn: 517 ↛ 518: condition on line 517 was never true
518 return
520 sql = "SELECT id, content, tags, timestamp, metadata FROM reflections"
521 results = self.reflection_db.conn.execute(sql).fetchall()
523 for row in results:
524 refl_id, content, tags, timestamp, metadata_json = row
526 # Extract metadata
527 metadata: dict[str, Any] = (
528 json.loads(metadata_json) if metadata_json else {}
529 )
531 # Create indexed content
532 indexed_content = content
533 if tags: 533 ↛ 537: condition on line 533 was always true
534 indexed_content += " " + " ".join(f"tag:{tag}" for tag in tags)
536 # Create search metadata
537 base_metadata: dict[str, Any] = {
538 "tags": tags or [],
539 "timestamp": timestamp.isoformat() if timestamp else None,
540 "content_length": len(content),
541 }
542 search_metadata: dict[str, Any] = base_metadata | metadata
544 # Insert or update search index
545 if self.reflection_db.conn: 545 ↛ 523: condition on line 545 was always true
546 self.reflection_db.conn.execute(
547 """
548 INSERT INTO search_index
549 (id, content_type, content_id, indexed_content, search_metadata, last_indexed)
550 VALUES (?, ?, ?, ?, ?, ?)
551 ON CONFLICT (id) DO UPDATE SET
552 content_type = EXCLUDED.content_type,
553 content_id = EXCLUDED.content_id,
554 indexed_content = EXCLUDED.indexed_content,
555 search_metadata = EXCLUDED.search_metadata,
556 last_indexed = EXCLUDED.last_indexed
557 """,
558 [
559 f"refl_{refl_id}",
560 "reflection",
561 refl_id,
562 indexed_content,
563 json.dumps(search_metadata),
564 datetime.now(UTC),
565 ],
566 )
568 if self.reflection_db.conn: 568 ↛ exit: didn't return from '_index_reflections' because the condition on line 568 was always true
569 self.reflection_db.conn.commit()
571 def _get_facet_queries(self) -> dict[str, str]:
572 """Get facet query definitions."""
573 return {
574 "project": """
575 SELECT JSON_EXTRACT_STRING(search_metadata, '$.project') as facet_value, COUNT(*) as count
576 FROM search_index
577 WHERE JSON_EXTRACT_STRING(search_metadata, '$.project') IS NOT NULL
578 GROUP BY facet_value
579 ORDER BY count DESC
580 """,
581 "content_type": """
582 SELECT content_type as facet_value, COUNT(*) as count
583 FROM search_index
584 WHERE content_type IS NOT NULL
585 GROUP BY facet_value
586 ORDER BY count DESC
587 """,
588 "tags": """
589 SELECT JSON_EXTRACT_STRING(search_metadata, '$.tags') as facet_value, COUNT(*) as count
590 FROM search_index
591 WHERE JSON_EXTRACT_STRING(search_metadata, '$.tags') IS NOT NULL
592 GROUP BY facet_value
593 ORDER BY count DESC
594 """,
595 "technical_terms": """
596 SELECT JSON_EXTRACT_STRING(search_metadata, '$.technical_terms') as facet_value, COUNT(*) as count
597 FROM search_index
598 WHERE JSON_EXTRACT_STRING(search_metadata, '$.technical_terms') IS NOT NULL
599 GROUP BY facet_value
600 ORDER BY count DESC
601 """,
602 }
604 def _should_process_facet_value(self, facet_value: Any) -> bool:
605 """Check if facet value should be processed."""
606 return isinstance(facet_value, str) and bool(facet_value)
608 def _insert_facet_value(self, facet_name: str, facet_value: str) -> None:
609 """Insert a single facet value into the database."""
610 if not self.reflection_db.conn: 610 ↛ 611: condition on line 610 was never true
611 return
613 facet_id = hashlib.md5(
614 f"{facet_name}_{facet_value}".encode(),
615 usedforsecurity=False,
616 ).hexdigest()
618 self.reflection_db.conn.execute(
619 """
620 INSERT INTO search_facets
621 (id, content_type, content_id, facet_name, facet_value, created_at)
622 VALUES (?, ?, ?, ?, ?, ?)
623 ON CONFLICT (id) DO UPDATE SET
624 content_type = EXCLUDED.content_type,
625 content_id = EXCLUDED.content_id,
626 facet_name = EXCLUDED.facet_name,
627 facet_value = EXCLUDED.facet_value,
628 created_at = EXCLUDED.created_at
629 """,
630 [
631 facet_id,
632 "search_facet",
633 f"{facet_name}_{facet_value}",
634 facet_name,
635 facet_value,
636 datetime.now(UTC).isoformat(),
637 ],
638 )
640 def _process_facet_query(self, facet_name: str, sql: str) -> None:
641 """Process a single facet query."""
642 if not self.reflection_db.conn: 642 ↛ 643: condition on line 642 was never true
643 return
645 try:
646 results = self.reflection_db.conn.execute(sql).fetchall()
648 for facet_value, _count in results:
649 if self._should_process_facet_value(facet_value): 649 ↛ 648: condition on line 649 was always true
650 self._insert_facet_value(facet_name, facet_value)
651 except Exception:
652 # Table doesn't exist yet, will be created during index rebuild
653 return
655 async def _update_search_facets(self) -> None:
656 """Update search facets based on indexed content."""
657 if not self.reflection_db.conn: 657 ↛ 658: condition on line 657 was never true
658 return
660 # Clear existing facets
661 self.reflection_db.conn.execute("DELETE FROM search_facets")
663 # Process each facet query
664 facet_queries = self._get_facet_queries()
665 for facet_name, sql in facet_queries.items():
666 self._process_facet_query(facet_name, sql)
668 # Commit all changes
669 if self.reflection_db.conn: 669 ↛ exit: didn't return from '_update_search_facets' because the condition on line 669 was always true
670 self.reflection_db.conn.commit()
672 def _build_search_query(
673 self,
674 query: str,
675 filters: list[SearchFilter] | None,
676 ) -> str:
677 """Build search query with filters."""
678 # For now, return simple query - could be enhanced with query parsing
679 return query
681 def _build_filter_conditions(
682 self,
683 filters: list[SearchFilter],
684 ) -> SQLCondition:
685 """Build SQL conditions from filters."""
686 conditions = []
687 params: list[str | datetime] = []
689 for filt in filters:
690 condition_result = self._build_single_filter_condition(filt)
691 if condition_result:
692 conditions.append(condition_result.condition)
693 params.extend(condition_result.params)
695 return SQLCondition(condition=" AND ".join(conditions), params=params)
697 def _build_single_filter_condition(self, filt: SearchFilter) -> SQLCondition | None:
698 """Build a single filter condition."""
699 if filt.field == "timestamp" and filt.operator == "range":
700 return self._build_timestamp_range_condition(filt)
701 if filt.operator == "eq":
702 return self._build_equality_condition(filt)
703 if filt.operator == "contains":
704 return self._build_contains_condition(filt)
705 return None
707 def _build_timestamp_range_condition(
708 self,
709 filt: SearchFilter,
710 ) -> SQLCondition | None:
711 """Build timestamp range condition."""
712 if not isinstance(filt.value, tuple | list) or len(filt.value) != 2:
713 return None # Skip invalid range values
715 start_time, end_time = filt.value[0], filt.value[1]
716 condition = "last_indexed BETWEEN ? AND ?"
717 negated_condition = f"{'NOT ' if filt.negate else ''}{condition}"
719 # Ensure params are strings or datetime objects
720 params: list[str | datetime] = []
721 if isinstance(start_time, str | datetime):
722 params.append(start_time)
723 if isinstance(end_time, str | datetime):
724 params.append(end_time)
726 return SQLCondition(condition=negated_condition, params=params)
728 def _build_equality_condition(self, filt: SearchFilter) -> SQLCondition:
729 """Build equality condition."""
730 condition = f"JSON_EXTRACT_STRING(search_metadata, '$.{filt.field}') = ?"
731 negated_condition = f"{'NOT ' if filt.negate else ''}{condition}"
733 # Ensure value is a string or datetime
734 params: list[str | datetime] = []
735 if isinstance(filt.value, str | datetime):
736 params.append(filt.value)
737 elif (
738 isinstance(filt.value, list)
739 and filt.value
740 and isinstance(filt.value[0], str | datetime)
741 ):
742 params.append(filt.value[0])
744 return SQLCondition(condition=negated_condition, params=params)
746 def _build_contains_condition(self, filt: SearchFilter) -> SQLCondition:
747 """Build contains condition."""
748 condition = "indexed_content LIKE ?"
749 negated_condition = f"{'NOT ' if filt.negate else ''}{condition}"
751 # Ensure value is a string
752 value_str = str(filt.value) if not isinstance(filt.value, str) else filt.value
753 params: list[str | datetime] = [f"%{value_str}%"]
755 return SQLCondition(condition=negated_condition, params=params)
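# Sketch of how the condition builders above translate filters (illustrative
# values; negate=True simply prefixes the condition with "NOT "):
#     SearchFilter(field="project", operator="eq", value="session-buddy")
#     # -> condition: "JSON_EXTRACT_STRING(search_metadata, '$.project') = ?", params: ["session-buddy"]
#     SearchFilter(field="content", operator="contains", value="timeout")
#     # -> condition: "indexed_content LIKE ?", params: ["%timeout%"]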
757 async def _execute_search(
758 self,
759 query: str,
760 sort_by: str,
761 limit: int,
762 offset: int,
763 filters: list[SearchFilter] | None = None,
764 content_type: str | None = None,
765 timeframe: str | None = None,
766 ) -> list[SearchResult]:
767 """Execute the actual search."""
768 sql_result = self._build_search_sql(
769 query,
770 content_type,
771 timeframe,
772 filters,
773 sort_by,
774 limit,
775 offset,
776 )
778 if not self.reflection_db.conn: 778 ↛ 779: condition on line 778 was never true
779 return []
781 try:
782 results = self.reflection_db.conn.execute(
783 sql_result.condition,
784 self._prepare_sql_params(sql_result.params),
785 ).fetchall()
786 return self._convert_sql_results_to_search_results(results)
787 except Exception:
788 # Table doesn't exist yet, will be created during index rebuild
789 return []
791 def _build_search_sql(
792 self,
793 query: str,
794 content_type: str | None,
795 timeframe: str | None,
796 filters: list[SearchFilter] | None,
797 sort_by: str,
798 limit: int,
799 offset: int,
800 ) -> SQLCondition:
801 """Build complete SQL query for search."""
802 sql = """
803 SELECT content_id, content_type, indexed_content, search_metadata, last_indexed
804 FROM search_index
805 WHERE indexed_content LIKE ?
806 """
807 params: list[str | datetime] = [f"%{query}%"]
809 result = self._add_content_type_filter(sql, params, content_type)
810 sql, params = result.condition, result.params
812 result = self._add_timeframe_filter(sql, params, timeframe, content_type)
813 sql, params = result.condition, result.params
815 result = self._add_filter_conditions_to_sql(sql, params, filters)
816 sql, params = result.condition, result.params
818 sql = self._add_sorting_to_sql(sql, sort_by)
819 sql += " LIMIT ? OFFSET ?"
820 params.extend([str(limit), str(offset)])
822 return SQLCondition(condition=sql, params=params)
824 def _get_sql_field(self, field: str) -> str:
825 """Map filter field to SQL column expression."""
826 field_mappings = {
827 "content_type": "content_type",
828 "last_indexed": "last_indexed",
829 "project": "JSON_EXTRACT_STRING(search_metadata, '$.project')",
830 "timestamp": "JSON_EXTRACT_STRING(search_metadata, '$.timestamp')",
831 "author": "JSON_EXTRACT_STRING(search_metadata, '$.author')",
832 "tags": "JSON_EXTRACT_STRING(search_metadata, '$.tags')",
833 }
834 return field_mappings.get(
835 field,
836 f"JSON_EXTRACT_STRING(search_metadata, '$.{field}')",
837 )
839 def _apply_eq_filter(
840 self,
841 sql_field: str,
842 negation: str,
843 value: Any,
844 params: list[str | datetime],
845 ) -> tuple[str, list[str | datetime]]:
846 """Apply equality filter."""
847 sql_part = f" AND {negation}{sql_field} = ?"
848 params.append(str(value))
849 return sql_part, params
851 def _apply_ne_filter(
852 self,
853 sql_field: str,
854 negation: str,
855 value: Any,
856 params: list[str | datetime],
857 ) -> tuple[str, list[str | datetime]]:
858 """Apply not-equal filter."""
859 sql_part = f" AND {negation}{sql_field} != ?"
860 params.append(str(value))
861 return sql_part, params
863 def _apply_in_filter(
864 self,
865 sql_field: str,
866 negation: str,
867 value: Any,
868 params: list[str | datetime],
869 ) -> tuple[str, list[str | datetime]]:
870 """Apply IN filter."""
871 if isinstance(value, list):
872 placeholders = ", ".join("?" * len(value))
873 sql_part = f" AND {negation}{sql_field} IN ({placeholders})"
874 params.extend([str(v) for v in value])
875 return sql_part, params
876 return "", params
878 def _apply_not_in_filter(
879 self,
880 sql_field: str,
881 negation: str,
882 value: Any,
883 params: list[str | datetime],
884 ) -> tuple[str, list[str | datetime]]:
885 """Apply NOT IN filter."""
886 if isinstance(value, list):
887 placeholders = ", ".join("?" * len(value))
888 sql_part = f" AND {negation}{sql_field} NOT IN ({placeholders})"
889 params.extend([str(v) for v in value])
890 return sql_part, params
891 return "", params
893 def _apply_contains_filter(
894 self,
895 sql_field: str,
896 negation: str,
897 value: Any,
898 params: list[str | datetime],
899 ) -> tuple[str, list[str | datetime]]:
900 """Apply CONTAINS filter."""
901 sql_part = f" AND {negation}{sql_field} LIKE ?"
902 params.append(f"%{value}%")
903 return sql_part, params
905 def _apply_starts_with_filter(
906 self,
907 sql_field: str,
908 negation: str,
909 value: Any,
910 params: list[str | datetime],
911 ) -> tuple[str, list[str | datetime]]:
912 """Apply STARTS_WITH filter."""
913 sql_part = f" AND {negation}{sql_field} LIKE ?"
914 params.append(f"{value}%")
915 return sql_part, params
917 def _apply_ends_with_filter(
918 self,
919 sql_field: str,
920 negation: str,
921 value: Any,
922 params: list[str | datetime],
923 ) -> tuple[str, list[str | datetime]]:
924 """Apply ENDS_WITH filter."""
925 sql_part = f" AND {negation}{sql_field} LIKE ?"
926 params.append(f"%{value}")
927 return sql_part, params
929 def _apply_range_filter(
930 self,
931 sql_field: str,
932 filter_obj: SearchFilter,
933 params: list[str | datetime],
934 ) -> tuple[str, list[str | datetime]]:
935 """Apply RANGE filter."""
936 if isinstance(filter_obj.value, tuple) and len(filter_obj.value) == 2: 936 ↛ 943: condition on line 936 was always true
937 if filter_obj.negate: 937 ↛ 938: condition on line 937 was never true
938 sql_part = f" AND ({sql_field} < ? OR {sql_field} > ?)"
939 else:
940 sql_part = f" AND {sql_field} BETWEEN ? AND ?"
941 params.extend([str(filter_obj.value[0]), str(filter_obj.value[1])])
942 return sql_part, params
943 return "", params
945 def _add_filter_conditions_to_sql(
946 self,
947 sql: str,
948 params: list[str | datetime],
949 filters: list[SearchFilter] | None,
950 ) -> SQLCondition:
951 """Add custom filter conditions to SQL query."""
952 if not filters:
953 return SQLCondition(condition=sql, params=params)
955 for filter_obj in filters:
956 sql_field = self._get_sql_field(filter_obj.field)
957 negation = "NOT " if filter_obj.negate else ""
959 # Dispatch to appropriate filter handler
960 operator_handlers = {
961 "eq": lambda: self._apply_eq_filter(
962 sql_field,
963 negation,
964 filter_obj.value,
965 params,
966 ),
967 "ne": lambda: self._apply_ne_filter(
968 sql_field,
969 negation,
970 filter_obj.value,
971 params,
972 ),
973 "in": lambda: self._apply_in_filter(
974 sql_field,
975 negation,
976 filter_obj.value,
977 params,
978 ),
979 "not_in": lambda: self._apply_not_in_filter(
980 sql_field,
981 negation,
982 filter_obj.value,
983 params,
984 ),
985 "contains": lambda: self._apply_contains_filter(
986 sql_field,
987 negation,
988 filter_obj.value,
989 params,
990 ),
991 "starts_with": lambda: self._apply_starts_with_filter(
992 sql_field,
993 negation,
994 filter_obj.value,
995 params,
996 ),
997 "ends_with": lambda: self._apply_ends_with_filter(
998 sql_field,
999 negation,
1000 filter_obj.value,
1001 params,
1002 ),
1003 "range": lambda: self._apply_range_filter(
1004 sql_field,
1005 filter_obj,
1006 params,
1007 ),
1008 }
1010 handler = operator_handlers.get(filter_obj.operator)
1011 if handler: 1011 ↛ 955: condition on line 1011 was always true
1012 sql_part, params = handler()
1013 sql += sql_part
1015 return SQLCondition(condition=sql, params=params)
1017 def _add_content_type_filter(
1018 self,
1019 sql: str,
1020 params: list[str | datetime],
1021 content_type: str | None,
1022 ) -> SQLCondition:
1023 """Add content type filter to SQL query."""
1024 if content_type:
1025 sql += " AND content_type = ?"
1026 params.append(content_type)
1027 return SQLCondition(condition=sql, params=params)
1029 def _add_timeframe_filter(
1030 self,
1031 sql: str,
1032 params: list[str | datetime],
1033 timeframe: str | None,
1034 content_type: str | None,
1035 ) -> SQLCondition:
1036 """Add timeframe filter to SQL query."""
1037 if ( 1037 ↛ 1040: condition on line 1037 was never true
1038 timeframe and content_type
1039 ): # Only add timeframe if content_type is also specified
1040 cutoff_date = parse_timeframe_single(timeframe)
1041 if cutoff_date:
1042 sql += " AND last_indexed >= ?"
1043 params.append(cutoff_date.isoformat())
1044 return SQLCondition(condition=sql, params=params)
1046 def _add_sorting_to_sql(self, sql: str, sort_by: str) -> str:
1047 """Add sorting clause to SQL query."""
1048 if sort_by == "date":
1049 sql += " ORDER BY last_indexed DESC"
1050 elif sort_by == "project": 1050 ↛ 1051: condition on line 1050 was never true
1051 sql += " ORDER BY JSON_EXTRACT_STRING(search_metadata, '$.project')"
1052 else: # relevance - simple for now
1053 sql += " ORDER BY LENGTH(indexed_content) DESC" # Longer content = more relevant
1054 return sql
1056 def _prepare_sql_params(self, params: list[str | datetime]) -> list[str]:
1057 """Prepare parameters for SQL execution."""
1058 return [
1059 param.isoformat() if isinstance(param, datetime) else str(param)
1060 for param in params
1061 ]
1063 def _convert_sql_results_to_search_results(
1064 self,
1065 results: list[tuple[Any, ...]],
1066 ) -> list[SearchResult]:
1067 """Convert SQL results to SearchResult objects."""
1068 search_results = []
1069 for row in results:
1070 (
1071 content_id,
1072 content_type,
1073 indexed_content,
1074 search_metadata_json,
1075 last_indexed,
1076 ) = row
1077 metadata: dict[str, Any] = (
1078 json.loads(search_metadata_json) if search_metadata_json else {}
1079 )
1081 search_results.append(
1082 SearchResult(
1083 content_id=content_id,
1084 content_type=content_type or "unknown",
1085 title=f"{(content_type or 'unknown').title()} from {metadata.get('project', 'Unknown')}",
1086 content=truncate_content(indexed_content),
1087 score=0.8, # Simple scoring for now
1088 project=metadata.get("project"),
1089 timestamp=ensure_timezone(last_indexed),
1090 metadata=metadata,
1091 ),
1092 )
1093 return search_results
1095 async def _add_highlights(
1096 self,
1097 results: list[SearchResult],
1098 query: str,
1099 ) -> list[SearchResult]:
1100 """Add search highlights to results."""
1101 query_terms = query.lower().split()
1103 for result in results:
1104 highlights = []
1105 content_lower = result.content.lower()
1107 for term in query_terms:
1108 if term in content_lower: 1108 ↛ 1107: condition on line 1108 was always true
1109 # Find context around the term
1110 start_pos = content_lower.find(term)
1111 context_start = max(0, start_pos - 50)
1112 context_end = min(len(result.content), start_pos + len(term) + 50)
1114 highlight = result.content[context_start:context_end]
1115 highlight = highlight.replace(term, f"<mark>{term}</mark>")  # note: replace() is case-sensitive, so a term matched via content_lower may not get wrapped
1116 highlights.append(highlight)
1118 result.highlights = highlights[:3] # Limit to 3 highlights
1120 return results
1122 async def _calculate_facets(
1123 self,
1124 query: str,
1125 filters: list[SearchFilter] | None,
1126 requested_facets: list[str],
1127 ) -> dict[str, SearchFacet]:
1128 """Calculate facet counts for search results."""
1129 facets = {}
1131 for facet_name in requested_facets:
1132 if facet_name in self.facet_configs: 1132 ↛ 1131: condition on line 1132 was always true
1133 facet_config = self.facet_configs[facet_name]
1135 sql = """
1136 SELECT facet_value, COUNT(*) as count
1137 FROM search_facets sf
1138 JOIN search_index si ON sf.content_id = si.id
1139 WHERE sf.facet_name = ? AND si.indexed_content LIKE ?
1140 GROUP BY facet_value
1141 ORDER BY count DESC
1142 LIMIT ?
1143 """
1145 if not self.reflection_db.conn: 1145 ↛ 1146: condition on line 1145 was never true
1146 continue
1148 try:
1149 results = self.reflection_db.conn.execute(
1150 sql,
1151 [facet_name, f"%{query}%", facet_config["size"]],
1152 ).fetchall()
1154 facets[facet_name] = SearchFacet(
1155 name=facet_name,
1156 values=[
1157 (str(row[0]) if row[0] is not None else "", row[1])
1158 for row in results
1159 ],
1160 facet_type=str(facet_config["type"]),
1161 )
1162 except Exception:
1163 # Table doesn't exist yet, will be created during index rebuild
1164 continue
1166 return facets
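# Usage sketch (illustrative, not part of the measured module). It assumes a
# ReflectionDatabase that can be constructed with no arguments and is already
# connected; AdvancedSearchEngine, SearchFilter, and the search() keyword
# arguments come from the code above.
import asyncio

from session_buddy.advanced_search import AdvancedSearchEngine, SearchFilter
from session_buddy.reflection_tools import ReflectionDatabase


async def demo() -> None:
    db = ReflectionDatabase()  # assumption: default constructor / pre-connected
    engine = AdvancedSearchEngine(db)

    # Faceted search limited to one project, newest results first.
    response = await engine.search(
        query="timeout error",
        filters=[SearchFilter(field="project", operator="eq", value="session-buddy")],
        facets=["project", "content_type"],
        sort_by="date",
        limit=10,
    )
    for result in response["results"]:
        print(result.title, result.score)


if __name__ == "__main__":
    asyncio.run(demo())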