Coverage for session_buddy / knowledge_graph_db.py: 82.99%
205 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
1#!/usr/bin/env python3
2"""Knowledge Graph Database using DuckDB + DuckPGQ Extension.
4This module provides semantic memory (knowledge graph) capabilities
5using DuckDB's DuckPGQ extension for SQL/PGQ (Property Graph Queries).
7The knowledge graph stores:
8- **Entities**: Nodes representing projects, libraries, technologies, concepts
9- **Relations**: Edges connecting entities (uses, depends_on, developed_by, etc.)
10- **Observations**: Facts and notes attached to entities
12This is separate from the episodic memory (conversations) in ReflectionDatabase.
13"""
15from __future__ import annotations
17import json
18import os
19import uuid
20from datetime import UTC, datetime
21from pathlib import Path
22from typing import TYPE_CHECKING, Any, Self
24if TYPE_CHECKING:
25 import duckdb
27try:
28 import duckdb
30 DUCKDB_AVAILABLE = True
31except ImportError:
32 DUCKDB_AVAILABLE = False
35class KnowledgeGraphDatabase:
36 """Manages knowledge graph using DuckDB + DuckPGQ extension.
38 This class provides semantic memory through a property graph model,
39 complementing the episodic memory in ReflectionDatabase.
41 Example:
42 >>> async with KnowledgeGraphDatabase() as kg:
43 >>> entity = await kg.create_entity(
44 >>> name="session-mgmt-mcp",
45 >>> entity_type="project"
46 >>> )
47 >>> relation = await kg.create_relation(
48 >>> from_entity="session-mgmt-mcp",
49 >>> to_entity="ACB",
50 >>> relation_type="uses"
51 >>> )
53 """
55 def __init__(self, db_path: str | None = None) -> None:
56 """Initialize knowledge graph database.
58 Args:
59 db_path: Path to DuckDB database file.
60 Defaults to ~/.claude/data/knowledge_graph.duckdb
62 """
63 self.db_path = db_path or os.path.expanduser(
64 "~/.claude/data/knowledge_graph.duckdb",
65 )
66 Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
68 self.conn: duckdb.DuckDBPyConnection | None = None
69 self._duckpgq_installed = False
71 def __enter__(self) -> Self:
72 """Context manager entry."""
73 return self
75 def __exit__(self, *_exc_info: object) -> None:
76 """Context manager exit with cleanup."""
77 self.close()
79 async def __aenter__(self) -> Self:
80 """Async context manager entry."""
81 await self.initialize()
82 return self
84 async def __aexit__(self, *_exc_info: object) -> None:
85 """Async context manager exit with cleanup."""
86 self.close()
88 def close(self) -> None:
89 """Close database connection."""
90 if self.conn:
91 try:
92 self.conn.close()
93 except Exception:
94 # nosec B110 - intentionally suppressing exceptions during cleanup
95 pass # Ignore errors during cleanup
96 finally:
97 self.conn = None
99 def __del__(self) -> None:
100 """Destructor to ensure cleanup."""
101 self.close()
103 async def initialize(self) -> None:
104 """Initialize database and DuckPGQ extension.
106 This method:
107 1. Creates DuckDB connection
108 2. Installs DuckPGQ extension from community repository
109 3. Creates property graph schema (entities + relationships tables)
110 4. Creates the knowledge_graph property graph
112 Raises:
113 ImportError: If DuckDB is not available
114 RuntimeError: If DuckPGQ installation fails
116 """
117 if not DUCKDB_AVAILABLE: 117 ↛ 118line 117 didn't jump to line 118 because the condition on line 117 was never true
118 msg = "DuckDB not available. Install with: uv add duckdb"
119 raise ImportError(msg)
121 # Create connection
122 self.conn = duckdb.connect(self.db_path)
123 assert self.conn is not None # Type narrowing
125 # Install and load DuckPGQ extension
126 try:
127 self.conn.execute("INSTALL duckpgq FROM community")
128 self.conn.execute("LOAD duckpgq")
129 self._duckpgq_installed = True
130 except Exception as e:
131 msg = f"Failed to install DuckPGQ extension: {e}"
132 raise RuntimeError(msg) from e
134 # Create schema
135 await self._create_schema()
137 def _get_conn(self) -> duckdb.DuckDBPyConnection:
138 """Get database connection, raising error if not initialized.
140 Returns:
141 Active DuckDB connection
143 Raises:
144 RuntimeError: If connection not initialized
146 """
147 if self.conn is None: 147 ↛ 148line 147 didn't jump to line 148 because the condition on line 147 was never true
148 msg = "Database connection not initialized. Call initialize() first"
149 raise RuntimeError(msg)
150 return self.conn
152 async def _create_schema(self) -> None:
153 """Create knowledge graph schema with DuckPGQ property graph.
155 Creates:
156 - kg_entities table (nodes)
157 - kg_relationships table (edges)
158 - knowledge_graph property graph
159 - Indexes for performance
160 """
161 conn = self._get_conn()
163 # Create entities table (nodes/vertices)
164 conn.execute("""
165 CREATE TABLE IF NOT EXISTS kg_entities (
166 id VARCHAR PRIMARY KEY,
167 name VARCHAR NOT NULL,
168 entity_type VARCHAR NOT NULL,
169 observations VARCHAR[],
170 properties JSON,
171 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
172 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
173 metadata JSON
174 )
175 """)
177 # Create relationships table (edges)
178 # Note: DuckDB doesn't support CASCADE constraints, so we omit ON DELETE CASCADE
179 conn.execute("""
180 CREATE TABLE IF NOT EXISTS kg_relationships (
181 id VARCHAR PRIMARY KEY,
182 from_entity VARCHAR NOT NULL,
183 to_entity VARCHAR NOT NULL,
184 relation_type VARCHAR NOT NULL,
185 properties JSON,
186 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
187 metadata JSON,
188 FOREIGN KEY (from_entity) REFERENCES kg_entities(id),
189 FOREIGN KEY (to_entity) REFERENCES kg_entities(id)
190 )
191 """)
193 # Create indexes for performance
194 conn.execute(
195 "CREATE INDEX IF NOT EXISTS idx_entities_name ON kg_entities(name)",
196 )
197 conn.execute(
198 "CREATE INDEX IF NOT EXISTS idx_entities_type ON kg_entities(entity_type)",
199 )
200 conn.execute(
201 "CREATE INDEX IF NOT EXISTS idx_relationships_type ON kg_relationships(relation_type)",
202 )
203 conn.execute(
204 "CREATE INDEX IF NOT EXISTS idx_relationships_from ON kg_relationships(from_entity)",
205 )
206 conn.execute(
207 "CREATE INDEX IF NOT EXISTS idx_relationships_to ON kg_relationships(to_entity)",
208 )
210 # Create property graph using DuckPGQ
211 # This maps our tables to SQL/PGQ graph structure
212 try:
213 conn.execute("""
214 CREATE PROPERTY GRAPH IF NOT EXISTS knowledge_graph
215 VERTEX TABLES (kg_entities)
216 EDGE TABLES (
217 kg_relationships
218 SOURCE KEY (from_entity) REFERENCES kg_entities (id)
219 DESTINATION KEY (to_entity) REFERENCES kg_entities (id)
220 )
221 """)
222 except Exception as e:
223 # Property graph might already exist, that's okay
224 if "already exists" not in str(e).lower():
225 raise
227 async def create_entity(
228 self,
229 name: str,
230 entity_type: str,
231 observations: list[str] | None = None,
232 properties: dict[str, Any] | None = None,
233 metadata: dict[str, Any] | None = None,
234 ) -> dict[str, Any]:
235 """Create an entity (node) in the knowledge graph.
237 Args:
238 name: Entity name (e.g., "session-mgmt-mcp", "Python 3.13")
239 entity_type: Type of entity (e.g., "project", "language", "library")
240 observations: List of facts about this entity
241 properties: Additional structured properties
242 metadata: Metadata (e.g., source, confidence)
244 Returns:
245 Created entity as dict with id, name, type, etc.
247 Example:
248 >>> entity = await kg.create_entity(
249 >>> name="FastBlocks",
250 >>> entity_type="framework",
251 >>> observations=["Web framework", "Built on ACB"]
252 >>> )
254 """
255 conn = self._get_conn()
256 entity_id = str(uuid.uuid4())
257 observations = observations or []
258 properties = properties or {}
259 metadata = metadata or {}
260 now = datetime.now(UTC)
262 conn.execute(
263 """
264 INSERT INTO kg_entities (id, name, entity_type, observations, properties, created_at, updated_at, metadata)
265 VALUES (?, ?, ?, ?, ?, ?, ?, ?)
266 """,
267 (
268 entity_id,
269 name,
270 entity_type,
271 observations,
272 json.dumps(properties),
273 now,
274 now,
275 json.dumps(metadata),
276 ),
277 )
279 return {
280 "id": entity_id,
281 "name": name,
282 "entity_type": entity_type,
283 "observations": observations,
284 "properties": properties,
285 "created_at": now.isoformat(),
286 "metadata": metadata,
287 }
289 async def get_entity(self, entity_id: str) -> dict[str, Any] | None:
290 """Retrieve an entity by ID.
292 Args:
293 entity_id: UUID of the entity
295 Returns:
296 Entity dict or None if not found
298 """
299 conn = self._get_conn()
301 result = conn.execute(
302 "SELECT * FROM kg_entities WHERE id = ?",
303 (entity_id,),
304 ).fetchone()
306 if not result:
307 return None
309 # Type annotations for clarity - result is tuple from fetchone()
310 entity_id_str: str = str(result[0])
311 name: str = str(result[1]) # type: ignore[misc]
312 entity_type: str = str(result[2]) # type: ignore[misc]
313 observations: list[str] = list(result[3]) if result[3] else [] # type: ignore[misc]
314 properties_json: str | None = result[4] if len(result) > 4 else None
315 created_at_raw = result[5] if len(result) > 5 else None
316 updated_at_raw = result[6] if len(result) > 6 else None
317 metadata_json: str | None = result[7] if len(result) > 7 else None
319 return {
320 "id": entity_id_str,
321 "name": name,
322 "entity_type": entity_type,
323 "observations": observations,
324 "properties": json.loads(properties_json) if properties_json else {},
325 "created_at": created_at_raw.isoformat() if created_at_raw else None,
326 "updated_at": updated_at_raw.isoformat() if updated_at_raw else None,
327 "metadata": json.loads(metadata_json) if metadata_json else {},
328 }
330 async def find_entity_by_name(
331 self,
332 name: str,
333 entity_type: str | None = None,
334 ) -> dict[str, Any] | None:
335 """Find an entity by name (case-insensitive).
337 Args:
338 name: Entity name to search for
339 entity_type: Optional type filter
341 Returns:
342 First matching entity or None
344 """
345 conn = self._get_conn()
347 if entity_type:
348 result = conn.execute(
349 "SELECT * FROM kg_entities WHERE LOWER(name) = LOWER(?) AND entity_type = ? LIMIT 1",
350 (name, entity_type),
351 ).fetchone()
352 else:
353 result = conn.execute(
354 "SELECT * FROM kg_entities WHERE LOWER(name) = LOWER(?) LIMIT 1",
355 (name,),
356 ).fetchone()
358 if not result:
359 return None
361 # Type annotations for clarity - result is tuple from fetchone()
362 entity_id: str = str(result[0])
363 entity_name: str = str(result[1]) # type: ignore[misc]
364 entity_type_str: str = str(result[2]) # type: ignore[misc]
365 observations: list[str] = list(result[3]) if result[3] else [] # type: ignore[misc]
366 properties_json: str | None = result[4] if len(result) > 4 else None
367 created_at_raw = result[5] if len(result) > 5 else None
368 updated_at_raw = result[6] if len(result) > 6 else None
369 metadata_json: str | None = result[7] if len(result) > 7 else None
371 return {
372 "id": entity_id,
373 "name": entity_name,
374 "entity_type": entity_type_str,
375 "observations": observations,
376 "properties": json.loads(properties_json) if properties_json else {},
377 "created_at": created_at_raw.isoformat() if created_at_raw else None,
378 "updated_at": updated_at_raw.isoformat() if updated_at_raw else None,
379 "metadata": json.loads(metadata_json) if metadata_json else {},
380 }
382 async def create_relation(
383 self,
384 from_entity: str,
385 to_entity: str,
386 relation_type: str,
387 properties: dict[str, Any] | None = None,
388 metadata: dict[str, Any] | None = None,
389 ) -> dict[str, Any] | None:
390 """Create a relationship between two entities.
392 Args:
393 from_entity: Source entity name
394 to_entity: Target entity name
395 relation_type: Type of relationship (e.g., "uses", "depends_on")
396 properties: Additional properties
397 metadata: Metadata
399 Returns:
400 Created relationship dict, or None if entities not found
402 Example:
403 >>> relation = await kg.create_relation(
404 >>> from_entity="crackerjack",
405 >>> to_entity="Python 3.13",
406 >>> relation_type="uses"
407 >>> )
409 """
410 # Find source and target entities
411 from_node = await self.find_entity_by_name(from_entity)
412 to_node = await self.find_entity_by_name(to_entity)
414 if not from_node or not to_node:
415 return None
417 conn = self._get_conn()
418 relation_id = str(uuid.uuid4())
419 properties = properties or {}
420 metadata = metadata or {}
421 now = datetime.now(UTC)
423 conn.execute(
424 """
425 INSERT INTO kg_relationships (id, from_entity, to_entity, relation_type, properties, created_at, metadata)
426 VALUES (?, ?, ?, ?, ?, ?, ?)
427 """,
428 (
429 relation_id,
430 from_node["id"],
431 to_node["id"],
432 relation_type,
433 json.dumps(properties),
434 now,
435 json.dumps(metadata),
436 ),
437 )
439 return {
440 "id": relation_id,
441 "from_entity": from_entity,
442 "to_entity": to_entity,
443 "relation_type": relation_type,
444 "properties": properties,
445 "created_at": now.isoformat(),
446 "metadata": metadata,
447 }
449 async def add_observation(self, entity_name: str, observation: str) -> bool:
450 """Add an observation (fact) to an existing entity.
452 Args:
453 entity_name: Name of the entity
454 observation: Fact to add
456 Returns:
457 True if successful, False if entity not found
459 """
460 entity = await self.find_entity_by_name(entity_name)
461 if not entity: 461 ↛ 462line 461 didn't jump to line 462 because the condition on line 461 was never true
462 return False
464 conn = self._get_conn()
465 observations = entity.get("observations", [])
466 observations.append(observation)
467 now = datetime.now(UTC)
469 conn.execute(
470 """
471 UPDATE kg_entities
472 SET observations = ?, updated_at = ?
473 WHERE id = ?
474 """,
475 (observations, now, entity["id"]),
476 )
478 return True
480 async def search_entities(
481 self,
482 query: str,
483 entity_type: str | None = None,
484 limit: int = 10,
485 ) -> list[dict[str, Any]]:
486 """Search for entities by name or observations.
488 Args:
489 query: Search query (matches name and observations)
490 entity_type: Optional filter by type
491 limit: Maximum results to return
493 Returns:
494 List of matching entities
496 """
497 conn = self._get_conn()
499 params_tuple: tuple[str, ...] | tuple[str, str, str, int] | tuple[str, str, int]
500 if entity_type: 500 ↛ 510line 500 didn't jump to line 510 because the condition on line 500 was always true
501 sql = """
502 SELECT * FROM kg_entities
503 WHERE (LOWER(name) LIKE LOWER(?) OR ARRAY_TO_STRING(observations, ' ') LIKE LOWER(?))
504 AND entity_type = ?
505 ORDER BY created_at DESC
506 LIMIT ?
507 """
508 params_tuple = (f"%{query}%", f"%{query}%", entity_type, limit)
509 else:
510 sql = """
511 SELECT * FROM kg_entities
512 WHERE LOWER(name) LIKE LOWER(?) OR ARRAY_TO_STRING(observations, ' ') LIKE LOWER(?)
513 ORDER BY created_at DESC
514 LIMIT ?
515 """
516 params_tuple = (f"%{query}%", f"%{query}%", limit)
518 results = conn.execute(sql, params_tuple).fetchall()
520 entities: list[dict[str, Any]] = []
521 for row in results:
522 entity_id: str = str(row[0])
523 name: str = str(row[1])
524 row_entity_type: str = str(row[2])
525 observations: list[str] = list(row[3]) if row[3] else []
526 properties_json: str | None = row[4] if len(row) > 4 else None
527 created_at_raw = row[5] if len(row) > 5 else None
528 updated_at_raw = row[6] if len(row) > 6 else None
529 metadata_json: str | None = row[7] if len(row) > 7 else None
531 entities.append(
532 {
533 "id": entity_id,
534 "name": name,
535 "entity_type": row_entity_type,
536 "observations": observations,
537 "properties": json.loads(properties_json)
538 if properties_json
539 else {},
540 "created_at": created_at_raw.isoformat()
541 if created_at_raw
542 else None,
543 "updated_at": updated_at_raw.isoformat()
544 if updated_at_raw
545 else None,
546 "metadata": json.loads(metadata_json) if metadata_json else {},
547 },
548 )
549 return entities
551 async def get_relationships(
552 self,
553 entity_name: str,
554 relation_type: str | None = None,
555 direction: str = "both",
556 ) -> list[dict[str, Any]]:
557 """Get all relationships for an entity.
559 Args:
560 entity_name: Entity to find relationships for
561 relation_type: Optional filter by relationship type
562 direction: "outgoing", "incoming", or "both"
564 Returns:
565 List of relationships
567 """
568 entity = await self.find_entity_by_name(entity_name)
569 if not entity: 569 ↛ 570line 569 didn't jump to line 570 because the condition on line 569 was never true
570 return []
572 conn = self._get_conn()
574 where_clause, params = self._build_relationship_filters(
575 direction,
576 relation_type,
577 entity,
578 )
580 # Build SQL safely - all user input is parameterized via params list
581 sql = (
582 "SELECT r.id, r.relation_type, e1.name as from_name, "
583 "e2.name as to_name, r.properties, r.created_at "
584 "FROM kg_relationships r "
585 "JOIN kg_entities e1 ON r.from_entity = e1.id "
586 "JOIN kg_entities e2 ON r.to_entity = e2.id "
587 + where_clause
588 + " ORDER BY r.created_at DESC"
589 )
591 results = conn.execute(sql, params).fetchall()
593 relationships: list[dict[str, Any]] = []
594 for row in results:
595 rel_id: str = str(row[0])
596 row_relation_type: str = str(row[1])
597 from_name: str = str(row[2])
598 to_name: str = str(row[3])
599 properties_json: str | None = row[4] if len(row) > 4 else None
600 created_at_raw = row[5] if len(row) > 5 else None
602 relationships.append(
603 {
604 "id": rel_id,
605 "relation_type": row_relation_type,
606 "from_entity": from_name,
607 "to_entity": to_name,
608 "properties": json.loads(properties_json)
609 if properties_json
610 else {},
611 "created_at": created_at_raw.isoformat()
612 if created_at_raw
613 else None,
614 },
615 )
616 return relationships
618 def _build_relationship_filters(
619 self,
620 direction: str,
621 relation_type: str | None,
622 entity: dict[str, Any],
623 ) -> tuple[str, tuple[str, ...]]:
624 """Build WHERE clause and parameters for relationship queries."""
625 entity_id = entity["id"]
626 if direction == "outgoing": 626 ↛ 627line 626 didn't jump to line 627 because the condition on line 626 was never true
627 base_clause = "WHERE r.from_entity = ?"
628 params: tuple[str, ...] = (entity_id,)
629 elif direction == "incoming": 629 ↛ 630line 629 didn't jump to line 630 because the condition on line 629 was never true
630 base_clause = "WHERE r.to_entity = ?"
631 params = (entity_id,)
632 else:
633 base_clause = "WHERE (r.from_entity = ? OR r.to_entity = ?)"
634 params = (entity_id, entity_id)
636 if relation_type: 636 ↛ 637line 636 didn't jump to line 637 because the condition on line 636 was never true
637 base_clause += " AND r.relation_type = ?"
638 params = (*params, relation_type)
640 return base_clause, params
642 async def find_path(
643 self,
644 from_entity: str,
645 to_entity: str,
646 max_depth: int = 5,
647 ) -> list[dict[str, Any]]:
648 """Find paths between two entities using SQL/PGQ.
650 Args:
651 from_entity: Starting entity name
652 to_entity: Target entity name
653 max_depth: Maximum path length
655 Returns:
656 List of paths, each with nodes and relationships
658 Note:
659 This uses DuckPGQ's SQL/PGQ syntax for graph pattern matching.
661 """
662 from_node = await self.find_entity_by_name(from_entity)
663 to_node = await self.find_entity_by_name(to_entity)
665 if not from_node or not to_node: 665 ↛ 666line 665 didn't jump to line 666 because the condition on line 665 was never true
666 return []
668 conn = self._get_conn()
670 # Use SQL/PGQ for path finding
671 # This is the beautiful part - SQL:2023 standard graph queries!
672 query = """
673 SELECT *
674 FROM GRAPH_TABLE (knowledge_graph
675 MATCH (start)-[path:*1..?] ->(end)
676 WHERE start.id = ?
677 AND end.id = ?
678 COLUMNS (
679 start.name AS from_name,
680 end.name AS to_name,
681 length(path) AS path_length
682 )
683 )
684 """
685 params = [max_depth, from_node["id"], to_node["id"]]
687 try:
688 results = conn.execute(query, params).fetchall()
690 paths = []
691 for row in results:
692 from_name: str = str(row[0])
693 to_name: str = str(row[1])
694 path_length: int = int(row[2])
696 paths.append(
697 {
698 "from_entity": from_name,
699 "to_entity": to_name,
700 "path_length": path_length,
701 },
702 )
703 return paths
704 except Exception:
705 # Fallback to simple check if SQL/PGQ fails
706 # (This can happen if graph is complex)
707 return []
709 async def get_stats(self) -> dict[str, Any]:
710 """Get knowledge graph statistics.
712 Returns:
713 Stats including entity count, relationship count, types
715 """
716 conn = self._get_conn()
718 # Count entities
719 entity_count_result = conn.execute(
720 "SELECT COUNT(*) FROM kg_entities",
721 ).fetchone()
722 entity_count: int = int(entity_count_result[0]) if entity_count_result else 0
724 # Count relationships
725 relationship_count_result = conn.execute(
726 "SELECT COUNT(*) FROM kg_relationships",
727 ).fetchone()
728 relationship_count: int = (
729 int(relationship_count_result[0]) if relationship_count_result else 0
730 )
732 # Get entity types
733 entity_types = conn.execute("""
734 SELECT entity_type, COUNT(*) as count
735 FROM kg_entities
736 GROUP BY entity_type
737 ORDER BY count DESC
738 """).fetchall()
740 # Get relationship types
741 relationship_types = conn.execute("""
742 SELECT relation_type, COUNT(*) as count
743 FROM kg_relationships
744 GROUP BY relation_type
745 ORDER BY count DESC
746 """).fetchall()
748 return {
749 "total_entities": entity_count,
750 "total_relationships": relationship_count,
751 "entity_types": dict(entity_types),
752 "relationship_types": dict(relationship_types),
753 "database_path": self.db_path,
754 "duckpgq_installed": self._duckpgq_installed,
755 }