Coverage for session_mgmt_mcp/multi_project_coordinator.py: 21.43%

172 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-01 05:22 -0700

1#!/usr/bin/env python3 

2"""Multi-Project Session Coordination. 

3 

4Manages relationships and coordination between multiple projects and their sessions. 

5""" 

6 

7import asyncio 

8import hashlib 

9import json 

10import time 

11from dataclasses import dataclass, field 

12from datetime import UTC, datetime, timedelta 

13from typing import Any 

14 

15from .reflection_tools import ReflectionDatabase 

16 

17 

18@dataclass 

19class ProjectGroup: 

20 """Represents a group of related projects.""" 

21 

22 id: str 

23 name: str 

24 description: str 

25 projects: list[str] 

26 metadata: dict[str, Any] = field(default_factory=dict) 

27 created_at: datetime = field(default_factory=lambda: datetime.now(UTC)) 

28 

29 

30@dataclass 

31class ProjectDependency: 

32 """Represents a dependency between two projects.""" 

33 

34 id: str 

35 source_project: str 

36 target_project: str 

37 dependency_type: str # 'uses', 'extends', 'references', 'shares_code' 

38 description: str 

39 metadata: dict[str, Any] = field(default_factory=dict) 

40 created_at: datetime = field(default_factory=lambda: datetime.now(UTC)) 

41 

42 

43@dataclass 

44class SessionLink: 

45 """Represents a link between sessions across projects.""" 

46 

47 id: str 

48 source_session_id: str 

49 target_session_id: str 

50 link_type: str # 'related', 'continuation', 'reference', 'dependency' 

51 context: str 

52 metadata: dict[str, Any] = field(default_factory=dict) 

53 created_at: datetime = field(default_factory=lambda: datetime.now(UTC)) 

54 

55 

56class MultiProjectCoordinator: 

57 """Coordinates sessions and knowledge across multiple projects.""" 

58 

59 def __init__(self, reflection_db: ReflectionDatabase) -> None: 

60 self.reflection_db = reflection_db 

61 self.active_project_groups: dict[str, ProjectGroup] = {} 

62 self.dependency_cache: dict[str, list[ProjectDependency]] = {} 

63 self.session_links_cache: dict[str, list[SessionLink]] = {} 

64 

65 async def create_project_group( 

66 self, 

67 name: str, 

68 projects: list[str], 

69 description: str = "", 

70 metadata: dict[str, Any] | None = None, 

71 ) -> ProjectGroup: 

72 """Create a new project group.""" 

73 group_id = hashlib.md5(f"{name}_{time.time()}".encode()).hexdigest() 

74 

75 group = ProjectGroup( 

76 id=group_id, 

77 name=name, 

78 description=description, 

79 projects=projects, 

80 metadata=metadata or {}, 

81 ) 

82 

83 # Store in database 

84 await asyncio.get_event_loop().run_in_executor( 

85 None, 

86 lambda: self.reflection_db.conn.execute( 

87 """ 

88 INSERT INTO project_groups (id, name, description, projects, created_at, metadata) 

89 VALUES (?, ?, ?, ?, ?, ?) 

90 """, 

91 [ 

92 group.id, 

93 group.name, 

94 group.description, 

95 group.projects, 

96 group.created_at, 

97 json.dumps(group.metadata), 

98 ], 

99 ), 

100 ) 

101 

102 self.reflection_db.conn.commit() 

103 self.active_project_groups[group_id] = group 

104 

105 return group 

106 

107 async def add_project_dependency( 

108 self, 

109 source_project: str, 

110 target_project: str, 

111 dependency_type: str, 

112 description: str = "", 

113 metadata: dict[str, Any] | None = None, 

114 ) -> ProjectDependency: 

115 """Add a dependency relationship between projects.""" 

116 dep_id = hashlib.md5( 

117 f"{source_project}_{target_project}_{dependency_type}".encode(), 

118 ).hexdigest() 

119 

120 dependency = ProjectDependency( 

121 id=dep_id, 

122 source_project=source_project, 

123 target_project=target_project, 

124 dependency_type=dependency_type, 

125 description=description, 

126 metadata=metadata or {}, 

127 ) 

128 

129 # Store in database 

130 await asyncio.get_event_loop().run_in_executor( 

131 None, 

132 lambda: self.reflection_db.conn.execute( 

133 """ 

134 INSERT OR REPLACE INTO project_dependencies 

135 (id, source_project, target_project, dependency_type, description, created_at, metadata) 

136 VALUES (?, ?, ?, ?, ?, ?, ?) 

137 """, 

138 [ 

139 dependency.id, 

140 dependency.source_project, 

141 dependency.target_project, 

142 dependency.dependency_type, 

143 dependency.description, 

144 dependency.created_at, 

145 json.dumps(dependency.metadata), 

146 ], 

147 ), 

148 ) 

149 

150 self.reflection_db.conn.commit() 

151 

152 # Clear cache for affected projects 

153 self._clear_dependency_cache(source_project) 

154 self._clear_dependency_cache(target_project) 

155 

156 return dependency 

157 

158 async def link_sessions( 

159 self, 

160 source_session_id: str, 

161 target_session_id: str, 

162 link_type: str, 

163 context: str = "", 

164 metadata: dict[str, Any] | None = None, 

165 ) -> SessionLink: 

166 """Create a link between two sessions across projects.""" 

167 link_id = hashlib.md5( 

168 f"{source_session_id}_{target_session_id}_{link_type}".encode(), 

169 ).hexdigest() 

170 

171 link = SessionLink( 

172 id=link_id, 

173 source_session_id=source_session_id, 

174 target_session_id=target_session_id, 

175 link_type=link_type, 

176 context=context, 

177 metadata=metadata or {}, 

178 ) 

179 

180 # Store in database 

181 await asyncio.get_event_loop().run_in_executor( 

182 None, 

183 lambda: self.reflection_db.conn.execute( 

184 """ 

185 INSERT OR REPLACE INTO session_links 

186 (id, source_session_id, target_session_id, link_type, context, created_at, metadata) 

187 VALUES (?, ?, ?, ?, ?, ?, ?) 

188 """, 

189 [ 

190 link.id, 

191 link.source_session_id, 

192 link.target_session_id, 

193 link.link_type, 

194 link.context, 

195 link.created_at, 

196 json.dumps(link.metadata), 

197 ], 

198 ), 

199 ) 

200 

201 self.reflection_db.conn.commit() 

202 

203 # Clear cache for affected sessions 

204 self._clear_session_links_cache(source_session_id) 

205 self._clear_session_links_cache(target_session_id) 

206 

207 return link 

208 

209 async def get_project_groups( 

210 self, 

211 project: str | None = None, 

212 ) -> list[ProjectGroup]: 

213 """Get project groups, optionally filtered by project.""" 

214 sql = "SELECT id, name, description, projects, created_at, metadata FROM project_groups" 

215 params = [] 

216 

217 if project: 

218 sql += " WHERE list_contains(projects, ?)" 

219 params.append(project) 

220 

221 sql += " ORDER BY created_at DESC" 

222 

223 results = await asyncio.get_event_loop().run_in_executor( 

224 None, 

225 lambda: self.reflection_db.conn.execute(sql, params).fetchall(), 

226 ) 

227 

228 groups = [] 

229 for row in results: 

230 group = ProjectGroup( 

231 id=row[0], 

232 name=row[1], 

233 description=row[2], 

234 projects=row[3], 

235 metadata=json.loads(row[5]) if row[5] else {}, 

236 created_at=row[4], 

237 ) 

238 groups.append(group) 

239 self.active_project_groups[group.id] = group 

240 

241 return groups 

242 

243 async def get_project_dependencies( 

244 self, 

245 project: str, 

246 direction: str = "both", # "outbound", "inbound", "both" 

247 ) -> list[ProjectDependency]: 

248 """Get dependencies for a project.""" 

249 if project in self.dependency_cache: 

250 return self.dependency_cache[project] 

251 

252 conditions = [] 

253 params = [] 

254 

255 if direction == "outbound": 

256 conditions.append("source_project = ?") 

257 params.append(project) 

258 elif direction == "inbound": 

259 conditions.append("target_project = ?") 

260 params.append(project) 

261 else: # both 

262 conditions.append("(source_project = ? OR target_project = ?)") 

263 params.extend([project, project]) 

264 

265 sql = f""" 

266 SELECT id, source_project, target_project, dependency_type, description, created_at, metadata 

267 FROM project_dependencies 

268 WHERE {" OR ".join(conditions)} 

269 ORDER BY created_at DESC 

270 """ 

271 

272 results = await asyncio.get_event_loop().run_in_executor( 

273 None, 

274 lambda: self.reflection_db.conn.execute(sql, params).fetchall(), 

275 ) 

276 

277 dependencies = [] 

278 for row in results: 

279 dep = ProjectDependency( 

280 id=row[0], 

281 source_project=row[1], 

282 target_project=row[2], 

283 dependency_type=row[3], 

284 description=row[4], 

285 metadata=json.loads(row[6]) if row[6] else {}, 

286 created_at=row[5], 

287 ) 

288 dependencies.append(dep) 

289 

290 self.dependency_cache[project] = dependencies 

291 return dependencies 

292 

293 async def get_session_links(self, session_id: str) -> list[SessionLink]: 

294 """Get all links for a session.""" 

295 if session_id in self.session_links_cache: 

296 return self.session_links_cache[session_id] 

297 

298 sql = """ 

299 SELECT id, source_session_id, target_session_id, link_type, context, created_at, metadata 

300 FROM session_links 

301 WHERE source_session_id = ? OR target_session_id = ? 

302 ORDER BY created_at DESC 

303 """ 

304 

305 results = await asyncio.get_event_loop().run_in_executor( 

306 None, 

307 lambda: self.reflection_db.conn.execute( 

308 sql, 

309 [session_id, session_id], 

310 ).fetchall(), 

311 ) 

312 

313 links = [] 

314 for row in results: 

315 link = SessionLink( 

316 id=row[0], 

317 source_session_id=row[1], 

318 target_session_id=row[2], 

319 link_type=row[3], 

320 context=row[4], 

321 metadata=json.loads(row[6]) if row[6] else {}, 

322 created_at=row[5], 

323 ) 

324 links.append(link) 

325 

326 self.session_links_cache[session_id] = links 

327 return links 

328 

329 async def find_related_conversations( 

330 self, 

331 current_project: str, 

332 query: str, 

333 limit: int = 10, 

334 ) -> list[dict[str, Any]]: 

335 """Find conversations across related projects.""" 

336 # Get project dependencies to find related projects 

337 dependencies = await self.get_project_dependencies(current_project) 

338 related_projects = {current_project} 

339 

340 for dep in dependencies: 

341 if dep.source_project == current_project: 

342 related_projects.add(dep.target_project) 

343 if dep.target_project == current_project: 

344 related_projects.add(dep.source_project) 

345 

346 # Search conversations in all related projects 

347 results = [] 

348 

349 for project in related_projects: 

350 project_results = await self.reflection_db.search_conversations( 

351 query=query, 

352 limit=limit, 

353 project=project, 

354 ) 

355 

356 for result in project_results: 

357 result["source_project"] = project 

358 result["is_current_project"] = project == current_project 

359 results.append(result) 

360 

361 # Sort by relevance score and return top results 

362 results.sort(key=lambda x: x.get("score", 0), reverse=True) 

363 return results[:limit] 

364 

365 async def get_cross_project_insights( 

366 self, 

367 projects: list[str], 

368 time_range_days: int = 30, 

369 ) -> dict[str, Any]: 

370 """Get insights across multiple projects.""" 

371 since_date = datetime.now(UTC) - timedelta(days=time_range_days) 

372 insights = { 

373 "project_activity": {}, 

374 "common_patterns": [], 

375 "knowledge_gaps": [], 

376 "collaboration_opportunities": [], 

377 } 

378 

379 # Analyze activity per project 

380 for project in projects: 

381 sql = """ 

382 SELECT COUNT(*) as conversation_count, 

383 MAX(timestamp) as last_activity, 

384 AVG(LENGTH(content)) as avg_content_length 

385 FROM conversations 

386 WHERE project = ? AND timestamp >= ? 

387 """ 

388 

389 result = await asyncio.get_event_loop().run_in_executor( 

390 None, 

391 lambda: self.reflection_db.conn.execute( 

392 sql, 

393 [project, since_date], 

394 ).fetchone(), 

395 ) 

396 

397 if result: 

398 insights["project_activity"][project] = { 

399 "conversation_count": result[0], 

400 "last_activity": result[1], 

401 "avg_content_length": result[2], 

402 } 

403 

404 # Find common patterns across projects 

405 common_patterns = await self._find_common_patterns(projects, since_date) 

406 insights["common_patterns"] = common_patterns 

407 

408 return insights 

409 

410 async def _find_common_patterns( 

411 self, 

412 projects: list[str], 

413 since_date: datetime, 

414 ) -> list[dict[str, Any]]: 

415 """Find common patterns across projects.""" 

416 # Simple pattern detection based on common keywords 

417 patterns = [] 

418 

419 # Get frequent terms across all projects 

420 sql = """ 

421 SELECT project, content 

422 FROM conversations 

423 WHERE project = ANY(?) AND timestamp >= ? 

424 """ 

425 

426 results = await asyncio.get_event_loop().run_in_executor( 

427 None, 

428 lambda: self.reflection_db.conn.execute( 

429 sql, 

430 [projects, since_date], 

431 ).fetchall(), 

432 ) 

433 

434 # Simple keyword frequency analysis 

435 project_keywords = {} 

436 for project, content in results: 

437 if project not in project_keywords: 

438 project_keywords[project] = {} 

439 

440 # Extract simple keywords (could be enhanced with NLP) 

441 words = content.lower().split() 

442 for word in words: 

443 if len(word) > 4: # Skip short words 

444 project_keywords[project][word] = ( 

445 project_keywords[project].get(word, 0) + 1 

446 ) 

447 

448 # Find keywords common across multiple projects 

449 common_keywords = {} 

450 for project, keywords in project_keywords.items(): 

451 for word, count in keywords.items(): 

452 if word not in common_keywords: 

453 common_keywords[word] = [] 

454 common_keywords[word].append((project, count)) 

455 

456 # Filter to keywords present in multiple projects 

457 for word, project_counts in common_keywords.items(): 

458 if len(project_counts) >= 2: # Present in at least 2 projects 

459 patterns.append( 

460 { 

461 "pattern": word, 

462 "projects": [p[0] for p in project_counts], 

463 "frequency": sum(p[1] for p in project_counts), 

464 }, 

465 ) 

466 

467 # Sort by frequency 

468 patterns.sort(key=lambda x: x["frequency"], reverse=True) 

469 return patterns[:10] # Return top 10 patterns 

470 

471 def _clear_dependency_cache(self, project: str) -> None: 

472 """Clear dependency cache for a project.""" 

473 if project in self.dependency_cache: 

474 del self.dependency_cache[project] 

475 

476 def _clear_session_links_cache(self, session_id: str) -> None: 

477 """Clear session links cache for a session.""" 

478 if session_id in self.session_links_cache: 

479 del self.session_links_cache[session_id] 

480 

481 async def cleanup_old_links(self, max_age_days: int = 365): 

482 """Clean up old session links and dependencies.""" 

483 cutoff_date = datetime.now(UTC) - timedelta(days=max_age_days) 

484 

485 # Clean up old session links 

486 deleted_links = await asyncio.get_event_loop().run_in_executor( 

487 None, 

488 lambda: self.reflection_db.conn.execute( 

489 "DELETE FROM session_links WHERE created_at < ?", 

490 [cutoff_date], 

491 ).rowcount, 

492 ) 

493 

494 # Clean up old dependencies (optional - might want to keep these) 

495 # deleted_deps = await asyncio.get_event_loop().run_in_executor( 

496 # None, 

497 # lambda: self.reflection_db.conn.execute( 

498 # "DELETE FROM project_dependencies WHERE created_at < ?", 

499 # [cutoff_date] 

500 # ).rowcount 

501 # ) 

502 

503 self.reflection_db.conn.commit() 

504 

505 # Clear caches 

506 self.session_links_cache.clear() 

507 # self.dependency_cache.clear() 

508 

509 return { 

510 "deleted_session_links": deleted_links, 

511 # 'deleted_dependencies': deleted_deps 

512 }