Coverage for session_buddy / knowledge_graph_db.py: 82.99%

205 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1#!/usr/bin/env python3 

2"""Knowledge Graph Database using DuckDB + DuckPGQ Extension. 

3 

4This module provides semantic memory (knowledge graph) capabilities 

5using DuckDB's DuckPGQ extension for SQL/PGQ (Property Graph Queries). 

6 

7The knowledge graph stores: 

8- **Entities**: Nodes representing projects, libraries, technologies, concepts 

9- **Relations**: Edges connecting entities (uses, depends_on, developed_by, etc.) 

10- **Observations**: Facts and notes attached to entities 

11 

12This is separate from the episodic memory (conversations) in ReflectionDatabase. 

13""" 

14 

15from __future__ import annotations 

16 

17import json 

18import os 

19import uuid 

20from datetime import UTC, datetime 

21from pathlib import Path 

22from typing import TYPE_CHECKING, Any, Self 

23 

24if TYPE_CHECKING: 

25 import duckdb 

26 

27try: 

28 import duckdb 

29 

30 DUCKDB_AVAILABLE = True 

31except ImportError: 

32 DUCKDB_AVAILABLE = False 

33 

34 

35class KnowledgeGraphDatabase: 

36 """Manages knowledge graph using DuckDB + DuckPGQ extension. 

37 

38 This class provides semantic memory through a property graph model, 

39 complementing the episodic memory in ReflectionDatabase. 

40 

41 Example: 

42 >>> async with KnowledgeGraphDatabase() as kg: 

43 >>> entity = await kg.create_entity( 

44 >>> name="session-mgmt-mcp", 

45 >>> entity_type="project" 

46 >>> ) 

47 >>> relation = await kg.create_relation( 

48 >>> from_entity="session-mgmt-mcp", 

49 >>> to_entity="ACB", 

50 >>> relation_type="uses" 

51 >>> ) 

52 

53 """ 

54 

55 def __init__(self, db_path: str | None = None) -> None: 

56 """Initialize knowledge graph database. 

57 

58 Args: 

59 db_path: Path to DuckDB database file. 

60 Defaults to ~/.claude/data/knowledge_graph.duckdb 

61 

62 """ 

63 self.db_path = db_path or os.path.expanduser( 

64 "~/.claude/data/knowledge_graph.duckdb", 

65 ) 

66 Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) 

67 

68 self.conn: duckdb.DuckDBPyConnection | None = None 

69 self._duckpgq_installed = False 

70 

71 def __enter__(self) -> Self: 

72 """Context manager entry.""" 

73 return self 

74 

75 def __exit__(self, *_exc_info: object) -> None: 

76 """Context manager exit with cleanup.""" 

77 self.close() 

78 

79 async def __aenter__(self) -> Self: 

80 """Async context manager entry.""" 

81 await self.initialize() 

82 return self 

83 

84 async def __aexit__(self, *_exc_info: object) -> None: 

85 """Async context manager exit with cleanup.""" 

86 self.close() 

87 

88 def close(self) -> None: 

89 """Close database connection.""" 

90 if self.conn: 

91 try: 

92 self.conn.close() 

93 except Exception: 

94 # nosec B110 - intentionally suppressing exceptions during cleanup 

95 pass # Ignore errors during cleanup 

96 finally: 

97 self.conn = None 

98 

99 def __del__(self) -> None: 

100 """Destructor to ensure cleanup.""" 

101 self.close() 

102 

103 async def initialize(self) -> None: 

104 """Initialize database and DuckPGQ extension. 

105 

106 This method: 

107 1. Creates DuckDB connection 

108 2. Installs DuckPGQ extension from community repository 

109 3. Creates property graph schema (entities + relationships tables) 

110 4. Creates the knowledge_graph property graph 

111 

112 Raises: 

113 ImportError: If DuckDB is not available 

114 RuntimeError: If DuckPGQ installation fails 

115 

116 """ 

117 if not DUCKDB_AVAILABLE: 117 ↛ 118line 117 didn't jump to line 118 because the condition on line 117 was never true

118 msg = "DuckDB not available. Install with: uv add duckdb" 

119 raise ImportError(msg) 

120 

121 # Create connection 

122 self.conn = duckdb.connect(self.db_path) 

123 assert self.conn is not None # Type narrowing 

124 

125 # Install and load DuckPGQ extension 

126 try: 

127 self.conn.execute("INSTALL duckpgq FROM community") 

128 self.conn.execute("LOAD duckpgq") 

129 self._duckpgq_installed = True 

130 except Exception as e: 

131 msg = f"Failed to install DuckPGQ extension: {e}" 

132 raise RuntimeError(msg) from e 

133 

134 # Create schema 

135 await self._create_schema() 

136 

137 def _get_conn(self) -> duckdb.DuckDBPyConnection: 

138 """Get database connection, raising error if not initialized. 

139 

140 Returns: 

141 Active DuckDB connection 

142 

143 Raises: 

144 RuntimeError: If connection not initialized 

145 

146 """ 

147 if self.conn is None: 147 ↛ 148line 147 didn't jump to line 148 because the condition on line 147 was never true

148 msg = "Database connection not initialized. Call initialize() first" 

149 raise RuntimeError(msg) 

150 return self.conn 

151 

152 async def _create_schema(self) -> None: 

153 """Create knowledge graph schema with DuckPGQ property graph. 

154 

155 Creates: 

156 - kg_entities table (nodes) 

157 - kg_relationships table (edges) 

158 - knowledge_graph property graph 

159 - Indexes for performance 

160 """ 

161 conn = self._get_conn() 

162 

163 # Create entities table (nodes/vertices) 

164 conn.execute(""" 

165 CREATE TABLE IF NOT EXISTS kg_entities ( 

166 id VARCHAR PRIMARY KEY, 

167 name VARCHAR NOT NULL, 

168 entity_type VARCHAR NOT NULL, 

169 observations VARCHAR[], 

170 properties JSON, 

171 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 

172 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 

173 metadata JSON 

174 ) 

175 """) 

176 

177 # Create relationships table (edges) 

178 # Note: DuckDB doesn't support CASCADE constraints, so we omit ON DELETE CASCADE 

179 conn.execute(""" 

180 CREATE TABLE IF NOT EXISTS kg_relationships ( 

181 id VARCHAR PRIMARY KEY, 

182 from_entity VARCHAR NOT NULL, 

183 to_entity VARCHAR NOT NULL, 

184 relation_type VARCHAR NOT NULL, 

185 properties JSON, 

186 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 

187 metadata JSON, 

188 FOREIGN KEY (from_entity) REFERENCES kg_entities(id), 

189 FOREIGN KEY (to_entity) REFERENCES kg_entities(id) 

190 ) 

191 """) 

192 

193 # Create indexes for performance 

194 conn.execute( 

195 "CREATE INDEX IF NOT EXISTS idx_entities_name ON kg_entities(name)", 

196 ) 

197 conn.execute( 

198 "CREATE INDEX IF NOT EXISTS idx_entities_type ON kg_entities(entity_type)", 

199 ) 

200 conn.execute( 

201 "CREATE INDEX IF NOT EXISTS idx_relationships_type ON kg_relationships(relation_type)", 

202 ) 

203 conn.execute( 

204 "CREATE INDEX IF NOT EXISTS idx_relationships_from ON kg_relationships(from_entity)", 

205 ) 

206 conn.execute( 

207 "CREATE INDEX IF NOT EXISTS idx_relationships_to ON kg_relationships(to_entity)", 

208 ) 

209 

210 # Create property graph using DuckPGQ 

211 # This maps our tables to SQL/PGQ graph structure 

212 try: 

213 conn.execute(""" 

214 CREATE PROPERTY GRAPH IF NOT EXISTS knowledge_graph 

215 VERTEX TABLES (kg_entities) 

216 EDGE TABLES ( 

217 kg_relationships 

218 SOURCE KEY (from_entity) REFERENCES kg_entities (id) 

219 DESTINATION KEY (to_entity) REFERENCES kg_entities (id) 

220 ) 

221 """) 

222 except Exception as e: 

223 # Property graph might already exist, that's okay 

224 if "already exists" not in str(e).lower(): 

225 raise 

226 

227 async def create_entity( 

228 self, 

229 name: str, 

230 entity_type: str, 

231 observations: list[str] | None = None, 

232 properties: dict[str, Any] | None = None, 

233 metadata: dict[str, Any] | None = None, 

234 ) -> dict[str, Any]: 

235 """Create an entity (node) in the knowledge graph. 

236 

237 Args: 

238 name: Entity name (e.g., "session-mgmt-mcp", "Python 3.13") 

239 entity_type: Type of entity (e.g., "project", "language", "library") 

240 observations: List of facts about this entity 

241 properties: Additional structured properties 

242 metadata: Metadata (e.g., source, confidence) 

243 

244 Returns: 

245 Created entity as dict with id, name, type, etc. 

246 

247 Example: 

248 >>> entity = await kg.create_entity( 

249 >>> name="FastBlocks", 

250 >>> entity_type="framework", 

251 >>> observations=["Web framework", "Built on ACB"] 

252 >>> ) 

253 

254 """ 

255 conn = self._get_conn() 

256 entity_id = str(uuid.uuid4()) 

257 observations = observations or [] 

258 properties = properties or {} 

259 metadata = metadata or {} 

260 now = datetime.now(UTC) 

261 

262 conn.execute( 

263 """ 

264 INSERT INTO kg_entities (id, name, entity_type, observations, properties, created_at, updated_at, metadata) 

265 VALUES (?, ?, ?, ?, ?, ?, ?, ?) 

266 """, 

267 ( 

268 entity_id, 

269 name, 

270 entity_type, 

271 observations, 

272 json.dumps(properties), 

273 now, 

274 now, 

275 json.dumps(metadata), 

276 ), 

277 ) 

278 

279 return { 

280 "id": entity_id, 

281 "name": name, 

282 "entity_type": entity_type, 

283 "observations": observations, 

284 "properties": properties, 

285 "created_at": now.isoformat(), 

286 "metadata": metadata, 

287 } 

288 

289 async def get_entity(self, entity_id: str) -> dict[str, Any] | None: 

290 """Retrieve an entity by ID. 

291 

292 Args: 

293 entity_id: UUID of the entity 

294 

295 Returns: 

296 Entity dict or None if not found 

297 

298 """ 

299 conn = self._get_conn() 

300 

301 result = conn.execute( 

302 "SELECT * FROM kg_entities WHERE id = ?", 

303 (entity_id,), 

304 ).fetchone() 

305 

306 if not result: 

307 return None 

308 

309 # Type annotations for clarity - result is tuple from fetchone() 

310 entity_id_str: str = str(result[0]) 

311 name: str = str(result[1]) # type: ignore[misc] 

312 entity_type: str = str(result[2]) # type: ignore[misc] 

313 observations: list[str] = list(result[3]) if result[3] else [] # type: ignore[misc] 

314 properties_json: str | None = result[4] if len(result) > 4 else None 

315 created_at_raw = result[5] if len(result) > 5 else None 

316 updated_at_raw = result[6] if len(result) > 6 else None 

317 metadata_json: str | None = result[7] if len(result) > 7 else None 

318 

319 return { 

320 "id": entity_id_str, 

321 "name": name, 

322 "entity_type": entity_type, 

323 "observations": observations, 

324 "properties": json.loads(properties_json) if properties_json else {}, 

325 "created_at": created_at_raw.isoformat() if created_at_raw else None, 

326 "updated_at": updated_at_raw.isoformat() if updated_at_raw else None, 

327 "metadata": json.loads(metadata_json) if metadata_json else {}, 

328 } 

329 

330 async def find_entity_by_name( 

331 self, 

332 name: str, 

333 entity_type: str | None = None, 

334 ) -> dict[str, Any] | None: 

335 """Find an entity by name (case-insensitive). 

336 

337 Args: 

338 name: Entity name to search for 

339 entity_type: Optional type filter 

340 

341 Returns: 

342 First matching entity or None 

343 

344 """ 

345 conn = self._get_conn() 

346 

347 if entity_type: 

348 result = conn.execute( 

349 "SELECT * FROM kg_entities WHERE LOWER(name) = LOWER(?) AND entity_type = ? LIMIT 1", 

350 (name, entity_type), 

351 ).fetchone() 

352 else: 

353 result = conn.execute( 

354 "SELECT * FROM kg_entities WHERE LOWER(name) = LOWER(?) LIMIT 1", 

355 (name,), 

356 ).fetchone() 

357 

358 if not result: 

359 return None 

360 

361 # Type annotations for clarity - result is tuple from fetchone() 

362 entity_id: str = str(result[0]) 

363 entity_name: str = str(result[1]) # type: ignore[misc] 

364 entity_type_str: str = str(result[2]) # type: ignore[misc] 

365 observations: list[str] = list(result[3]) if result[3] else [] # type: ignore[misc] 

366 properties_json: str | None = result[4] if len(result) > 4 else None 

367 created_at_raw = result[5] if len(result) > 5 else None 

368 updated_at_raw = result[6] if len(result) > 6 else None 

369 metadata_json: str | None = result[7] if len(result) > 7 else None 

370 

371 return { 

372 "id": entity_id, 

373 "name": entity_name, 

374 "entity_type": entity_type_str, 

375 "observations": observations, 

376 "properties": json.loads(properties_json) if properties_json else {}, 

377 "created_at": created_at_raw.isoformat() if created_at_raw else None, 

378 "updated_at": updated_at_raw.isoformat() if updated_at_raw else None, 

379 "metadata": json.loads(metadata_json) if metadata_json else {}, 

380 } 

381 

382 async def create_relation( 

383 self, 

384 from_entity: str, 

385 to_entity: str, 

386 relation_type: str, 

387 properties: dict[str, Any] | None = None, 

388 metadata: dict[str, Any] | None = None, 

389 ) -> dict[str, Any] | None: 

390 """Create a relationship between two entities. 

391 

392 Args: 

393 from_entity: Source entity name 

394 to_entity: Target entity name 

395 relation_type: Type of relationship (e.g., "uses", "depends_on") 

396 properties: Additional properties 

397 metadata: Metadata 

398 

399 Returns: 

400 Created relationship dict, or None if entities not found 

401 

402 Example: 

403 >>> relation = await kg.create_relation( 

404 >>> from_entity="crackerjack", 

405 >>> to_entity="Python 3.13", 

406 >>> relation_type="uses" 

407 >>> ) 

408 

409 """ 

410 # Find source and target entities 

411 from_node = await self.find_entity_by_name(from_entity) 

412 to_node = await self.find_entity_by_name(to_entity) 

413 

414 if not from_node or not to_node: 

415 return None 

416 

417 conn = self._get_conn() 

418 relation_id = str(uuid.uuid4()) 

419 properties = properties or {} 

420 metadata = metadata or {} 

421 now = datetime.now(UTC) 

422 

423 conn.execute( 

424 """ 

425 INSERT INTO kg_relationships (id, from_entity, to_entity, relation_type, properties, created_at, metadata) 

426 VALUES (?, ?, ?, ?, ?, ?, ?) 

427 """, 

428 ( 

429 relation_id, 

430 from_node["id"], 

431 to_node["id"], 

432 relation_type, 

433 json.dumps(properties), 

434 now, 

435 json.dumps(metadata), 

436 ), 

437 ) 

438 

439 return { 

440 "id": relation_id, 

441 "from_entity": from_entity, 

442 "to_entity": to_entity, 

443 "relation_type": relation_type, 

444 "properties": properties, 

445 "created_at": now.isoformat(), 

446 "metadata": metadata, 

447 } 

448 

449 async def add_observation(self, entity_name: str, observation: str) -> bool: 

450 """Add an observation (fact) to an existing entity. 

451 

452 Args: 

453 entity_name: Name of the entity 

454 observation: Fact to add 

455 

456 Returns: 

457 True if successful, False if entity not found 

458 

459 """ 

460 entity = await self.find_entity_by_name(entity_name) 

461 if not entity: 461 ↛ 462line 461 didn't jump to line 462 because the condition on line 461 was never true

462 return False 

463 

464 conn = self._get_conn() 

465 observations = entity.get("observations", []) 

466 observations.append(observation) 

467 now = datetime.now(UTC) 

468 

469 conn.execute( 

470 """ 

471 UPDATE kg_entities 

472 SET observations = ?, updated_at = ? 

473 WHERE id = ? 

474 """, 

475 (observations, now, entity["id"]), 

476 ) 

477 

478 return True 

479 

480 async def search_entities( 

481 self, 

482 query: str, 

483 entity_type: str | None = None, 

484 limit: int = 10, 

485 ) -> list[dict[str, Any]]: 

486 """Search for entities by name or observations. 

487 

488 Args: 

489 query: Search query (matches name and observations) 

490 entity_type: Optional filter by type 

491 limit: Maximum results to return 

492 

493 Returns: 

494 List of matching entities 

495 

496 """ 

497 conn = self._get_conn() 

498 

499 params_tuple: tuple[str, ...] | tuple[str, str, str, int] | tuple[str, str, int] 

500 if entity_type: 500 ↛ 510line 500 didn't jump to line 510 because the condition on line 500 was always true

501 sql = """ 

502 SELECT * FROM kg_entities 

503 WHERE (LOWER(name) LIKE LOWER(?) OR ARRAY_TO_STRING(observations, ' ') LIKE LOWER(?)) 

504 AND entity_type = ? 

505 ORDER BY created_at DESC 

506 LIMIT ? 

507 """ 

508 params_tuple = (f"%{query}%", f"%{query}%", entity_type, limit) 

509 else: 

510 sql = """ 

511 SELECT * FROM kg_entities 

512 WHERE LOWER(name) LIKE LOWER(?) OR ARRAY_TO_STRING(observations, ' ') LIKE LOWER(?) 

513 ORDER BY created_at DESC 

514 LIMIT ? 

515 """ 

516 params_tuple = (f"%{query}%", f"%{query}%", limit) 

517 

518 results = conn.execute(sql, params_tuple).fetchall() 

519 

520 entities: list[dict[str, Any]] = [] 

521 for row in results: 

522 entity_id: str = str(row[0]) 

523 name: str = str(row[1]) 

524 row_entity_type: str = str(row[2]) 

525 observations: list[str] = list(row[3]) if row[3] else [] 

526 properties_json: str | None = row[4] if len(row) > 4 else None 

527 created_at_raw = row[5] if len(row) > 5 else None 

528 updated_at_raw = row[6] if len(row) > 6 else None 

529 metadata_json: str | None = row[7] if len(row) > 7 else None 

530 

531 entities.append( 

532 { 

533 "id": entity_id, 

534 "name": name, 

535 "entity_type": row_entity_type, 

536 "observations": observations, 

537 "properties": json.loads(properties_json) 

538 if properties_json 

539 else {}, 

540 "created_at": created_at_raw.isoformat() 

541 if created_at_raw 

542 else None, 

543 "updated_at": updated_at_raw.isoformat() 

544 if updated_at_raw 

545 else None, 

546 "metadata": json.loads(metadata_json) if metadata_json else {}, 

547 }, 

548 ) 

549 return entities 

550 

551 async def get_relationships( 

552 self, 

553 entity_name: str, 

554 relation_type: str | None = None, 

555 direction: str = "both", 

556 ) -> list[dict[str, Any]]: 

557 """Get all relationships for an entity. 

558 

559 Args: 

560 entity_name: Entity to find relationships for 

561 relation_type: Optional filter by relationship type 

562 direction: "outgoing", "incoming", or "both" 

563 

564 Returns: 

565 List of relationships 

566 

567 """ 

568 entity = await self.find_entity_by_name(entity_name) 

569 if not entity: 569 ↛ 570line 569 didn't jump to line 570 because the condition on line 569 was never true

570 return [] 

571 

572 conn = self._get_conn() 

573 

574 where_clause, params = self._build_relationship_filters( 

575 direction, 

576 relation_type, 

577 entity, 

578 ) 

579 

580 # Build SQL safely - all user input is parameterized via params list 

581 sql = ( 

582 "SELECT r.id, r.relation_type, e1.name as from_name, " 

583 "e2.name as to_name, r.properties, r.created_at " 

584 "FROM kg_relationships r " 

585 "JOIN kg_entities e1 ON r.from_entity = e1.id " 

586 "JOIN kg_entities e2 ON r.to_entity = e2.id " 

587 + where_clause 

588 + " ORDER BY r.created_at DESC" 

589 ) 

590 

591 results = conn.execute(sql, params).fetchall() 

592 

593 relationships: list[dict[str, Any]] = [] 

594 for row in results: 

595 rel_id: str = str(row[0]) 

596 row_relation_type: str = str(row[1]) 

597 from_name: str = str(row[2]) 

598 to_name: str = str(row[3]) 

599 properties_json: str | None = row[4] if len(row) > 4 else None 

600 created_at_raw = row[5] if len(row) > 5 else None 

601 

602 relationships.append( 

603 { 

604 "id": rel_id, 

605 "relation_type": row_relation_type, 

606 "from_entity": from_name, 

607 "to_entity": to_name, 

608 "properties": json.loads(properties_json) 

609 if properties_json 

610 else {}, 

611 "created_at": created_at_raw.isoformat() 

612 if created_at_raw 

613 else None, 

614 }, 

615 ) 

616 return relationships 

617 

618 def _build_relationship_filters( 

619 self, 

620 direction: str, 

621 relation_type: str | None, 

622 entity: dict[str, Any], 

623 ) -> tuple[str, tuple[str, ...]]: 

624 """Build WHERE clause and parameters for relationship queries.""" 

625 entity_id = entity["id"] 

626 if direction == "outgoing": 626 ↛ 627line 626 didn't jump to line 627 because the condition on line 626 was never true

627 base_clause = "WHERE r.from_entity = ?" 

628 params: tuple[str, ...] = (entity_id,) 

629 elif direction == "incoming": 629 ↛ 630line 629 didn't jump to line 630 because the condition on line 629 was never true

630 base_clause = "WHERE r.to_entity = ?" 

631 params = (entity_id,) 

632 else: 

633 base_clause = "WHERE (r.from_entity = ? OR r.to_entity = ?)" 

634 params = (entity_id, entity_id) 

635 

636 if relation_type: 636 ↛ 637line 636 didn't jump to line 637 because the condition on line 636 was never true

637 base_clause += " AND r.relation_type = ?" 

638 params = (*params, relation_type) 

639 

640 return base_clause, params 

641 

642 async def find_path( 

643 self, 

644 from_entity: str, 

645 to_entity: str, 

646 max_depth: int = 5, 

647 ) -> list[dict[str, Any]]: 

648 """Find paths between two entities using SQL/PGQ. 

649 

650 Args: 

651 from_entity: Starting entity name 

652 to_entity: Target entity name 

653 max_depth: Maximum path length 

654 

655 Returns: 

656 List of paths, each with nodes and relationships 

657 

658 Note: 

659 This uses DuckPGQ's SQL/PGQ syntax for graph pattern matching. 

660 

661 """ 

662 from_node = await self.find_entity_by_name(from_entity) 

663 to_node = await self.find_entity_by_name(to_entity) 

664 

665 if not from_node or not to_node: 665 ↛ 666line 665 didn't jump to line 666 because the condition on line 665 was never true

666 return [] 

667 

668 conn = self._get_conn() 

669 

670 # Use SQL/PGQ for path finding 

671 # This is the beautiful part - SQL:2023 standard graph queries! 

672 query = """ 

673 SELECT * 

674 FROM GRAPH_TABLE (knowledge_graph 

675 MATCH (start)-[path:*1..?] ->(end) 

676 WHERE start.id = ? 

677 AND end.id = ? 

678 COLUMNS ( 

679 start.name AS from_name, 

680 end.name AS to_name, 

681 length(path) AS path_length 

682 ) 

683 ) 

684 """ 

685 params = [max_depth, from_node["id"], to_node["id"]] 

686 

687 try: 

688 results = conn.execute(query, params).fetchall() 

689 

690 paths = [] 

691 for row in results: 

692 from_name: str = str(row[0]) 

693 to_name: str = str(row[1]) 

694 path_length: int = int(row[2]) 

695 

696 paths.append( 

697 { 

698 "from_entity": from_name, 

699 "to_entity": to_name, 

700 "path_length": path_length, 

701 }, 

702 ) 

703 return paths 

704 except Exception: 

705 # Fallback to simple check if SQL/PGQ fails 

706 # (This can happen if graph is complex) 

707 return [] 

708 

709 async def get_stats(self) -> dict[str, Any]: 

710 """Get knowledge graph statistics. 

711 

712 Returns: 

713 Stats including entity count, relationship count, types 

714 

715 """ 

716 conn = self._get_conn() 

717 

718 # Count entities 

719 entity_count_result = conn.execute( 

720 "SELECT COUNT(*) FROM kg_entities", 

721 ).fetchone() 

722 entity_count: int = int(entity_count_result[0]) if entity_count_result else 0 

723 

724 # Count relationships 

725 relationship_count_result = conn.execute( 

726 "SELECT COUNT(*) FROM kg_relationships", 

727 ).fetchone() 

728 relationship_count: int = ( 

729 int(relationship_count_result[0]) if relationship_count_result else 0 

730 ) 

731 

732 # Get entity types 

733 entity_types = conn.execute(""" 

734 SELECT entity_type, COUNT(*) as count 

735 FROM kg_entities 

736 GROUP BY entity_type 

737 ORDER BY count DESC 

738 """).fetchall() 

739 

740 # Get relationship types 

741 relationship_types = conn.execute(""" 

742 SELECT relation_type, COUNT(*) as count 

743 FROM kg_relationships 

744 GROUP BY relation_type 

745 ORDER BY count DESC 

746 """).fetchall() 

747 

748 return { 

749 "total_entities": entity_count, 

750 "total_relationships": relationship_count, 

751 "entity_types": dict(entity_types), 

752 "relationship_types": dict(relationship_types), 

753 "database_path": self.db_path, 

754 "duckpgq_installed": self._duckpgq_installed, 

755 }