Coverage for session_buddy/adapters/reflection_adapter_oneiric.py: 76.72%

239 statements

coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1"""Reflection database adapter using native DuckDB vector operations. 

2 

3Replaces ACB vector adapter with direct DuckDB vector operations while maintaining 

4the same API for backward compatibility. 

5 

6Phase 5: Oneiric Adapter Conversion - Native DuckDB implementation 

7""" 

8 

9from __future__ import annotations 

10 

11import asyncio 

12import hashlib 

13import json 

14import logging 

15import os 

16import time 

17import typing as t 

18import uuid 

19from contextlib import suppress 

20from datetime import UTC, datetime 

21from pathlib import Path 

22 

23if t.TYPE_CHECKING: 

24 from types import TracebackType 

25 

26 import duckdb 

27 import numpy as np 

28 from onnxruntime import InferenceSession 

29 from transformers import AutoTokenizer 

30 

31# Runtime imports (available at runtime but optional for type checking) 

32try: 

33 import numpy as np 

34 

35 NUMPY_AVAILABLE = True 

36except ImportError: 

37 NUMPY_AVAILABLE = False 

38 

39# Embedding system imports 

40try: 

41 import onnxruntime as ort 

42 from transformers import AutoTokenizer 

43 

44 ONNX_AVAILABLE = True 

45except ImportError: 

46 ONNX_AVAILABLE = False 

47 ort = None # type: ignore[no-redef] 

48 AutoTokenizer = None # type: ignore[no-redef] 

49 

50from session_buddy.adapters.settings import ReflectionAdapterSettings 

51from session_buddy.di.container import depends 

52 

53logger = logging.getLogger(__name__) 

54 

55 

56# DuckDB will be imported at runtime 

57DUCKDB_AVAILABLE = True 

58try: 

59 import duckdb 

60except ImportError: 

61 DUCKDB_AVAILABLE = False 

62 if t.TYPE_CHECKING: 

63 # Type stub for type checking when duckdb is not installed 

64 import types 

65 

66 duckdb = types.SimpleNamespace() # type: ignore[misc,assignment] 

67 

68 

class ReflectionDatabaseAdapterOneiric:
    """Manages conversation memory and reflection using native DuckDB vector operations.

    This adapter replaces ACB's Vector adapter with direct DuckDB operations while maintaining
    the original ReflectionDatabase API for backward compatibility. It handles:
    - Local ONNX embedding generation (all-MiniLM-L6-v2, 384 dimensions)
    - Vector storage and retrieval via native DuckDB
    - Graceful fallback to text search when embeddings unavailable
    - Async/await patterns consistent with existing code

    The adapter uses Oneiric settings and lifecycle management, providing:
    - Native DuckDB vector operations (no ACB dependency)
    - Oneiric settings integration
    - Same API as the ACB-based adapter

    Example:
        >>> async with ReflectionDatabaseAdapterOneiric() as db:
        >>>     conv_id = await db.store_conversation("content", {"project": "foo"})
        >>>     results = await db.search_conversations("query")

    """

    def __init__(
        self,
        collection_name: str = "default",
        settings: ReflectionAdapterSettings | None = None,
    ) -> None:
        """Initialize adapter with optional collection name.

        Args:
            collection_name: Name of the vector collection to use.
                Default "default" collection will be created automatically.
            settings: Reflection adapter settings. If None, uses defaults.

        """
        self.settings = settings or ReflectionAdapterSettings.from_settings()
        if collection_name == "default":  # coverage: partial branch, condition always true
            self.collection_name = self.settings.collection_name
        else:
            self.collection_name = collection_name
        self.db_path = str(self.settings.database_path)
        self.conn: t.Any = None  # DuckDB connection (sync)
        self.onnx_session: InferenceSession | None = None
        self.tokenizer: t.Any = None
        self.embedding_dim = self.settings.embedding_dim  # all-MiniLM-L6-v2 dimension
        self._initialized = False

    def __enter__(self) -> t.Self:
        """Sync context manager entry (not recommended - use async)."""
        msg = "Use 'async with' instead of 'with' for ReflectionDatabaseAdapterOneiric"
        raise RuntimeError(msg)

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Sync context manager exit."""
        self.close()

    async def __aenter__(self) -> t.Self:
        """Async context manager entry."""
        await self.initialize()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Async context manager exit with cleanup."""
        await self.aclose()

    def close(self) -> None:
        """Close adapter connections (sync version for compatibility)."""
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        if loop and loop.is_running():  # coverage: partial branch, condition always true
            task = loop.create_task(self.aclose())

            def _consume_result(future: asyncio.Future[t.Any]) -> None:
                try:
                    future.result()
                except asyncio.CancelledError:
                    # Task was cancelled during shutdown, which is expected
                    pass
                except Exception:
                    # Log other exceptions if needed
                    logger.debug(
                        "Exception in ReflectionDatabaseAdapterOneiric close task",
                        exc_info=True,
                    )

            task.add_done_callback(_consume_result)
        else:
            asyncio.run(self.aclose())

    async def aclose(self) -> None:
        """Close adapter connections (async)."""
        if self.conn:  # coverage: partial branch, condition always true
            with suppress(Exception):
                self.conn.close()
            self.conn = None
        self._initialized = False
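
Editorial aside, not part of the module: a self-contained sketch of the detection trick close() relies on. asyncio.get_running_loop() raises RuntimeError when no loop is running, so a None result means it is safe to drive aclose() with asyncio.run().

import asyncio


def running_loop_or_none() -> asyncio.AbstractEventLoop | None:
    # Same mechanism as close(): RuntimeError here means plain sync code,
    # so an async coroutine can be run to completion with asyncio.run().
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        return None


print(running_loop_or_none())  # None outside any event loop


async def main() -> None:
    print(running_loop_or_none())  # the running loop when called from async code


asyncio.run(main())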

    async def initialize(self) -> None:
        """Initialize DuckDB connection and create tables if needed."""
        if self._initialized:
            return

        if not DUCKDB_AVAILABLE:  # coverage: partial branch, condition never true
            msg = "DuckDB not available. Install with: uv add duckdb"
            raise ImportError(msg)

        # Create database directory if it doesn't exist
        db_dir = Path(self.db_path).parent
        db_dir.mkdir(parents=True, exist_ok=True)

        # Connect to DuckDB database
        self.conn = duckdb.connect(database=self.db_path, read_only=False)

        # Enable vector extension if available
        with suppress(Exception):
            self.conn.execute("INSTALL 'httpfs';")
            self.conn.execute("LOAD 'httpfs';")

        # Create tables if they don't exist
        self._create_tables()

        # Initialize ONNX embedding model if embeddings are enabled
        if self.settings.enable_embeddings and ONNX_AVAILABLE:
            await self._init_embedding_model()

        self._initialized = True

    def _create_tables(self) -> None:
        """Create database tables if they don't exist."""
        # Create conversations table
        self.conn.execute(
            f"""
            CREATE TABLE IF NOT EXISTS {self.collection_name}_conversations (
                id VARCHAR PRIMARY KEY,
                content TEXT NOT NULL,
                metadata JSON,
                created_at TIMESTAMP NOT NULL,
                updated_at TIMESTAMP NOT NULL,
                embedding FLOAT[{self.embedding_dim}]
            )
            """
        )

        # Create reflections table
        self.conn.execute(
            f"""
            CREATE TABLE IF NOT EXISTS {self.collection_name}_reflections (
                id VARCHAR PRIMARY KEY,
                conversation_id VARCHAR,
                content TEXT NOT NULL,
                tags VARCHAR[],
                metadata JSON,
                created_at TIMESTAMP NOT NULL,
                updated_at TIMESTAMP NOT NULL,
                embedding FLOAT[{self.embedding_dim}],
                FOREIGN KEY (conversation_id) REFERENCES {self.collection_name}_conversations(id)
            )
            """
        )

        # Create indices for faster search
        self.conn.execute(
            f"CREATE INDEX IF NOT EXISTS idx_{self.collection_name}_conv_created ON {self.collection_name}_conversations(created_at)"
        )
        self.conn.execute(
            f"CREATE INDEX IF NOT EXISTS idx_{self.collection_name}_refl_created ON {self.collection_name}_reflections(created_at)"
        )

    async def _init_embedding_model(self) -> None:
        """Initialize ONNX embedding model."""
        if not ONNX_AVAILABLE:  # coverage: partial branch, condition never true
            return

        assert AutoTokenizer is not None
        assert ort is not None

        # Use Xenova's pre-converted ONNX model (no PyTorch required)
        model_name = "Xenova/all-MiniLM-L6-v2"

        # Load tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)

        # Load ONNX model from onnx/ subdirectory
        try:
            from huggingface_hub import snapshot_download

            # Get the actual cache directory for this model
            cache_dir = snapshot_download(
                repo_id=model_name, allow_patterns=["onnx/model.onnx"]
            )
            onnx_path = str(Path(cache_dir) / "onnx" / "model.onnx")

            self.onnx_session = ort.InferenceSession(
                onnx_path,
                providers=["CPUExecutionProvider"],
            )
            logger.info("✅ ONNX model loaded successfully (Xenova/all-MiniLM-L6-v2)")
        except Exception as e:
            logger.warning(f"Failed to load ONNX model from {model_name}: {e}")
            self.onnx_session = None

    async def _generate_embedding(self, text: str) -> list[float] | None:
        """Generate embedding for text using ONNX model."""
        if not self.onnx_session or not self.tokenizer:  # coverage: partial branch, condition never true
            return None

        try:
            # Tokenize input (use NumPy to avoid PyTorch dependency)
            inputs = self.tokenizer(
                text,
                return_tensors="np",
                padding=True,
                truncation=True,
                max_length=256,
            )

            # Get numpy arrays directly (no conversion needed)
            input_ids = inputs["input_ids"]
            attention_mask = inputs["attention_mask"]
            token_type_ids = inputs.get("token_type_ids", None)

            # Run inference
            ort_inputs = {
                "input_ids": input_ids,
                "attention_mask": attention_mask,
            }
            if token_type_ids is not None:  # coverage: partial branch, condition always true
                ort_inputs["token_type_ids"] = token_type_ids

            # Get embeddings (shape: [batch, seq_len, 384])
            outputs = self.onnx_session.run(None, ort_inputs)
            last_hidden_state = outputs[0]  # Shape: [1, seq_len, 384]

            # Apply mean pooling to get sentence embedding
            # Expand attention_mask to match embedding dimensions
            input_mask_expanded = np.expand_dims(
                attention_mask, axis=-1
            )  # [1, seq_len, 1]
            input_mask_expanded = np.broadcast_to(
                input_mask_expanded, last_hidden_state.shape
            )

            # Weighted sum of embeddings (masked tokens have 0 weight)
            sum_embeddings = np.sum(
                last_hidden_state * input_mask_expanded, axis=1
            )  # [1, 384]

            # Sum of mask (number of real tokens, not padding)
            sum_mask = np.maximum(np.sum(input_mask_expanded, axis=1), 1e-9)  # [1, 384]

            # Mean pooling
            mean_pooled = sum_embeddings / sum_mask  # [1, 384]

            # Normalize to unit length
            embeddings = mean_pooled / np.linalg.norm(
                mean_pooled, axis=1, keepdims=True
            )

            # Return [384] as list
            result = embeddings[0].tolist()
            return t.cast("list[float]", result)
        except Exception as e:
            logger.warning(f"Failed to generate embedding: {e}")
            return None
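
Editorial aside: a minimal numpy-only sketch of the masked mean pooling and L2 normalisation performed above, with random dummy arrays standing in for the real ONNX outputs (shapes match all-MiniLM-L6-v2).

import numpy as np

rng = np.random.default_rng(0)
last_hidden_state = rng.normal(size=(1, 6, 384)).astype(np.float32)  # [batch, seq_len, dim]
attention_mask = np.array([[1, 1, 1, 1, 0, 0]])                      # last two tokens are padding

mask = np.broadcast_to(np.expand_dims(attention_mask, -1), last_hidden_state.shape)
summed = np.sum(last_hidden_state * mask, axis=1)                     # padded positions contribute nothing
counts = np.maximum(np.sum(mask, axis=1), 1e-9)                       # number of real tokens, guarded against zero
mean_pooled = summed / counts
embedding = mean_pooled / np.linalg.norm(mean_pooled, axis=1, keepdims=True)

assert embedding.shape == (1, 384)
assert np.isclose(np.linalg.norm(embedding[0]), 1.0)                  # unit-length sentence vector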

    def _generate_id(self, content: str) -> str:
        """Generate deterministic ID from content."""
        content_bytes = content.encode("utf-8")
        hash_obj = hashlib.sha256(content_bytes)
        return hash_obj.hexdigest()[:16]

    async def store_conversation(
        self, content: str, metadata: dict[str, t.Any] | None = None
    ) -> str:
        """Store a conversation in the database.

        Args:
            content: Conversation content
            metadata: Optional metadata

        Returns:
            Conversation ID

        """
        if not self._initialized:  # coverage: partial branch, condition never true
            await self.initialize()

        conv_id = self._generate_id(content)
        now = datetime.now(UTC)
        metadata_json = json.dumps(metadata or {})

        # Generate embedding if enabled
        embedding = None
        if self.settings.enable_embeddings:
            embedding = await self._generate_embedding(content)

        # Store conversation
        if embedding:
            self.conn.execute(
                f"""
                INSERT INTO {self.collection_name}_conversations
                (id, content, metadata, created_at, updated_at, embedding)
                VALUES (?, ?, ?, ?, ?, ?)
                ON CONFLICT(id) DO UPDATE SET
                    content = excluded.content,
                    metadata = excluded.metadata,
                    updated_at = excluded.updated_at,
                    embedding = excluded.embedding
                """,
                [
                    conv_id,
                    content,
                    metadata_json,
                    now,
                    now,
                    embedding,
                ],
            )
        else:
            self.conn.execute(
                f"""
                INSERT INTO {self.collection_name}_conversations
                (id, content, metadata, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?)
                ON CONFLICT(id) DO UPDATE SET
                    content = excluded.content,
                    metadata = excluded.metadata,
                    updated_at = excluded.updated_at
                """,
                [conv_id, content, metadata_json, now, now],
            )

        return conv_id
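
Editorial aside: because the ID is a sha256 digest of the content and the INSERT carries an ON CONFLICT clause, storing identical content twice updates the existing row instead of duplicating it. A hypothetical usage sketch, where db is assumed to be an initialized adapter:

async def demo_dedup(db: ReflectionDatabaseAdapterOneiric) -> None:
    # Same content, different metadata: the second call is an upsert, not a new row.
    first = await db.store_conversation("same text", {"run": 1})
    second = await db.store_conversation("same text", {"run": 2})
    assert first == second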

    async def search_conversations(
        self,
        query: str,
        limit: int = 10,
        threshold: float = 0.7,
        project: str | None = None,
        min_score: float | None = None,
    ) -> list[dict[str, t.Any]]:
        """Search conversations using vector similarity.

        Args:
            query: Search query
            limit: Maximum number of results
            threshold: Minimum similarity score (0.0 to 1.0)
            project: Optional project filter (not yet implemented)
            min_score: Alias for threshold (for backward compatibility)

        Returns:
            List of matching conversations with scores

        """
        # Use min_score as threshold if provided (backward compatibility)
        if min_score is not None:
            threshold = min_score
        if not self._initialized:  # coverage: partial branch, condition never true
            await self.initialize()

        results = []

        # Generate query embedding
        query_embedding = None
        if self.settings.enable_embeddings:
            query_embedding = await self._generate_embedding(query)

        if query_embedding and self.settings.enable_vss:
            # Vector similarity search using DuckDB's array_cosine_similarity
            vector_query = f"[{', '.join(map(str, query_embedding))}]"
            result = self.conn.execute(
                f"""
                SELECT
                    id, content, metadata, created_at, updated_at,
                    array_cosine_similarity(embedding, '{vector_query}'::FLOAT[{self.embedding_dim}]) as score
                FROM {self.collection_name}_conversations
                WHERE embedding IS NOT NULL
                ORDER BY score DESC
                LIMIT ?
                """,
                [limit],
            ).fetchall()

            for row in result:
                if row[5] >= threshold:  # score column; coverage: partial branch, condition never true
                    results.append(
                        {
                            "id": row[0],
                            "content": row[1],
                            "metadata": json.loads(row[2]) if row[2] else {},
                            "created_at": row[3],
                            "updated_at": row[4],
                            "score": float(row[5]),
                        }
                    )
        else:
            # Fallback to text search
            result = self.conn.execute(
                f"""
                SELECT id, content, metadata, created_at, updated_at
                FROM {self.collection_name}_conversations
                WHERE content LIKE ?
                ORDER BY updated_at DESC
                LIMIT ?
                """,
                [f"%{query}%", limit],
            ).fetchall()

            for row in result:
                results.append(
                    {
                        "id": row[0],
                        "content": row[1],
                        "metadata": json.loads(row[2]) if row[2] else {},
                        "created_at": row[3],
                        "updated_at": row[4],
                        "score": 1.0,  # Text search gets maximum score
                    }
                )

        return results
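
Editorial aside: a standalone sketch of the SQL used above, assuming a DuckDB version (0.10 or later) that provides fixed-size FLOAT[n] arrays and array_cosine_similarity; the three-dimensional vectors are made up for illustration.

import duckdb

con = duckdb.connect()  # in-memory database
con.execute("CREATE TABLE vecs (id VARCHAR, embedding FLOAT[3])")
con.execute(
    "INSERT INTO vecs VALUES ('a', [1.0, 0.0, 0.0]::FLOAT[3]), ('b', [0.0, 1.0, 0.0]::FLOAT[3])"
)
rows = con.execute(
    """
    SELECT id, array_cosine_similarity(embedding, [0.9, 0.1, 0.0]::FLOAT[3]) AS score
    FROM vecs
    ORDER BY score DESC
    """
).fetchall()
print(rows)  # 'a' scores near 1.0, 'b' much lower; the adapter then keeps rows at or above the threshold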

    async def get_stats(self) -> dict[str, t.Any]:
        """Get database statistics.

        Returns:
            Dictionary with statistics

        """
        if not self._initialized:  # coverage: partial branch, condition never true
            await self.initialize()

        # Get conversation count
        conv_count = self.conn.execute(
            f"SELECT COUNT(*) FROM {self.collection_name}_conversations"
        ).fetchone()[0]

        # Get reflection count
        refl_count = self.conn.execute(
            f"SELECT COUNT(*) FROM {self.collection_name}_reflections"
        ).fetchone()[0]

        # Get embedding stats
        embedding_count = self.conn.execute(
            f"SELECT COUNT(*) FROM {self.collection_name}_conversations WHERE embedding IS NOT NULL"
        ).fetchone()[0]

        return {
            "total_conversations": conv_count,
            "total_reflections": refl_count,
            "conversations_with_embeddings": embedding_count,
            "database_path": self.db_path,
            "collection_name": self.collection_name,
        }

    async def store_reflection(
        self, content: str, tags: list[str] | None = None
    ) -> str:
        """Store a reflection with optional tags.

        Args:
            content: Reflection text content
            tags: Optional list of tags for categorization

        Returns:
            Unique reflection ID

        """
        if not self._initialized:  # coverage: partial branch, condition never true
            await self.initialize()

        reflection_id = str(uuid.uuid4())
        now = datetime.now(tz=UTC)

        # Generate embedding if available
        embedding: list[float] | None = None
        if ONNX_AVAILABLE and self.onnx_session:
            try:
                embedding = await self._generate_embedding(content)
            except Exception:
                embedding = None

        # Store reflection
        if embedding:
            self.conn.execute(
                f"""
                INSERT INTO {self.collection_name}_reflections
                (id, content, tags, embedding, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (
                    reflection_id,
                    content,
                    tags or [],
                    embedding,
                    now,
                    now,
                ),
            )
        else:
            self.conn.execute(
                f"""
                INSERT INTO {self.collection_name}_reflections
                (id, content, tags, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?)
                """,
                (
                    reflection_id,
                    content,
                    tags or [],
                    now,
                    now,
                ),
            )

        return reflection_id

    async def search_reflections(
        self, query: str, limit: int = 10, use_embeddings: bool = True
    ) -> list[dict[str, t.Any]]:
        """Search reflections by content or tags.

        Args:
            query: Search query
            limit: Maximum number of results
            use_embeddings: Whether to use semantic search if embeddings available

        Returns:
            List of matching reflections

        """
        if not self._initialized:  # coverage: partial branch, condition never true
            await self.initialize()

        if use_embeddings and ONNX_AVAILABLE and self.onnx_session:
            return await self._semantic_search_reflections(query, limit)
        return await self._text_search_reflections(query, limit)

    async def _semantic_search_reflections(
        self, query: str, limit: int = 10
    ) -> list[dict[str, t.Any]]:
        """Perform semantic search on reflections using embeddings."""
        if not self._initialized:  # coverage: partial branch, condition never true
            await self.initialize()

        # Generate query embedding
        query_embedding = await self._generate_embedding(query)
        if not query_embedding:  # coverage: partial branch, condition never true
            return await self._text_search_reflections(query, limit)

        # Perform vector similarity search
        results = self.conn.execute(
            f"""
            SELECT id, content, tags, created_at, updated_at,
                   vector_cosine_similarity(embedding, ?) as similarity
            FROM {self.collection_name}_reflections
            WHERE embedding IS NOT NULL
            ORDER BY similarity DESC
            LIMIT ?
            """,
            (query_embedding, limit),
        ).fetchall()

        return [
            {
                "id": row[0],
                "content": row[1],
                "tags": list(row[2]) if row[2] else [],
                "created_at": row[3].isoformat() if row[3] else None,
                "updated_at": row[4].isoformat() if row[4] else None,
                "similarity": row[5] or 0.0,
            }
            for row in results
        ]

    async def _text_search_reflections(
        self, query: str, limit: int = 10
    ) -> list[dict[str, t.Any]]:
        """Perform text search on reflections."""
        if not self._initialized:  # coverage: partial branch, condition never true
            await self.initialize()

        results = self.conn.execute(
            f"""
            SELECT id, content, tags, created_at, updated_at
            FROM {self.collection_name}_reflections
            WHERE content LIKE ? OR list_contains(tags, ?)
            ORDER BY created_at DESC
            LIMIT ?
            """,
            (f"%{query}%", query, limit),
        ).fetchall()

        return [
            {
                "id": row[0],
                "content": row[1],
                "tags": list(row[2]) if row[2] else [],
                "created_at": row[3].isoformat() if row[3] else None,
                "updated_at": row[4].isoformat() if row[4] else None,
            }
            for row in results
        ]
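
Editorial aside: a standalone sketch of the list_contains() tag match used in the text fallback above, with made-up rows; it assumes a DuckDB build where VARCHAR[] lists and list_contains are available (both are standard in current releases).

import duckdb

con = duckdb.connect()
con.execute("CREATE TABLE r (id INT, content TEXT, tags VARCHAR[])")
con.execute(
    "INSERT INTO r VALUES (1, 'retry logic notes', ['bug', 'network']), (2, 'api docs', ['docs'])"
)
rows = con.execute(
    "SELECT id FROM r WHERE content LIKE ? OR list_contains(tags, ?)",
    ["%timeout%", "bug"],
).fetchall()
print(rows)  # [(1,)] : matched through the tag even though the content pattern missed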

    async def get_reflection_by_id(self, reflection_id: str) -> dict[str, t.Any] | None:
        """Get a reflection by its ID.

        Args:
            reflection_id: Reflection ID

        Returns:
            Reflection dictionary or None if not found

        """
        if not self._initialized:  # coverage: partial branch, condition never true
            await self.initialize()

        result = self.conn.execute(
            f"""
            SELECT id, content, tags, created_at, updated_at
            FROM {self.collection_name}_reflections
            WHERE id = ?
            """,
            (reflection_id,),
        ).fetchone()

        if not result:
            return None

        return {
            "id": result[0],
            "content": result[1],
            "tags": list(result[2]) if result[2] else [],
            "created_at": result[3].isoformat() if result[3] else None,
            "updated_at": result[4].isoformat() if result[4] else None,
        }

    async def similarity_search(
        self, query: str, limit: int = 10
    ) -> list[dict[str, t.Any]]:
        """Perform similarity search across both conversations and reflections.

        Args:
            query: Search query
            limit: Maximum number of results

        Returns:
            List of matching items with type information

        """
        if not self._initialized:  # coverage: partial branch, condition never true
            await self.initialize()

        # Search conversations
        conv_results = await self.search_conversations(query, limit)

        # Search reflections
        refl_results = await self.search_reflections(query, limit)

        # Combine and limit results
        combined = [{"type": "conversation"} | result for result in conv_results] + [
            {"type": "reflection"} | result for result in refl_results
        ]

        return combined[:limit]

    async def reset_database(self) -> None:
        """Reset the database by dropping and recreating tables."""
        if not self._initialized:
            await self.initialize()

        # Drop foreign key constraints first, then tables
        try:
            # Drop reflections table first (has foreign key to conversations)
            self.conn.execute(
                f"DROP TABLE IF EXISTS {self.collection_name}_reflections"
            )
            # Then drop conversations table
            self.conn.execute(
                f"DROP TABLE IF EXISTS {self.collection_name}_conversations"
            )
        except Exception:
            # If there are issues, try dropping with CASCADE
            self.conn.execute(
                f"DROP TABLE IF EXISTS {self.collection_name}_reflections CASCADE"
            )
            self.conn.execute(
                f"DROP TABLE IF EXISTS {self.collection_name}_conversations CASCADE"
            )

        # Recreate tables
        self._create_tables()

    async def health_check(self) -> bool:
        """Check if database is healthy.

        Returns:
            True if database is healthy, False otherwise

        """
        try:
            if not self._initialized:
                await self.initialize()
            # Simple query to test connection
            self.conn.execute("SELECT 1").fetchone()
            return True
        except Exception:
            return False


# Alias for backward compatibility
ReflectionDatabase = ReflectionDatabaseAdapterOneiric


__all__ = [
    "ReflectionDatabase",
    "ReflectionDatabaseAdapterOneiric",
]
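
Editorial aside: a hedged end-to-end sketch of the public API documented above. It assumes the default settings resolve to a writable database path; when the optional ONNX/transformers dependencies are missing, searches fall back to text matching as described in the class docstring.

import asyncio

from session_buddy.adapters.reflection_adapter_oneiric import ReflectionDatabase


async def main() -> None:
    async with ReflectionDatabase() as db:
        conv_id = await db.store_conversation(
            "Fixed the retry logic in the sync worker", {"project": "session-buddy"}
        )
        await db.store_reflection("Prefer exponential backoff", tags=["retries"])
        hits = await db.search_conversations("retry", limit=5, threshold=0.3)
        print(conv_id, len(hits), await db.get_stats())


if __name__ == "__main__":
    asyncio.run(main())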