Coverage for session_buddy / multi_project_coordinator.py: 86.18%

234 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1#!/usr/bin/env python3 

2"""Multi-Project Session Coordination. 

3 

4Manages relationships and coordination between multiple projects and their sessions. 

5""" 

6 

7import asyncio 

8import hashlib 

9import json 

10import time 

11from datetime import UTC, datetime, timedelta 

12from typing import Any, Literal, Protocol 

13 

14from pydantic import BaseModel, Field, ValidationInfo, field_validator 

15 

16 

17class ReflectionDatabaseProtocol(Protocol): 

18 """Protocol for reflection database implementations.""" 

19 

20 async def search_conversations( 

21 self, 

22 query: str, 

23 limit: int = 10, 

24 threshold: float = 0.7, 

25 project: str | None = None, 

26 min_score: float | None = None, 

27 ) -> list[dict[str, Any]]: ... 

28 

29 @property 

30 def conn(self) -> Any: ... 

31 

32 

class ProjectGroup(BaseModel):
    """A named collection of related projects tracked as one unit."""

    id: str = Field(description="Unique identifier for the project group")
    name: str = Field(
        min_length=1,
        max_length=200,
        description="Name of the project group",
    )
    description: str = Field(
        default="",
        max_length=1000,
        description="Description of the project group",
    )
    projects: list[str] = Field(
        min_length=1,
        description="List of project identifiers in this group",
    )
    metadata: dict[str, Any] = Field(
        default_factory=dict,
        description="Additional metadata for the project group",
    )
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(UTC),
        description="When the project group was created",
    )

    @field_validator("projects")
    @classmethod
    def validate_projects(cls, v: list[str]) -> list[str]:
        """Reject empty groups and blank names; return stripped names."""
        if not v:
            msg = "Project group must contain at least one project"
            raise ValueError(msg)
        cleaned: list[str] = []
        for name in v:
            stripped = name.strip()
            if not stripped:
                msg = "Project names cannot be empty"
                raise ValueError(msg)
            cleaned.append(stripped)
        return cleaned

72 

73 

class ProjectDependency(BaseModel):
    """A directed dependency edge from one project to another."""

    id: str = Field(description="Unique identifier for the project dependency")
    source_project: str = Field(
        min_length=1,
        description="The project that depends on another",
    )
    target_project: str = Field(
        min_length=1,
        description="The project that is depended upon",
    )
    dependency_type: Literal["uses", "extends", "references", "shares_code"] = Field(
        description="Type of dependency relationship",
    )
    description: str = Field(
        default="",
        max_length=1000,
        description="Description of the dependency relationship",
    )
    metadata: dict[str, Any] = Field(
        default_factory=dict,
        description="Additional metadata for the dependency",
    )
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(UTC),
        description="When the dependency was created",
    )

    @field_validator("source_project", "target_project")
    @classmethod
    def validate_project_names(cls, v: str) -> str:
        """Reject blank project names; return the stripped name."""
        stripped = v.strip()
        if not stripped:
            msg = "Project names cannot be empty"
            raise ValueError(msg)
        return stripped

    @field_validator("target_project")
    @classmethod
    def validate_not_self_dependency(cls, v: str, info: ValidationInfo) -> str:
        """Disallow a project declaring a dependency on itself."""
        data = getattr(info, "data", None)
        if data and data.get("source_project") == v:
            msg = "Project cannot depend on itself"
            raise ValueError(msg)
        return v

120 

121 

class SessionLink(BaseModel):
    """A directed link connecting two sessions, possibly across projects."""

    id: str = Field(description="Unique identifier for the session link")
    source_session_id: str = Field(
        min_length=1,
        description="The session that links to another",
    )
    target_session_id: str = Field(
        min_length=1,
        description="The session that is linked to",
    )
    link_type: Literal["related", "continuation", "reference", "dependency"] = Field(
        description="Type of relationship between sessions",
    )
    context: str = Field(
        default="",
        max_length=2000,
        description="Context or reason for the session link",
    )
    metadata: dict[str, Any] = Field(
        default_factory=dict,
        description="Additional metadata for the session link",
    )
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(UTC),
        description="When the session link was created",
    )

    @field_validator("source_session_id", "target_session_id")
    @classmethod
    def validate_session_ids(cls, v: str) -> str:
        """Reject blank session IDs; return the stripped ID."""
        stripped = v.strip()
        if not stripped:
            msg = "Session IDs cannot be empty"
            raise ValueError(msg)
        return stripped

    @field_validator("target_session_id")
    @classmethod
    def validate_not_self_link(cls, v: str, info: ValidationInfo) -> str:
        """Disallow a session linking to itself."""
        data = getattr(info, "data", None)
        if data and data.get("source_session_id") == v:
            msg = "Session cannot link to itself"
            raise ValueError(msg)
        return v

172 

173 

class MultiProjectCoordinator:
    """Coordinates sessions and knowledge across multiple projects.

    Persists project groups, inter-project dependencies, and cross-project
    session links through the reflection database, keeping small in-memory
    caches to avoid repeated queries. All blocking database calls are run
    on the default executor so the event loop is never stalled.
    """

    def __init__(self, reflection_db: ReflectionDatabaseProtocol) -> None:
        """Initialize multi-project coordinator with database connection."""
        self.reflection_db = reflection_db
        self._initialize_caches()

    def _initialize_caches(self) -> None:
        """Initialize internal cache dictionaries."""
        self.active_project_groups: dict[str, ProjectGroup] = {}
        # Keyed by f"{project}_{direction}" -- see get_project_dependencies().
        self.dependency_cache: dict[str, list[ProjectDependency]] = {}
        self.session_links_cache: dict[str, list[SessionLink]] = {}

    def _get_conn(self) -> Any:
        """Get database connection, raising an error if not initialized."""
        if self.reflection_db.conn is None:
            msg = "Database connection not initialized"
            raise RuntimeError(msg)
        return self.reflection_db.conn

    async def _run_db(self, func: Any) -> Any:
        """Run a blocking database callable on the default executor.

        Uses asyncio.get_running_loop() -- asyncio.get_event_loop() is
        deprecated when called from inside a coroutine.
        """
        return await asyncio.get_running_loop().run_in_executor(None, func)

    async def _commit(self) -> None:
        """Commit the current transaction off the event loop.

        commit() is a blocking driver call, so it is dispatched through
        the executor rather than invoked directly on the loop thread.
        """
        conn = self._get_conn()
        await self._run_db(conn.commit)

    async def create_project_group(
        self,
        name: str,
        projects: list[str],
        description: str = "",
        metadata: dict[str, Any] | None = None,
    ) -> ProjectGroup:
        """Create, persist, and cache a new project group.

        Args:
            name: Human-readable group name.
            projects: Non-empty list of project identifiers.
            description: Optional free-text description.
            metadata: Optional extra metadata, stored as JSON.

        Returns:
            The newly created ProjectGroup.
        """
        # md5 is only a cheap unique-id generator here, not a security hash.
        group_id = hashlib.md5(
            f"{name}_{time.time()}".encode(),
            usedforsecurity=False,
        ).hexdigest()

        group = ProjectGroup(
            id=group_id,
            name=name,
            description=description,
            projects=projects,
            metadata=metadata or {},
        )

        await self._run_db(
            lambda: self._get_conn().execute(
                """
                INSERT INTO project_groups (id, name, description, projects, created_at, metadata)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                [
                    group.id,
                    group.name,
                    group.description,
                    group.projects,
                    group.created_at,
                    json.dumps(group.metadata),
                ],
            ),
        )
        await self._commit()

        self.active_project_groups[group_id] = group
        return group

    async def add_project_dependency(
        self,
        source_project: str,
        target_project: str,
        dependency_type: Literal["uses", "extends", "references", "shares_code"],
        description: str = "",
        metadata: dict[str, Any] | None = None,
    ) -> ProjectDependency:
        """Add (or update) a dependency relationship between projects.

        The id is derived from (source, target, type), so re-adding the
        same edge upserts rather than duplicating it.
        """
        dep_id = hashlib.md5(
            f"{source_project}_{target_project}_{dependency_type}".encode(),
            usedforsecurity=False,
        ).hexdigest()

        dependency = ProjectDependency(
            id=dep_id,
            source_project=source_project,
            target_project=target_project,
            dependency_type=dependency_type,
            description=description,
            metadata=metadata or {},
        )

        await self._run_db(
            lambda: self._get_conn().execute(
                """
                INSERT INTO project_dependencies
                (id, source_project, target_project, dependency_type, description, created_at, metadata)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT (id) DO UPDATE SET
                    source_project = EXCLUDED.source_project,
                    target_project = EXCLUDED.target_project,
                    dependency_type = EXCLUDED.dependency_type,
                    description = EXCLUDED.description,
                    created_at = EXCLUDED.created_at,
                    metadata = EXCLUDED.metadata
                """,
                [
                    dependency.id,
                    dependency.source_project,
                    dependency.target_project,
                    dependency.dependency_type,
                    dependency.description,
                    dependency.created_at,
                    json.dumps(dependency.metadata),
                ],
            ),
        )
        await self._commit()

        # Invalidate cached lookups touching either endpoint.
        self._clear_dependency_cache(source_project)
        self._clear_dependency_cache(target_project)

        return dependency

    async def link_sessions(
        self,
        source_session_id: str,
        target_session_id: str,
        link_type: Literal["related", "continuation", "reference", "dependency"],
        context: str = "",
        metadata: dict[str, Any] | None = None,
    ) -> SessionLink:
        """Create (or update) a link between two sessions across projects."""
        link_id = hashlib.md5(
            f"{source_session_id}_{target_session_id}_{link_type}".encode(),
            usedforsecurity=False,
        ).hexdigest()

        link = SessionLink(
            id=link_id,
            source_session_id=source_session_id,
            target_session_id=target_session_id,
            link_type=link_type,
            context=context,
            metadata=metadata or {},
        )

        await self._run_db(
            lambda: self._get_conn().execute(
                """
                INSERT INTO session_links
                (id, source_session_id, target_session_id, link_type, context, created_at, metadata)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT (id) DO UPDATE SET
                    source_session_id = EXCLUDED.source_session_id,
                    target_session_id = EXCLUDED.target_session_id,
                    link_type = EXCLUDED.link_type,
                    context = EXCLUDED.context,
                    created_at = EXCLUDED.created_at,
                    metadata = EXCLUDED.metadata
                """,
                [
                    link.id,
                    link.source_session_id,
                    link.target_session_id,
                    link.link_type,
                    link.context,
                    link.created_at,
                    json.dumps(link.metadata),
                ],
            ),
        )
        await self._commit()

        # Invalidate cached lookups for both sessions.
        self._clear_session_links_cache(source_session_id)
        self._clear_session_links_cache(target_session_id)

        return link

    async def get_project_groups(
        self,
        project: str | None = None,
    ) -> list[ProjectGroup]:
        """Get project groups, optionally filtered by member project.

        Results are ordered newest-first and cached in
        active_project_groups keyed by group id.
        """
        sql = "SELECT id, name, description, projects, created_at, metadata FROM project_groups"
        params: list[str] = []

        if project:
            # list_contains: DuckDB list membership test on the projects column.
            sql += " WHERE list_contains(projects, ?)"
            params.append(project)

        sql += " ORDER BY created_at DESC"

        rows = await self._run_db(
            lambda: self._get_conn().execute(sql, params).fetchall(),
        )

        groups: list[ProjectGroup] = []
        for row in rows:
            group = ProjectGroup(
                id=row[0],
                name=row[1],
                description=row[2],
                projects=row[3],
                metadata=json.loads(row[5]) if row[5] else {},
                created_at=row[4],
            )
            groups.append(group)
            self.active_project_groups[group.id] = group

        return groups

    async def get_project_dependencies(
        self,
        project: str,
        direction: str = "both",  # "outbound", "inbound", "both"
    ) -> list[ProjectDependency]:
        """Get dependencies for a project.

        Args:
            project: Project identifier to look up.
            direction: "outbound" (project depends on others), "inbound"
                (others depend on project), or "both".

        Returns:
            Dependencies ordered newest-first; cached per
            (project, direction) pair.
        """
        cache_key = f"{project}_{direction}"
        cached = self.dependency_cache.get(cache_key)
        if cached is not None:
            return cached

        if direction == "outbound":
            condition = "source_project = ?"
            params = [project]
        elif direction == "inbound":
            condition = "target_project = ?"
            params = [project]
        else:  # both
            condition = "(source_project = ? OR target_project = ?)"
            params = [project, project]

        # SQL is built only from fixed condition text; all user input is
        # parameterized via params.
        sql = (
            "SELECT id, source_project, target_project, dependency_type, "
            "description, created_at, metadata FROM project_dependencies WHERE "
            + condition
            + " ORDER BY created_at DESC"
        )

        rows = await self._run_db(
            lambda: self._get_conn().execute(sql, params).fetchall(),
        )

        dependencies = [
            ProjectDependency(
                id=row[0],
                source_project=row[1],
                target_project=row[2],
                dependency_type=row[3],
                description=row[4],
                metadata=json.loads(row[6]) if row[6] else {},
                created_at=row[5],
            )
            for row in rows
        ]

        self.dependency_cache[cache_key] = dependencies
        return dependencies

    async def get_session_links(self, session_id: str) -> list[SessionLink]:
        """Get all links where the session is either endpoint (cached)."""
        cached = self.session_links_cache.get(session_id)
        if cached is not None:
            return cached

        sql = """
            SELECT id, source_session_id, target_session_id, link_type, context, created_at, metadata
            FROM session_links
            WHERE source_session_id = ? OR target_session_id = ?
            ORDER BY created_at DESC
        """

        rows = await self._run_db(
            lambda: self._get_conn().execute(sql, [session_id, session_id]).fetchall(),
        )

        links = [
            SessionLink(
                id=row[0],
                source_session_id=row[1],
                target_session_id=row[2],
                link_type=row[3],
                context=row[4],
                metadata=json.loads(row[6]) if row[6] else {},
                created_at=row[5],
            )
            for row in rows
        ]

        self.session_links_cache[session_id] = links
        return links

    async def find_related_conversations(
        self,
        current_project: str,
        query: str,
        limit: int = 10,
    ) -> list[dict[str, Any]]:
        """Find conversations across the current project and its neighbors.

        Searches every project connected to current_project by a declared
        dependency (either direction), tags each hit with its source
        project, and returns the top results by score.
        """
        dependencies = await self.get_project_dependencies(current_project)
        related_projects = {current_project}
        for dep in dependencies:
            if dep.source_project == current_project:
                related_projects.add(dep.target_project)
            if dep.target_project == current_project:
                related_projects.add(dep.source_project)

        results: list[dict[str, Any]] = []
        for project in related_projects:
            project_results = await self.reflection_db.search_conversations(
                query=query,
                limit=limit,
                project=project,
            )
            for result in project_results:
                result["source_project"] = project
                result["is_current_project"] = project == current_project
                results.append(result)

        # Global cut: per-project searches may return up to limit each.
        results.sort(key=lambda x: x.get("score", 0), reverse=True)
        return results[:limit]

    async def get_cross_project_insights(
        self,
        projects: list[str],
        time_range_days: int = 30,
    ) -> dict[str, Any]:
        """Get activity stats and shared keyword patterns across projects.

        knowledge_gaps and collaboration_opportunities are placeholders
        (always empty) pending implementation.
        """
        since_date = datetime.now(UTC) - timedelta(days=time_range_days)

        insights = self._initialize_insights_structure()
        insights["project_activity"] = await self._analyze_project_activity(
            projects,
            since_date,
        )
        insights["common_patterns"] = await self._find_common_patterns(
            projects,
            since_date,
        )

        return insights

    def _initialize_insights_structure(self) -> dict[str, Any]:
        """Initialize the insights data structure."""
        return {
            "project_activity": {},
            "common_patterns": [],
            "knowledge_gaps": [],
            "collaboration_opportunities": [],
        }

    async def _analyze_project_activity(
        self,
        projects: list[str],
        since_date: datetime,
    ) -> dict[str, Any]:
        """Collect per-project activity stats since the given date."""
        activity_data: dict[str, Any] = {}
        for project in projects:
            project_stats = await self._get_project_stats(project, since_date)
            if project_stats:
                activity_data[project] = project_stats
        return activity_data

    async def _get_project_stats(
        self,
        project: str,
        since_date: datetime,
    ) -> dict[str, Any] | None:
        """Get conversation-count / recency / size stats for one project."""
        sql = """
            SELECT COUNT(*) as conversation_count,
                   MAX(timestamp) as last_activity,
                   AVG(LENGTH(content)) as avg_content_length
            FROM conversations
            WHERE project = ? AND timestamp >= ?
        """

        result = await self._run_db(
            lambda: self._get_conn().execute(sql, [project, since_date]).fetchone(),
        )

        if result:
            return {
                "conversation_count": result[0],
                "last_activity": result[1],
                "avg_content_length": result[2],
            }
        return None

    async def _find_common_patterns(
        self,
        projects: list[str],
        since_date: datetime,
    ) -> list[dict[str, Any]]:
        """Find keyword patterns shared by at least two projects."""
        conversation_data = await self._get_conversation_data(projects, since_date)
        project_keywords = self._extract_project_keywords(conversation_data)
        return self._identify_common_patterns(project_keywords)

    async def _get_conversation_data(
        self,
        projects: list[str],
        since_date: datetime,
    ) -> list[tuple[str, str]]:
        """Fetch (project, content) rows for pattern analysis."""
        sql = """
            SELECT project, content
            FROM conversations
            WHERE project = ANY(?) AND timestamp >= ?
        """

        return await self._run_db(
            lambda: self._get_conn().execute(sql, [projects, since_date]).fetchall(),
        )

    def _extract_project_keywords(
        self,
        conversation_data: list[tuple[str, str]],
    ) -> dict[str, dict[str, int]]:
        """Count keyword occurrences (words longer than 4 chars) per project."""
        project_keywords: dict[str, dict[str, int]] = {}

        for project, content in conversation_data:
            counts = project_keywords.setdefault(project, {})
            for word in content.lower().split():
                if len(word) > 4:  # Skip short words
                    counts[word] = counts.get(word, 0) + 1

        return project_keywords

    def _identify_common_patterns(
        self,
        project_keywords: dict[str, dict[str, int]],
    ) -> list[dict[str, Any]]:
        """Return the top 10 keywords appearing in at least two projects."""
        # word -> [(project, count), ...] across all projects.
        common_keywords: dict[str, list[tuple[str, int]]] = {}
        for project, keywords in project_keywords.items():
            for word, count in keywords.items():
                common_keywords.setdefault(word, []).append((project, count))

        patterns: list[dict[str, Any]] = [
            {
                "pattern": word,
                "projects": [p[0] for p in project_counts],
                "frequency": sum(p[1] for p in project_counts),
            }
            for word, project_counts in common_keywords.items()
            if len(project_counts) >= 2  # Present in at least 2 projects
        ]

        patterns.sort(key=lambda p: int(p["frequency"]), reverse=True)
        return patterns[:10]  # Return top 10 patterns

    def _clear_dependency_cache(self, project: str) -> None:
        """Invalidate cached dependency lookups for a project.

        Prefix matching may also drop entries for another project whose
        name starts with f"{project}_"; that only forces a harmless
        re-query, never stale data.
        """
        prefix = f"{project}_"
        for key in [k for k in self.dependency_cache if k.startswith(prefix)]:
            del self.dependency_cache[key]

    def _clear_session_links_cache(self, session_id: str) -> None:
        """Invalidate cached link lookups for a session."""
        self.session_links_cache.pop(session_id, None)

    async def cleanup_old_links(self, max_age_days: int = 365) -> dict[str, Any]:
        """Delete session links older than max_age_days.

        Note: only session_links rows are removed; project_dependencies
        are currently left untouched.

        Returns:
            {"deleted_session_links": <count of rows removed>}
        """
        cutoff_date = datetime.now(UTC) - timedelta(days=max_age_days)

        # Count first so the result reflects what the DELETE removes.
        count_before = await self._run_db(
            lambda: self._get_conn()
            .execute(
                "SELECT COUNT(*) FROM session_links WHERE created_at < ?",
                [cutoff_date],
            )
            .fetchone()[0],
        )

        await self._run_db(
            lambda: self._get_conn().execute(
                "DELETE FROM session_links WHERE created_at < ?",
                [cutoff_date],
            ),
        )
        await self._commit()

        # All cached link lists are potentially stale now.
        self.session_links_cache.clear()

        return {
            "deleted_session_links": count_before,
        }