Coverage for session_buddy / adapters / knowledge_graph_adapter_oneiric.py: 79.52%
197 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
1"""Oneiric-compatible knowledge graph adapter using native DuckDB implementation.
3Provides a Oneiric-compatible knowledge graph adapter that maintains the existing
4KnowledgeGraphDatabase API while using native DuckDB operations instead of ACB.
6Phase 5: Oneiric Adapter Conversion - Knowledge Graph Adapter
8Key Features:
9 - Native DuckDB PGQ extension for property graph queries
10 - Oneiric settings and lifecycle management
11 - Backward-compatible API with existing KnowledgeGraphDatabase
12 - No ACB dependencies
13 - Fast local/in-memory operations
15"""
17from __future__ import annotations
19import json
20import typing as t
21import uuid
22from contextlib import suppress
23from datetime import UTC, datetime
25from session_buddy.adapters.settings import KnowledgeGraphAdapterSettings
27if t.TYPE_CHECKING:
28 from pathlib import Path
29 from types import TracebackType
31 import duckdb
33# DuckDB will be imported at runtime
34DUCKDB_AVAILABLE = True
35try:
36 import duckdb
37except ImportError:
38 DUCKDB_AVAILABLE = False
39 if t.TYPE_CHECKING:
40 # Type stub for type checking when duckdb is not installed
41 import types
43 duckdb = types.SimpleNamespace() # type: ignore[misc,assignment]
46class KnowledgeGraphDatabaseAdapterOneiric:
47 """Oneiric-compatible knowledge graph adapter using native DuckDB.
49 This adapter provides the same API as the ACB-based KnowledgeGraphDatabaseAdapter
50 but uses native DuckDB operations and Oneiric settings instead of ACB configuration.
52 Key differences from ACB implementation:
53 - Uses Oneiric settings (dataclass-based) instead of ACB Config
54 - No ACB dependency injection
55 - Same hybrid sync/async pattern (sync DuckDB ops, async interface)
56 - Maintains full API compatibility
58 Example:
59 >>> settings = KnowledgeGraphAdapterSettings.from_settings()
60 >>> async with KnowledgeGraphDatabaseAdapterOneiric(settings=settings) as kg:
61 >>> entity = await kg.create_entity("project", "project", ["observation"])
62 >>> relation = await kg.create_relation("proj1", "proj2", "depends_on")
64 """
66 def __init__(
67 self,
68 db_path: str | Path | None = None,
69 settings: KnowledgeGraphAdapterSettings | None = None,
70 ) -> None:
71 """Initialize adapter with optional database path.
73 Args:
74 db_path: Path to DuckDB database file. If None, uses path from settings.
75 settings: KnowledgeGraphAdapterSettings instance. If None, creates from defaults.
77 """
78 self.db_path = str(db_path) if db_path else None
79 self.settings = settings or KnowledgeGraphAdapterSettings.from_settings()
80 self.conn: t.Any = None # DuckDB connection (sync)
81 self._duckpgq_installed = False
82 self._initialized = False
84 def __enter__(self) -> t.Self:
85 """Sync context manager entry (not recommended - use async)."""
86 msg = "Use 'async with' instead of 'with' for KnowledgeGraphDatabaseAdapterOneiric"
87 raise RuntimeError(msg)
89 def __exit__(
90 self,
91 exc_type: type[BaseException] | None,
92 exc_value: BaseException | None,
93 traceback: TracebackType | None,
94 ) -> None:
95 """Sync context manager exit."""
97 async def __aenter__(self) -> t.Self:
98 """Async context manager entry."""
99 await self.initialize()
100 return self
102 async def __aexit__(
103 self,
104 exc_type: type[BaseException] | None,
105 exc_value: BaseException | None,
106 traceback: TracebackType | None,
107 ) -> None:
108 """Async context manager exit with cleanup."""
109 self.close()
111 def close(self) -> None:
112 """Close DuckDB connection."""
113 if self.conn is not None:
114 self.conn.close()
115 self.conn = None
117 def __del__(self) -> None:
118 """Destructor to ensure cleanup."""
119 self.close()
121 def _get_db_path(self) -> str:
122 """Get database path from settings or use default.
124 Returns:
125 Database file path
127 """
128 # Prefer instance path when provided
129 if self.db_path:
130 return self.db_path
132 # Use settings path
133 return str(self.settings.database_path)
135 async def initialize(self) -> None:
136 """Initialize DuckDB connection and create schema.
138 This method:
139 1. Gets database path from settings
140 2. Creates sync DuckDB connection (fast, local)
141 3. Installs and loads DuckPGQ extension
142 4. Creates knowledge graph schema
143 """
144 if self._initialized:
145 return
147 if not DUCKDB_AVAILABLE: 147 ↛ 148line 147 didn't jump to line 148 because the condition on line 147 was never true
148 msg = "DuckDB not available. Install with: uv add duckdb"
149 raise ImportError(msg)
151 # Get database path
152 db_path = self._get_db_path()
154 # Create sync DuckDB connection (fast, local operation)
155 self.conn = duckdb.connect(db_path)
157 # Install and load DuckPGQ extension
158 try:
159 extensions = self.settings.install_extensions
160 for extension in extensions:
161 self.conn.execute(f"INSTALL {extension} FROM community")
162 self.conn.execute(f"LOAD {extension}")
163 self._duckpgq_installed = True
164 except Exception as e:
165 msg = f"Failed to install DuckPGQ extension: {e}"
166 raise RuntimeError(msg) from e
168 # Create schema (sync operations, complete quickly)
169 await self._create_schema()
171 self._initialized = True
173 def _get_conn(self) -> t.Any:
174 """Get DuckDB connection, raising error if not initialized.
176 Returns:
177 Active DuckDB connection
179 Raises:
180 RuntimeError: If connection not initialized
182 """
183 if self.conn is None: 183 ↛ 184line 183 didn't jump to line 184 because the condition on line 183 was never true
184 msg = "Database connection not initialized. Call initialize() first"
185 raise RuntimeError(msg)
186 return self.conn
188 async def _resolve_entity_id(self, identifier: str) -> str:
189 """Resolve an entity identifier to its canonical ID."""
190 entity = await self.find_entity_by_name(identifier)
191 if entity: 191 ↛ 196line 191 didn't jump to line 196 because the condition on line 191 was always true
192 # Type narrow entity["id"] to str
193 entity_id = entity["id"]
194 return entity_id if isinstance(entity_id, str) else str(entity_id)
196 row = (
197 self._get_conn()
198 .execute(
199 "SELECT id FROM kg_entities WHERE id = ?",
200 (identifier,),
201 )
202 .fetchone()
203 )
204 if row:
205 # Type narrow row[0] to str (id column is TEXT in SQL)
206 return row[0] if isinstance(row[0], str) else str(row[0])
208 msg = f"Entity '{identifier}' not found"
209 raise ValueError(msg)
211 def _format_timestamp(self, value: t.Any) -> str | None:
212 """Format timestamps consistently across DuckDB outputs."""
213 if value is None: 213 ↛ 214line 213 didn't jump to line 214 because the condition on line 213 was never true
214 return None
215 return value.isoformat() if hasattr(value, "isoformat") else str(value)
    async def _create_schema(self) -> None:
        """Create knowledge graph schema.

        Creates:
        - kg_entities table (nodes)
        - kg_relationships table (edges)
        - Indexes for performance

        Note: Executes synchronously but completes quickly (local operation)
        """
        conn = self._get_conn()

        # Create entities table (nodes/vertices). IF NOT EXISTS keeps this
        # idempotent across repeated initialize() calls.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS kg_entities (
                id VARCHAR PRIMARY KEY,
                name VARCHAR NOT NULL,
                entity_type VARCHAR NOT NULL,
                observations VARCHAR[],
                properties JSON,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                metadata JSON
            )
        """)

        # Create relationships table (edges)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS kg_relationships (
                id VARCHAR PRIMARY KEY,
                from_entity VARCHAR NOT NULL,
                to_entity VARCHAR NOT NULL,
                relation_type VARCHAR NOT NULL,
                properties JSON,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                metadata JSON
            )
        """)

        # Ensure columns exist when DuckPGQ pre-creates tables without all fields.
        # PRAGMA table_info rows expose the column name at index 1.
        relationship_columns = {
            row[1]
            for row in conn.execute("PRAGMA table_info('kg_relationships')").fetchall()
        }
        if "updated_at" not in relationship_columns:
            conn.execute(
                "ALTER TABLE kg_relationships "
                "ADD COLUMN updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP"
            )

        # Create indexes for performance: name/type lookups on entities and
        # endpoint/type lookups on relationships.
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_entities_name ON kg_entities(name)",
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_entities_type ON kg_entities(entity_type)",
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_relationships_from "
            "ON kg_relationships(from_entity)",
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_relationships_to "
            "ON kg_relationships(to_entity)",
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_relationships_type "
            "ON kg_relationships(relation_type)",
        )
288 async def create_entity(
289 self,
290 name: str,
291 entity_type: str,
292 observations: list[str] | None = None,
293 properties: dict[str, t.Any] | None = None,
294 metadata: dict[str, t.Any] | None = None,
295 ) -> dict[str, t.Any]:
296 """Create a new entity (node) in the knowledge graph.
298 Args:
299 name: Entity name (must be unique)
300 entity_type: Type/category of entity
301 observations: List of observation strings
302 properties: Additional properties as key-value pairs
303 metadata: Additional metadata
305 Returns:
306 Created entity as dictionary
308 Raises:
309 ValueError: If entity with name already exists
311 """
312 conn = self._get_conn()
314 # Check if entity already exists
315 existing = await self.find_entity_by_name(name)
316 if existing: 316 ↛ 317line 316 didn't jump to line 317 because the condition on line 316 was never true
317 msg = f"Entity with name '{name}' already exists"
318 raise ValueError(msg)
320 entity_id = str(uuid.uuid4())
321 now = datetime.now(tz=UTC)
323 # Sync DuckDB execution (fast, local operation)
324 conn.execute(
325 """
326 INSERT INTO kg_entities
327 (id, name, entity_type, observations, properties, created_at, updated_at, metadata)
328 VALUES (?, ?, ?, ?, ?, ?, ?, ?)
329 """,
330 (
331 entity_id,
332 name,
333 entity_type,
334 observations or [],
335 json.dumps(properties or {}),
336 now,
337 now,
338 json.dumps(metadata or {}),
339 ),
340 )
342 return {
343 "id": entity_id,
344 "name": name,
345 "entity_type": entity_type,
346 "observations": observations or [],
347 "properties": properties or {},
348 "created_at": now.isoformat(),
349 "updated_at": now.isoformat(),
350 "metadata": metadata or {},
351 }
353 async def get_entity(self, entity_id: str) -> dict[str, t.Any] | None:
354 """Get entity by ID.
356 Args:
357 entity_id: Entity UUID
359 Returns:
360 Entity dictionary or None if not found
362 """
363 conn = self._get_conn()
365 result = conn.execute(
366 "SELECT * FROM kg_entities WHERE id = ?",
367 (entity_id,),
368 ).fetchone()
370 if not result:
371 return None
373 return {
374 "id": result[0],
375 "name": result[1],
376 "entity_type": result[2],
377 "observations": list(result[3]) if result[3] else [],
378 "properties": json.loads(result[4]) if result[4] else {},
379 "created_at": self._format_timestamp(result[5]),
380 "updated_at": self._format_timestamp(result[6]),
381 "metadata": json.loads(result[7]) if result[7] else {},
382 }
384 async def find_entity_by_name(self, name: str) -> dict[str, t.Any] | None:
385 """Find entity by name.
387 Args:
388 name: Entity name to search for
390 Returns:
391 Entity dictionary or None if not found
393 """
394 conn = self._get_conn()
396 result = conn.execute(
397 """
398 SELECT id, name, entity_type, observations, properties,
399 created_at, updated_at, metadata
400 FROM kg_entities
401 WHERE name = ?
402 """,
403 (name,),
404 ).fetchone()
406 if not result:
407 return None
409 return {
410 "id": result[0],
411 "name": result[1],
412 "entity_type": result[2],
413 "observations": list(result[3]) if result[3] else [],
414 "properties": json.loads(result[4]) if result[4] else {},
415 "created_at": result[5].isoformat() if result[5] else None,
416 "updated_at": result[6].isoformat() if result[6] else None,
417 "metadata": json.loads(result[7]) if result[7] else {},
418 }
420 async def create_relation(
421 self,
422 from_entity: str,
423 to_entity: str,
424 relation_type: str,
425 properties: dict[str, t.Any] | None = None,
426 metadata: dict[str, t.Any] | None = None,
427 ) -> dict[str, t.Any]:
428 """Create a relationship (edge) between two entities.
430 Args:
431 from_entity: Source entity name
432 to_entity: Target entity name
433 relation_type: Type of relationship
434 properties: Additional properties
435 metadata: Additional metadata
437 Returns:
438 Created relationship as dictionary
440 Raises:
441 ValueError: If either entity doesn't exist
443 """
444 conn = self._get_conn()
446 # Resolve entity identifiers to IDs (accepts names or IDs)
447 resolved_from_entity = await self._resolve_entity_id(from_entity)
448 resolved_to_entity = await self._resolve_entity_id(to_entity)
450 relation_id = str(uuid.uuid4())
451 now = datetime.now(tz=UTC)
453 conn.execute(
454 """
455 INSERT INTO kg_relationships
456 (id, from_entity, to_entity, relation_type, properties, created_at, updated_at, metadata)
457 VALUES (?, ?, ?, ?, ?, ?, ?, ?)
458 """,
459 (
460 relation_id,
461 resolved_from_entity,
462 resolved_to_entity,
463 relation_type,
464 json.dumps(properties or {}),
465 now,
466 now,
467 json.dumps(metadata or {}),
468 ),
469 )
471 return {
472 "id": relation_id,
473 "from_entity": resolved_from_entity,
474 "to_entity": resolved_to_entity,
475 "relation_type": relation_type,
476 "properties": properties or {},
477 "created_at": now.isoformat(),
478 "updated_at": now.isoformat(),
479 "metadata": metadata or {},
480 }
482 async def add_observation(
483 self,
484 entity_name: str,
485 observation: str,
486 ) -> dict[str, t.Any]:
487 """Add an observation to an entity.
489 Args:
490 entity_name: Name of entity to update
491 observation: Observation text to add
493 Returns:
494 Updated entity dictionary
496 Raises:
497 ValueError: If entity doesn't exist
499 """
500 conn = self._get_conn()
502 entity = await self.find_entity_by_name(entity_name)
503 if not entity: 503 ↛ 504line 503 didn't jump to line 504 because the condition on line 503 was never true
504 msg = f"Entity '{entity_name}' not found"
505 raise ValueError(msg)
507 now = datetime.now(tz=UTC)
509 # Append observation to array
510 conn.execute(
511 """
512 UPDATE kg_entities
513 SET observations = list_append(observations, ?),
514 updated_at = ?
515 WHERE name = ?
516 """,
517 (observation, now, entity_name),
518 )
520 # Return updated entity
521 return await self.find_entity_by_name(entity_name) # type: ignore[return-value]
523 async def search_entities(
524 self,
525 query: str | None = None,
526 entity_type: str | None = None,
527 limit: int = 10,
528 ) -> list[dict[str, t.Any]]:
529 """Search for entities by name or observations.
531 Args:
532 query: Search query (matches name and observations)
533 entity_type: Filter by entity type
534 limit: Maximum number of results
536 Returns:
537 List of matching entities
539 """
540 conn = self._get_conn()
542 # Build query dynamically
543 conditions = []
544 params: list[t.Any] = []
546 if query: 546 ↛ 550line 546 didn't jump to line 550 because the condition on line 546 was always true
547 conditions.append("(name LIKE ? OR list_contains(observations, ?))")
548 params.extend([f"%{query}%", query])
550 if entity_type: 550 ↛ 551line 550 didn't jump to line 551 because the condition on line 550 was never true
551 conditions.append("entity_type = ?")
552 params.append(entity_type)
554 where_clause = " AND ".join(conditions) if conditions else "1=1"
555 # Build SQL safely - all user input is parameterized via params list
556 sql = (
557 "SELECT id, name, entity_type, observations, properties, "
558 "created_at, updated_at, metadata "
559 "FROM kg_entities WHERE "
560 + where_clause
561 + " ORDER BY created_at DESC LIMIT ?"
562 )
563 params.append(limit)
565 result = conn.execute(sql, params).fetchall()
567 # Use list comprehension for better readability (refurb FURB138)
568 return [
569 {
570 "id": row[0],
571 "name": row[1],
572 "entity_type": row[2],
573 "observations": list(row[3]) if row[3] else [],
574 "properties": json.loads(row[4]) if row[4] else {},
575 "created_at": self._format_timestamp(row[5]),
576 "updated_at": self._format_timestamp(row[6]),
577 "metadata": json.loads(row[7]) if row[7] else {},
578 }
579 for row in result
580 ]
582 async def get_relationships(
583 self,
584 entity_name: str,
585 relation_type: str | None = None,
586 direction: str = "both",
587 ) -> list[dict[str, t.Any]]:
588 """Get all relationships for a specific entity.
590 Args:
591 entity_name: Name of entity to get relationships for
592 relation_type: Optional filter by relationship type
593 direction: "outgoing", "incoming", or "both" (default)
595 Returns:
596 List of relationships involving this entity
598 """
599 conn = self._get_conn()
600 resolved_entity = await self._resolve_entity_id(entity_name)
602 conditions = []
603 params: list[t.Any] = []
605 if direction == "outgoing": 605 ↛ 606line 605 didn't jump to line 606 because the condition on line 605 was never true
606 conditions.append("from_entity = ?")
607 params.append(resolved_entity)
608 elif direction == "incoming": 608 ↛ 609line 608 didn't jump to line 609 because the condition on line 608 was never true
609 conditions.append("to_entity = ?")
610 params.append(resolved_entity)
611 else: # both
612 conditions.append("(from_entity = ? OR to_entity = ?)")
613 params.extend([resolved_entity, resolved_entity])
615 if relation_type: 615 ↛ 616line 615 didn't jump to line 616 because the condition on line 615 was never true
616 conditions.append("relation_type = ?")
617 params.append(relation_type)
619 where_clause = " AND ".join(conditions)
620 # Build SQL safely - all user input is parameterized via params list
621 sql = (
622 "SELECT id, from_entity, to_entity, relation_type, properties, "
623 "created_at, updated_at, metadata "
624 "FROM kg_relationships WHERE " + where_clause + " ORDER BY created_at DESC"
625 )
627 result = conn.execute(sql, params).fetchall()
629 # Use list comprehension for better readability (refurb FURB138)
630 return [
631 {
632 "id": row[0],
633 "from_entity": row[1],
634 "to_entity": row[2],
635 "relation_type": row[3],
636 "properties": json.loads(row[4]) if row[4] else {},
637 "created_at": self._format_timestamp(row[5]),
638 "updated_at": self._format_timestamp(row[6]),
639 "metadata": json.loads(row[7]) if row[7] else {},
640 }
641 for row in result
642 ]
644 async def find_path(
645 self,
646 from_entity: str,
647 to_entity: str,
648 max_depth: int = 5,
649 ) -> list[dict[str, t.Any]]:
650 """Find paths between two entities using breadth-first search.
652 Args:
653 from_entity: Starting entity name
654 to_entity: Target entity name
655 max_depth: Maximum path length to search
657 Returns:
658 Paths found between entities with hop counts
660 """
661 conn = self._get_conn()
662 resolved_from_entity = await self._resolve_entity_id(from_entity)
663 resolved_to_entity = await self._resolve_entity_id(to_entity)
665 # Get all relationships in one query (sync, fast local operation)
666 result = conn.execute(
667 "SELECT from_entity, to_entity, relation_type FROM kg_relationships",
668 ).fetchall()
670 # Build adjacency list
671 graph: dict[str, list[tuple[str, str]]] = {}
672 for row in result:
673 from_e = row[0]
674 to_e = row[1]
675 rel_type = row[2]
677 if from_e not in graph: 677 ↛ 679line 677 didn't jump to line 679 because the condition on line 677 was always true
678 graph[from_e] = []
679 graph[from_e].append((to_e, rel_type))
681 # BFS to find shortest path
682 from collections import deque
684 queue: deque[tuple[str, list[str], list[str]]] = deque(
685 [(resolved_from_entity, [resolved_from_entity], [])],
686 )
687 visited = {resolved_from_entity}
689 paths: list[dict[str, t.Any]] = []
690 while queue and not paths: # Find first path only (refurb FURB115) 690 ↛ 711line 690 didn't jump to line 711 because the condition on line 690 was always true
691 current, path, relations = queue.popleft()
693 if len(path) > max_depth + 1: 693 ↛ 694line 693 didn't jump to line 694 because the condition on line 693 was never true
694 continue
696 if current == resolved_to_entity and len(path) > 1:
697 paths.append(
698 {
699 "path": path,
700 "relations": relations,
701 "hops": len(path) - 1,
702 },
703 )
704 break
706 for neighbor, rel_type in graph.get(current, []):
707 if neighbor not in visited: 707 ↛ 706line 707 didn't jump to line 706 because the condition on line 707 was always true
708 visited.add(neighbor)
709 queue.append((neighbor, [*path, neighbor], [*relations, rel_type]))
711 return paths
713 async def get_stats(self) -> dict[str, t.Any]:
714 """Get statistics about the knowledge graph.
716 Returns:
717 Summary with entity count, relationship count, type distributions
719 """
720 conn = self._get_conn()
722 # Entity count
723 entity_count = conn.execute("SELECT COUNT(*) FROM kg_entities").fetchone()[0]
725 # Relationship count
726 relationship_count = conn.execute(
727 "SELECT COUNT(*) FROM kg_relationships",
728 ).fetchone()[0]
730 # Entity types distribution
731 entity_types_result = conn.execute(
732 """
733 SELECT entity_type, COUNT(*) as count
734 FROM kg_entities
735 GROUP BY entity_type
736 """,
737 ).fetchall()
738 entity_types = {row[0]: row[1] for row in entity_types_result}
740 # Relationship types distribution
741 relationship_types_result = conn.execute(
742 """
743 SELECT relation_type, COUNT(*) as count
744 FROM kg_relationships
745 GROUP BY relation_type
746 """,
747 ).fetchall()
748 relationship_types = {row[0]: row[1] for row in relationship_types_result}
750 return {
751 "total_entities": entity_count or 0,
752 "total_relationships": relationship_count or 0,
753 "entity_types": entity_types,
754 "relationship_types": relationship_types,
755 }