Coverage for session_buddy / adapters / knowledge_graph_adapter_oneiric.py: 79.52%

197 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1"""Oneiric-compatible knowledge graph adapter using native DuckDB implementation. 

2 

3Provides a Oneiric-compatible knowledge graph adapter that maintains the existing 

4KnowledgeGraphDatabase API while using native DuckDB operations instead of ACB. 

5 

6Phase 5: Oneiric Adapter Conversion - Knowledge Graph Adapter 

7 

8Key Features: 

9 - Native DuckDB PGQ extension for property graph queries 

10 - Oneiric settings and lifecycle management 

11 - Backward-compatible API with existing KnowledgeGraphDatabase 

12 - No ACB dependencies 

13 - Fast local/in-memory operations 

14 

15""" 

16 

17from __future__ import annotations 

18 

19import json 

20import typing as t 

21import uuid 

22from contextlib import suppress 

23from datetime import UTC, datetime 

24 

25from session_buddy.adapters.settings import KnowledgeGraphAdapterSettings 

26 

27if t.TYPE_CHECKING: 

28 from pathlib import Path 

29 from types import TracebackType 

30 

31 import duckdb 

32 

# Probe for DuckDB once at import time and record the outcome in a flag.
try:
    import duckdb
except ImportError:
    DUCKDB_AVAILABLE = False
    if t.TYPE_CHECKING:
        # Type stub for type checking when duckdb is not installed
        import types

        duckdb = types.SimpleNamespace()  # type: ignore[misc,assignment]
else:
    DUCKDB_AVAILABLE = True

44 

45 

46class KnowledgeGraphDatabaseAdapterOneiric: 

47 """Oneiric-compatible knowledge graph adapter using native DuckDB. 

48 

49 This adapter provides the same API as the ACB-based KnowledgeGraphDatabaseAdapter 

50 but uses native DuckDB operations and Oneiric settings instead of ACB configuration. 

51 

52 Key differences from ACB implementation: 

53 - Uses Oneiric settings (dataclass-based) instead of ACB Config 

54 - No ACB dependency injection 

55 - Same hybrid sync/async pattern (sync DuckDB ops, async interface) 

56 - Maintains full API compatibility 

57 

58 Example: 

59 >>> settings = KnowledgeGraphAdapterSettings.from_settings() 

60 >>> async with KnowledgeGraphDatabaseAdapterOneiric(settings=settings) as kg: 

61 >>> entity = await kg.create_entity("project", "project", ["observation"]) 

62 >>> relation = await kg.create_relation("proj1", "proj2", "depends_on") 

63 

64 """ 

65 

def __init__(
    self,
    db_path: str | Path | None = None,
    settings: KnowledgeGraphAdapterSettings | None = None,
) -> None:
    """Set up adapter state without opening a database connection.

    Args:
        db_path: Location of the DuckDB database file. A falsy value means
            the path is taken from the settings when it is needed.
        settings: Adapter settings. When omitted (or falsy) they are built
            with ``KnowledgeGraphAdapterSettings.from_settings()``.

    """
    self.db_path = None if not db_path else str(db_path)

    if not settings:
        settings = KnowledgeGraphAdapterSettings.from_settings()
    self.settings = settings

    # Synchronous DuckDB connection; stays None until initialize() runs.
    self.conn: t.Any = None
    self._duckpgq_installed = False
    self._initialized = False

83 

84 def __enter__(self) -> t.Self: 

85 """Sync context manager entry (not recommended - use async).""" 

86 msg = "Use 'async with' instead of 'with' for KnowledgeGraphDatabaseAdapterOneiric" 

87 raise RuntimeError(msg) 

88 

89 def __exit__( 

90 self, 

91 exc_type: type[BaseException] | None, 

92 exc_value: BaseException | None, 

93 traceback: TracebackType | None, 

94 ) -> None: 

95 """Sync context manager exit.""" 

96 

97 async def __aenter__(self) -> t.Self: 

98 """Async context manager entry.""" 

99 await self.initialize() 

100 return self 

101 

102 async def __aexit__( 

103 self, 

104 exc_type: type[BaseException] | None, 

105 exc_value: BaseException | None, 

106 traceback: TracebackType | None, 

107 ) -> None: 

108 """Async context manager exit with cleanup.""" 

109 self.close() 

110 

def close(self) -> None:
    """Close the DuckDB connection and return the adapter to its unopened state.

    Safe to call repeatedly, and on an instance whose ``conn`` attribute was
    never assigned. The connection reference is dropped even when
    ``conn.close()`` raises, and the initialized flag is cleared so that a
    later ``initialize()`` reconnects instead of returning early while no
    connection exists.
    """
    conn = getattr(self, "conn", None)
    if conn is None:
        return

    try:
        conn.close()
    finally:
        # Never keep a reference to a connection we tried to close.
        self.conn = None
        self._initialized = False

116 

def __del__(self) -> None:
    """Best-effort cleanup when the adapter is garbage collected.

    A finalizer can run on an object whose constructor failed part-way or
    while the interpreter is shutting down, so any error raised by
    ``close()`` is suppressed instead of escaping from the destructor.
    """
    with suppress(Exception):
        self.close()

120 

def _get_db_path(self) -> str:
    """Resolve the database file path to connect to.

    Returns:
        The instance ``db_path`` when it is set (truthy), otherwise
        ``settings.database_path`` converted to ``str``.

    """
    return self.db_path or str(self.settings.database_path)

134 

async def initialize(self) -> None:
    """Open the DuckDB connection, load extensions and create the schema.

    Steps:
        1. Return immediately if the adapter is already initialized.
        2. Open a synchronous DuckDB connection at ``_get_db_path()``.
        3. INSTALL (from the community repository) and LOAD every extension
           listed in ``settings.install_extensions``.
        4. Create the knowledge graph schema.

    If step 3 or 4 fails, the freshly opened connection is closed and
    ``self.conn`` is reset to ``None`` before the error propagates, so a
    failed initialization does not leak an open database handle.

    Raises:
        ImportError: If the ``duckdb`` package is not installed.
        RuntimeError: If installing or loading an extension fails.

    """
    if self._initialized:
        return

    if not DUCKDB_AVAILABLE:
        msg = "DuckDB not available. Install with: uv add duckdb"
        raise ImportError(msg)

    # Create sync DuckDB connection (fast, local operation)
    conn = duckdb.connect(self._get_db_path())
    self.conn = conn

    try:
        # Install and load DuckPGQ extension
        try:
            for extension in self.settings.install_extensions:
                conn.execute(f"INSTALL {extension} FROM community")
                conn.execute(f"LOAD {extension}")
        except Exception as e:
            msg = f"Failed to install DuckPGQ extension: {e}"
            raise RuntimeError(msg) from e
        self._duckpgq_installed = True

        # Create schema (sync operations, complete quickly)
        await self._create_schema()
    except BaseException:
        # Roll back to the "not connected" state; the original error matters
        # more than any secondary failure while closing.
        self.conn = None
        with suppress(Exception):
            conn.close()
        raise

    self._initialized = True

172 

def _get_conn(self) -> t.Any:
    """Return the open DuckDB connection.

    Returns:
        The object stored in ``self.conn``.

    Raises:
        RuntimeError: If ``self.conn`` is ``None``.

    """
    conn = self.conn
    if conn is not None:
        return conn
    raise RuntimeError("Database connection not initialized. Call initialize() first")

187 

async def _resolve_entity_id(self, identifier: str) -> str:
    """Resolve an entity name or ID to the stored entity ID.

    The identifier is first looked up as an entity name; if that finds
    nothing it is looked up as an ID in ``kg_entities``.

    Raises:
        ValueError: If neither lookup finds an entity.

    """
    by_name = await self.find_entity_by_name(identifier)
    if by_name:
        found_id = by_name["id"]
        return str(found_id) if not isinstance(found_id, str) else found_id

    cursor = self._get_conn().execute(
        "SELECT id FROM kg_entities WHERE id = ?",
        (identifier,),
    )
    hit = cursor.fetchone()
    if not hit:
        msg = f"Entity '{identifier}' not found"
        raise ValueError(msg)

    raw_id = hit[0]
    return str(raw_id) if not isinstance(raw_id, str) else raw_id

210 

211 def _format_timestamp(self, value: t.Any) -> str | None: 

212 """Format timestamps consistently across DuckDB outputs.""" 

213 if value is None: 213 ↛ 214line 213 didn't jump to line 214 because the condition on line 213 was never true

214 return None 

215 return value.isoformat() if hasattr(value, "isoformat") else str(value) 

216 

async def _create_schema(self) -> None:
    """Create the knowledge graph tables and indexes if they are missing.

    Creates:
        - kg_entities table (nodes)
        - kg_relationships table (edges)
        - the ``updated_at`` column on kg_relationships when absent
        - indexes on entity name/type and relationship from/to/type

    Note: the statements execute synchronously on the connection.
    """
    db = self._get_conn()

    table_ddl = (
        # Entities table (nodes/vertices)
        """
        CREATE TABLE IF NOT EXISTS kg_entities (
            id VARCHAR PRIMARY KEY,
            name VARCHAR NOT NULL,
            entity_type VARCHAR NOT NULL,
            observations VARCHAR[],
            properties JSON,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            metadata JSON
        )
        """,
        # Relationships table (edges)
        """
        CREATE TABLE IF NOT EXISTS kg_relationships (
            id VARCHAR PRIMARY KEY,
            from_entity VARCHAR NOT NULL,
            to_entity VARCHAR NOT NULL,
            relation_type VARCHAR NOT NULL,
            properties JSON,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            metadata JSON
        )
        """,
    )
    for ddl in table_ddl:
        db.execute(ddl)

    # Ensure columns exist when DuckPGQ pre-creates tables without all fields.
    table_info = db.execute("PRAGMA table_info('kg_relationships')").fetchall()
    present_columns = {column[1] for column in table_info}
    if "updated_at" not in present_columns:
        db.execute(
            "ALTER TABLE kg_relationships "
            "ADD COLUMN updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP"
        )

    # Indexes for performance
    index_ddl = (
        "CREATE INDEX IF NOT EXISTS idx_entities_name ON kg_entities(name)",
        "CREATE INDEX IF NOT EXISTS idx_entities_type ON kg_entities(entity_type)",
        "CREATE INDEX IF NOT EXISTS idx_relationships_from "
        "ON kg_relationships(from_entity)",
        "CREATE INDEX IF NOT EXISTS idx_relationships_to "
        "ON kg_relationships(to_entity)",
        "CREATE INDEX IF NOT EXISTS idx_relationships_type "
        "ON kg_relationships(relation_type)",
    )
    for statement in index_ddl:
        db.execute(statement)

287 

288 async def create_entity( 

289 self, 

290 name: str, 

291 entity_type: str, 

292 observations: list[str] | None = None, 

293 properties: dict[str, t.Any] | None = None, 

294 metadata: dict[str, t.Any] | None = None, 

295 ) -> dict[str, t.Any]: 

296 """Create a new entity (node) in the knowledge graph. 

297 

298 Args: 

299 name: Entity name (must be unique) 

300 entity_type: Type/category of entity 

301 observations: List of observation strings 

302 properties: Additional properties as key-value pairs 

303 metadata: Additional metadata 

304 

305 Returns: 

306 Created entity as dictionary 

307 

308 Raises: 

309 ValueError: If entity with name already exists 

310 

311 """ 

312 conn = self._get_conn() 

313 

314 # Check if entity already exists 

315 existing = await self.find_entity_by_name(name) 

316 if existing: 316 ↛ 317line 316 didn't jump to line 317 because the condition on line 316 was never true

317 msg = f"Entity with name '{name}' already exists" 

318 raise ValueError(msg) 

319 

320 entity_id = str(uuid.uuid4()) 

321 now = datetime.now(tz=UTC) 

322 

323 # Sync DuckDB execution (fast, local operation) 

324 conn.execute( 

325 """ 

326 INSERT INTO kg_entities 

327 (id, name, entity_type, observations, properties, created_at, updated_at, metadata) 

328 VALUES (?, ?, ?, ?, ?, ?, ?, ?) 

329 """, 

330 ( 

331 entity_id, 

332 name, 

333 entity_type, 

334 observations or [], 

335 json.dumps(properties or {}), 

336 now, 

337 now, 

338 json.dumps(metadata or {}), 

339 ), 

340 ) 

341 

342 return { 

343 "id": entity_id, 

344 "name": name, 

345 "entity_type": entity_type, 

346 "observations": observations or [], 

347 "properties": properties or {}, 

348 "created_at": now.isoformat(), 

349 "updated_at": now.isoformat(), 

350 "metadata": metadata or {}, 

351 } 

352 

353 async def get_entity(self, entity_id: str) -> dict[str, t.Any] | None: 

354 """Get entity by ID. 

355 

356 Args: 

357 entity_id: Entity UUID 

358 

359 Returns: 

360 Entity dictionary or None if not found 

361 

362 """ 

363 conn = self._get_conn() 

364 

365 result = conn.execute( 

366 "SELECT * FROM kg_entities WHERE id = ?", 

367 (entity_id,), 

368 ).fetchone() 

369 

370 if not result: 

371 return None 

372 

373 return { 

374 "id": result[0], 

375 "name": result[1], 

376 "entity_type": result[2], 

377 "observations": list(result[3]) if result[3] else [], 

378 "properties": json.loads(result[4]) if result[4] else {}, 

379 "created_at": self._format_timestamp(result[5]), 

380 "updated_at": self._format_timestamp(result[6]), 

381 "metadata": json.loads(result[7]) if result[7] else {}, 

382 } 

383 

384 async def find_entity_by_name(self, name: str) -> dict[str, t.Any] | None: 

385 """Find entity by name. 

386 

387 Args: 

388 name: Entity name to search for 

389 

390 Returns: 

391 Entity dictionary or None if not found 

392 

393 """ 

394 conn = self._get_conn() 

395 

396 result = conn.execute( 

397 """ 

398 SELECT id, name, entity_type, observations, properties, 

399 created_at, updated_at, metadata 

400 FROM kg_entities 

401 WHERE name = ? 

402 """, 

403 (name,), 

404 ).fetchone() 

405 

406 if not result: 

407 return None 

408 

409 return { 

410 "id": result[0], 

411 "name": result[1], 

412 "entity_type": result[2], 

413 "observations": list(result[3]) if result[3] else [], 

414 "properties": json.loads(result[4]) if result[4] else {}, 

415 "created_at": result[5].isoformat() if result[5] else None, 

416 "updated_at": result[6].isoformat() if result[6] else None, 

417 "metadata": json.loads(result[7]) if result[7] else {}, 

418 } 

419 

420 async def create_relation( 

421 self, 

422 from_entity: str, 

423 to_entity: str, 

424 relation_type: str, 

425 properties: dict[str, t.Any] | None = None, 

426 metadata: dict[str, t.Any] | None = None, 

427 ) -> dict[str, t.Any]: 

428 """Create a relationship (edge) between two entities. 

429 

430 Args: 

431 from_entity: Source entity name 

432 to_entity: Target entity name 

433 relation_type: Type of relationship 

434 properties: Additional properties 

435 metadata: Additional metadata 

436 

437 Returns: 

438 Created relationship as dictionary 

439 

440 Raises: 

441 ValueError: If either entity doesn't exist 

442 

443 """ 

444 conn = self._get_conn() 

445 

446 # Resolve entity identifiers to IDs (accepts names or IDs) 

447 resolved_from_entity = await self._resolve_entity_id(from_entity) 

448 resolved_to_entity = await self._resolve_entity_id(to_entity) 

449 

450 relation_id = str(uuid.uuid4()) 

451 now = datetime.now(tz=UTC) 

452 

453 conn.execute( 

454 """ 

455 INSERT INTO kg_relationships 

456 (id, from_entity, to_entity, relation_type, properties, created_at, updated_at, metadata) 

457 VALUES (?, ?, ?, ?, ?, ?, ?, ?) 

458 """, 

459 ( 

460 relation_id, 

461 resolved_from_entity, 

462 resolved_to_entity, 

463 relation_type, 

464 json.dumps(properties or {}), 

465 now, 

466 now, 

467 json.dumps(metadata or {}), 

468 ), 

469 ) 

470 

471 return { 

472 "id": relation_id, 

473 "from_entity": resolved_from_entity, 

474 "to_entity": resolved_to_entity, 

475 "relation_type": relation_type, 

476 "properties": properties or {}, 

477 "created_at": now.isoformat(), 

478 "updated_at": now.isoformat(), 

479 "metadata": metadata or {}, 

480 } 

481 

482 async def add_observation( 

483 self, 

484 entity_name: str, 

485 observation: str, 

486 ) -> dict[str, t.Any]: 

487 """Add an observation to an entity. 

488 

489 Args: 

490 entity_name: Name of entity to update 

491 observation: Observation text to add 

492 

493 Returns: 

494 Updated entity dictionary 

495 

496 Raises: 

497 ValueError: If entity doesn't exist 

498 

499 """ 

500 conn = self._get_conn() 

501 

502 entity = await self.find_entity_by_name(entity_name) 

503 if not entity: 503 ↛ 504line 503 didn't jump to line 504 because the condition on line 503 was never true

504 msg = f"Entity '{entity_name}' not found" 

505 raise ValueError(msg) 

506 

507 now = datetime.now(tz=UTC) 

508 

509 # Append observation to array 

510 conn.execute( 

511 """ 

512 UPDATE kg_entities 

513 SET observations = list_append(observations, ?), 

514 updated_at = ? 

515 WHERE name = ? 

516 """, 

517 (observation, now, entity_name), 

518 ) 

519 

520 # Return updated entity 

521 return await self.find_entity_by_name(entity_name) # type: ignore[return-value] 

522 

523 async def search_entities( 

524 self, 

525 query: str | None = None, 

526 entity_type: str | None = None, 

527 limit: int = 10, 

528 ) -> list[dict[str, t.Any]]: 

529 """Search for entities by name or observations. 

530 

531 Args: 

532 query: Search query (matches name and observations) 

533 entity_type: Filter by entity type 

534 limit: Maximum number of results 

535 

536 Returns: 

537 List of matching entities 

538 

539 """ 

540 conn = self._get_conn() 

541 

542 # Build query dynamically 

543 conditions = [] 

544 params: list[t.Any] = [] 

545 

546 if query: 546 ↛ 550line 546 didn't jump to line 550 because the condition on line 546 was always true

547 conditions.append("(name LIKE ? OR list_contains(observations, ?))") 

548 params.extend([f"%{query}%", query]) 

549 

550 if entity_type: 550 ↛ 551line 550 didn't jump to line 551 because the condition on line 550 was never true

551 conditions.append("entity_type = ?") 

552 params.append(entity_type) 

553 

554 where_clause = " AND ".join(conditions) if conditions else "1=1" 

555 # Build SQL safely - all user input is parameterized via params list 

556 sql = ( 

557 "SELECT id, name, entity_type, observations, properties, " 

558 "created_at, updated_at, metadata " 

559 "FROM kg_entities WHERE " 

560 + where_clause 

561 + " ORDER BY created_at DESC LIMIT ?" 

562 ) 

563 params.append(limit) 

564 

565 result = conn.execute(sql, params).fetchall() 

566 

567 # Use list comprehension for better readability (refurb FURB138) 

568 return [ 

569 { 

570 "id": row[0], 

571 "name": row[1], 

572 "entity_type": row[2], 

573 "observations": list(row[3]) if row[3] else [], 

574 "properties": json.loads(row[4]) if row[4] else {}, 

575 "created_at": self._format_timestamp(row[5]), 

576 "updated_at": self._format_timestamp(row[6]), 

577 "metadata": json.loads(row[7]) if row[7] else {}, 

578 } 

579 for row in result 

580 ] 

581 

582 async def get_relationships( 

583 self, 

584 entity_name: str, 

585 relation_type: str | None = None, 

586 direction: str = "both", 

587 ) -> list[dict[str, t.Any]]: 

588 """Get all relationships for a specific entity. 

589 

590 Args: 

591 entity_name: Name of entity to get relationships for 

592 relation_type: Optional filter by relationship type 

593 direction: "outgoing", "incoming", or "both" (default) 

594 

595 Returns: 

596 List of relationships involving this entity 

597 

598 """ 

599 conn = self._get_conn() 

600 resolved_entity = await self._resolve_entity_id(entity_name) 

601 

602 conditions = [] 

603 params: list[t.Any] = [] 

604 

605 if direction == "outgoing": 605 ↛ 606line 605 didn't jump to line 606 because the condition on line 605 was never true

606 conditions.append("from_entity = ?") 

607 params.append(resolved_entity) 

608 elif direction == "incoming": 608 ↛ 609line 608 didn't jump to line 609 because the condition on line 608 was never true

609 conditions.append("to_entity = ?") 

610 params.append(resolved_entity) 

611 else: # both 

612 conditions.append("(from_entity = ? OR to_entity = ?)") 

613 params.extend([resolved_entity, resolved_entity]) 

614 

615 if relation_type: 615 ↛ 616line 615 didn't jump to line 616 because the condition on line 615 was never true

616 conditions.append("relation_type = ?") 

617 params.append(relation_type) 

618 

619 where_clause = " AND ".join(conditions) 

620 # Build SQL safely - all user input is parameterized via params list 

621 sql = ( 

622 "SELECT id, from_entity, to_entity, relation_type, properties, " 

623 "created_at, updated_at, metadata " 

624 "FROM kg_relationships WHERE " + where_clause + " ORDER BY created_at DESC" 

625 ) 

626 

627 result = conn.execute(sql, params).fetchall() 

628 

629 # Use list comprehension for better readability (refurb FURB138) 

630 return [ 

631 { 

632 "id": row[0], 

633 "from_entity": row[1], 

634 "to_entity": row[2], 

635 "relation_type": row[3], 

636 "properties": json.loads(row[4]) if row[4] else {}, 

637 "created_at": self._format_timestamp(row[5]), 

638 "updated_at": self._format_timestamp(row[6]), 

639 "metadata": json.loads(row[7]) if row[7] else {}, 

640 } 

641 for row in result 

642 ] 

643 

async def find_path(
    self,
    from_entity: str,
    to_entity: str,
    max_depth: int = 5,
) -> list[dict[str, t.Any]]:
    """Find the first path between two entities with breadth-first search.

    Relationships are followed only from ``from_entity`` to ``to_entity``
    of each stored row.

    Args:
        from_entity: Starting entity, given by name or ID.
        to_entity: Target entity, given by name or ID.
        max_depth: Maximum number of hops a path may have.

    Returns:
        A list holding at most one dictionary with ``path`` (entity IDs),
        ``relations`` (relationship types along the path) and ``hops``;
        an empty list when no path is found.

    """
    from collections import deque

    db = self._get_conn()
    start = await self._resolve_entity_id(from_entity)
    goal = await self._resolve_entity_id(to_entity)

    # Load every edge once and index it by source entity.
    edges = db.execute(
        "SELECT from_entity, to_entity, relation_type FROM kg_relationships",
    ).fetchall()
    adjacency: dict[str, list[tuple[str, str]]] = {}
    for source, target, label in edges:
        adjacency.setdefault(source, []).append((target, label))

    frontier: deque[tuple[str, list[str], list[str]]] = deque([(start, [start], [])])
    seen = {start}

    while frontier:
        node, trail, labels = frontier.popleft()

        # Entries longer than the hop budget are dropped without expansion.
        if len(trail) > max_depth + 1:
            continue

        # The start node alone never counts as a path.
        if node == goal and len(trail) > 1:
            return [{"path": trail, "relations": labels, "hops": len(trail) - 1}]

        for successor, label in adjacency.get(node, ()):
            if successor in seen:
                continue
            seen.add(successor)
            frontier.append((successor, [*trail, successor], [*labels, label]))

    return []

712 

async def get_stats(self) -> dict[str, t.Any]:
    """Summarize the contents of the knowledge graph.

    Returns:
        Dictionary with ``total_entities``, ``total_relationships`` and the
        per-type row counts ``entity_types`` / ``relationship_types``.

    """
    db = self._get_conn()

    def scalar(sql: str) -> t.Any:
        """Run a single-value query and return that value."""
        return db.execute(sql).fetchone()[0]

    def histogram(sql: str) -> dict[t.Any, t.Any]:
        """Run a (key, count) query and map each key to its count."""
        return {key: count for key, count in db.execute(sql).fetchall()}

    total_entities = scalar("SELECT COUNT(*) FROM kg_entities")
    total_relationships = scalar("SELECT COUNT(*) FROM kg_relationships")

    entity_types = histogram(
        """
        SELECT entity_type, COUNT(*) as count
        FROM kg_entities
        GROUP BY entity_type
        """
    )
    relationship_types = histogram(
        """
        SELECT relation_type, COUNT(*) as count
        FROM kg_relationships
        GROUP BY relation_type
        """
    )

    return {
        "total_entities": total_entities or 0,
        "total_relationships": total_relationships or 0,
        "entity_types": entity_types,
        "relationship_types": relationship_types,
    }