Coverage for session_mgmt_mcp/multi_project_coordinator.py: 21.43%
172 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 05:22 -0700
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 05:22 -0700
1#!/usr/bin/env python3
2"""Multi-Project Session Coordination.
4Manages relationships and coordination between multiple projects and their sessions.
5"""
7import asyncio
8import hashlib
9import json
10import time
11from dataclasses import dataclass, field
12from datetime import UTC, datetime, timedelta
13from typing import Any
15from .reflection_tools import ReflectionDatabase
18@dataclass
19class ProjectGroup:
20 """Represents a group of related projects."""
22 id: str
23 name: str
24 description: str
25 projects: list[str]
26 metadata: dict[str, Any] = field(default_factory=dict)
27 created_at: datetime = field(default_factory=lambda: datetime.now(UTC))
30@dataclass
31class ProjectDependency:
32 """Represents a dependency between two projects."""
34 id: str
35 source_project: str
36 target_project: str
37 dependency_type: str # 'uses', 'extends', 'references', 'shares_code'
38 description: str
39 metadata: dict[str, Any] = field(default_factory=dict)
40 created_at: datetime = field(default_factory=lambda: datetime.now(UTC))
43@dataclass
44class SessionLink:
45 """Represents a link between sessions across projects."""
47 id: str
48 source_session_id: str
49 target_session_id: str
50 link_type: str # 'related', 'continuation', 'reference', 'dependency'
51 context: str
52 metadata: dict[str, Any] = field(default_factory=dict)
53 created_at: datetime = field(default_factory=lambda: datetime.now(UTC))
56class MultiProjectCoordinator:
57 """Coordinates sessions and knowledge across multiple projects."""
59 def __init__(self, reflection_db: ReflectionDatabase) -> None:
60 self.reflection_db = reflection_db
61 self.active_project_groups: dict[str, ProjectGroup] = {}
62 self.dependency_cache: dict[str, list[ProjectDependency]] = {}
63 self.session_links_cache: dict[str, list[SessionLink]] = {}
65 async def create_project_group(
66 self,
67 name: str,
68 projects: list[str],
69 description: str = "",
70 metadata: dict[str, Any] | None = None,
71 ) -> ProjectGroup:
72 """Create a new project group."""
73 group_id = hashlib.md5(f"{name}_{time.time()}".encode()).hexdigest()
75 group = ProjectGroup(
76 id=group_id,
77 name=name,
78 description=description,
79 projects=projects,
80 metadata=metadata or {},
81 )
83 # Store in database
84 await asyncio.get_event_loop().run_in_executor(
85 None,
86 lambda: self.reflection_db.conn.execute(
87 """
88 INSERT INTO project_groups (id, name, description, projects, created_at, metadata)
89 VALUES (?, ?, ?, ?, ?, ?)
90 """,
91 [
92 group.id,
93 group.name,
94 group.description,
95 group.projects,
96 group.created_at,
97 json.dumps(group.metadata),
98 ],
99 ),
100 )
102 self.reflection_db.conn.commit()
103 self.active_project_groups[group_id] = group
105 return group
107 async def add_project_dependency(
108 self,
109 source_project: str,
110 target_project: str,
111 dependency_type: str,
112 description: str = "",
113 metadata: dict[str, Any] | None = None,
114 ) -> ProjectDependency:
115 """Add a dependency relationship between projects."""
116 dep_id = hashlib.md5(
117 f"{source_project}_{target_project}_{dependency_type}".encode(),
118 ).hexdigest()
120 dependency = ProjectDependency(
121 id=dep_id,
122 source_project=source_project,
123 target_project=target_project,
124 dependency_type=dependency_type,
125 description=description,
126 metadata=metadata or {},
127 )
129 # Store in database
130 await asyncio.get_event_loop().run_in_executor(
131 None,
132 lambda: self.reflection_db.conn.execute(
133 """
134 INSERT OR REPLACE INTO project_dependencies
135 (id, source_project, target_project, dependency_type, description, created_at, metadata)
136 VALUES (?, ?, ?, ?, ?, ?, ?)
137 """,
138 [
139 dependency.id,
140 dependency.source_project,
141 dependency.target_project,
142 dependency.dependency_type,
143 dependency.description,
144 dependency.created_at,
145 json.dumps(dependency.metadata),
146 ],
147 ),
148 )
150 self.reflection_db.conn.commit()
152 # Clear cache for affected projects
153 self._clear_dependency_cache(source_project)
154 self._clear_dependency_cache(target_project)
156 return dependency
158 async def link_sessions(
159 self,
160 source_session_id: str,
161 target_session_id: str,
162 link_type: str,
163 context: str = "",
164 metadata: dict[str, Any] | None = None,
165 ) -> SessionLink:
166 """Create a link between two sessions across projects."""
167 link_id = hashlib.md5(
168 f"{source_session_id}_{target_session_id}_{link_type}".encode(),
169 ).hexdigest()
171 link = SessionLink(
172 id=link_id,
173 source_session_id=source_session_id,
174 target_session_id=target_session_id,
175 link_type=link_type,
176 context=context,
177 metadata=metadata or {},
178 )
180 # Store in database
181 await asyncio.get_event_loop().run_in_executor(
182 None,
183 lambda: self.reflection_db.conn.execute(
184 """
185 INSERT OR REPLACE INTO session_links
186 (id, source_session_id, target_session_id, link_type, context, created_at, metadata)
187 VALUES (?, ?, ?, ?, ?, ?, ?)
188 """,
189 [
190 link.id,
191 link.source_session_id,
192 link.target_session_id,
193 link.link_type,
194 link.context,
195 link.created_at,
196 json.dumps(link.metadata),
197 ],
198 ),
199 )
201 self.reflection_db.conn.commit()
203 # Clear cache for affected sessions
204 self._clear_session_links_cache(source_session_id)
205 self._clear_session_links_cache(target_session_id)
207 return link
209 async def get_project_groups(
210 self,
211 project: str | None = None,
212 ) -> list[ProjectGroup]:
213 """Get project groups, optionally filtered by project."""
214 sql = "SELECT id, name, description, projects, created_at, metadata FROM project_groups"
215 params = []
217 if project:
218 sql += " WHERE list_contains(projects, ?)"
219 params.append(project)
221 sql += " ORDER BY created_at DESC"
223 results = await asyncio.get_event_loop().run_in_executor(
224 None,
225 lambda: self.reflection_db.conn.execute(sql, params).fetchall(),
226 )
228 groups = []
229 for row in results:
230 group = ProjectGroup(
231 id=row[0],
232 name=row[1],
233 description=row[2],
234 projects=row[3],
235 metadata=json.loads(row[5]) if row[5] else {},
236 created_at=row[4],
237 )
238 groups.append(group)
239 self.active_project_groups[group.id] = group
241 return groups
243 async def get_project_dependencies(
244 self,
245 project: str,
246 direction: str = "both", # "outbound", "inbound", "both"
247 ) -> list[ProjectDependency]:
248 """Get dependencies for a project."""
249 if project in self.dependency_cache:
250 return self.dependency_cache[project]
252 conditions = []
253 params = []
255 if direction == "outbound":
256 conditions.append("source_project = ?")
257 params.append(project)
258 elif direction == "inbound":
259 conditions.append("target_project = ?")
260 params.append(project)
261 else: # both
262 conditions.append("(source_project = ? OR target_project = ?)")
263 params.extend([project, project])
265 sql = f"""
266 SELECT id, source_project, target_project, dependency_type, description, created_at, metadata
267 FROM project_dependencies
268 WHERE {" OR ".join(conditions)}
269 ORDER BY created_at DESC
270 """
272 results = await asyncio.get_event_loop().run_in_executor(
273 None,
274 lambda: self.reflection_db.conn.execute(sql, params).fetchall(),
275 )
277 dependencies = []
278 for row in results:
279 dep = ProjectDependency(
280 id=row[0],
281 source_project=row[1],
282 target_project=row[2],
283 dependency_type=row[3],
284 description=row[4],
285 metadata=json.loads(row[6]) if row[6] else {},
286 created_at=row[5],
287 )
288 dependencies.append(dep)
290 self.dependency_cache[project] = dependencies
291 return dependencies
293 async def get_session_links(self, session_id: str) -> list[SessionLink]:
294 """Get all links for a session."""
295 if session_id in self.session_links_cache:
296 return self.session_links_cache[session_id]
298 sql = """
299 SELECT id, source_session_id, target_session_id, link_type, context, created_at, metadata
300 FROM session_links
301 WHERE source_session_id = ? OR target_session_id = ?
302 ORDER BY created_at DESC
303 """
305 results = await asyncio.get_event_loop().run_in_executor(
306 None,
307 lambda: self.reflection_db.conn.execute(
308 sql,
309 [session_id, session_id],
310 ).fetchall(),
311 )
313 links = []
314 for row in results:
315 link = SessionLink(
316 id=row[0],
317 source_session_id=row[1],
318 target_session_id=row[2],
319 link_type=row[3],
320 context=row[4],
321 metadata=json.loads(row[6]) if row[6] else {},
322 created_at=row[5],
323 )
324 links.append(link)
326 self.session_links_cache[session_id] = links
327 return links
329 async def find_related_conversations(
330 self,
331 current_project: str,
332 query: str,
333 limit: int = 10,
334 ) -> list[dict[str, Any]]:
335 """Find conversations across related projects."""
336 # Get project dependencies to find related projects
337 dependencies = await self.get_project_dependencies(current_project)
338 related_projects = {current_project}
340 for dep in dependencies:
341 if dep.source_project == current_project:
342 related_projects.add(dep.target_project)
343 if dep.target_project == current_project:
344 related_projects.add(dep.source_project)
346 # Search conversations in all related projects
347 results = []
349 for project in related_projects:
350 project_results = await self.reflection_db.search_conversations(
351 query=query,
352 limit=limit,
353 project=project,
354 )
356 for result in project_results:
357 result["source_project"] = project
358 result["is_current_project"] = project == current_project
359 results.append(result)
361 # Sort by relevance score and return top results
362 results.sort(key=lambda x: x.get("score", 0), reverse=True)
363 return results[:limit]
365 async def get_cross_project_insights(
366 self,
367 projects: list[str],
368 time_range_days: int = 30,
369 ) -> dict[str, Any]:
370 """Get insights across multiple projects."""
371 since_date = datetime.now(UTC) - timedelta(days=time_range_days)
372 insights = {
373 "project_activity": {},
374 "common_patterns": [],
375 "knowledge_gaps": [],
376 "collaboration_opportunities": [],
377 }
379 # Analyze activity per project
380 for project in projects:
381 sql = """
382 SELECT COUNT(*) as conversation_count,
383 MAX(timestamp) as last_activity,
384 AVG(LENGTH(content)) as avg_content_length
385 FROM conversations
386 WHERE project = ? AND timestamp >= ?
387 """
389 result = await asyncio.get_event_loop().run_in_executor(
390 None,
391 lambda: self.reflection_db.conn.execute(
392 sql,
393 [project, since_date],
394 ).fetchone(),
395 )
397 if result:
398 insights["project_activity"][project] = {
399 "conversation_count": result[0],
400 "last_activity": result[1],
401 "avg_content_length": result[2],
402 }
404 # Find common patterns across projects
405 common_patterns = await self._find_common_patterns(projects, since_date)
406 insights["common_patterns"] = common_patterns
408 return insights
410 async def _find_common_patterns(
411 self,
412 projects: list[str],
413 since_date: datetime,
414 ) -> list[dict[str, Any]]:
415 """Find common patterns across projects."""
416 # Simple pattern detection based on common keywords
417 patterns = []
419 # Get frequent terms across all projects
420 sql = """
421 SELECT project, content
422 FROM conversations
423 WHERE project = ANY(?) AND timestamp >= ?
424 """
426 results = await asyncio.get_event_loop().run_in_executor(
427 None,
428 lambda: self.reflection_db.conn.execute(
429 sql,
430 [projects, since_date],
431 ).fetchall(),
432 )
434 # Simple keyword frequency analysis
435 project_keywords = {}
436 for project, content in results:
437 if project not in project_keywords:
438 project_keywords[project] = {}
440 # Extract simple keywords (could be enhanced with NLP)
441 words = content.lower().split()
442 for word in words:
443 if len(word) > 4: # Skip short words
444 project_keywords[project][word] = (
445 project_keywords[project].get(word, 0) + 1
446 )
448 # Find keywords common across multiple projects
449 common_keywords = {}
450 for project, keywords in project_keywords.items():
451 for word, count in keywords.items():
452 if word not in common_keywords:
453 common_keywords[word] = []
454 common_keywords[word].append((project, count))
456 # Filter to keywords present in multiple projects
457 for word, project_counts in common_keywords.items():
458 if len(project_counts) >= 2: # Present in at least 2 projects
459 patterns.append(
460 {
461 "pattern": word,
462 "projects": [p[0] for p in project_counts],
463 "frequency": sum(p[1] for p in project_counts),
464 },
465 )
467 # Sort by frequency
468 patterns.sort(key=lambda x: x["frequency"], reverse=True)
469 return patterns[:10] # Return top 10 patterns
471 def _clear_dependency_cache(self, project: str) -> None:
472 """Clear dependency cache for a project."""
473 if project in self.dependency_cache:
474 del self.dependency_cache[project]
476 def _clear_session_links_cache(self, session_id: str) -> None:
477 """Clear session links cache for a session."""
478 if session_id in self.session_links_cache:
479 del self.session_links_cache[session_id]
481 async def cleanup_old_links(self, max_age_days: int = 365):
482 """Clean up old session links and dependencies."""
483 cutoff_date = datetime.now(UTC) - timedelta(days=max_age_days)
485 # Clean up old session links
486 deleted_links = await asyncio.get_event_loop().run_in_executor(
487 None,
488 lambda: self.reflection_db.conn.execute(
489 "DELETE FROM session_links WHERE created_at < ?",
490 [cutoff_date],
491 ).rowcount,
492 )
494 # Clean up old dependencies (optional - might want to keep these)
495 # deleted_deps = await asyncio.get_event_loop().run_in_executor(
496 # None,
497 # lambda: self.reflection_db.conn.execute(
498 # "DELETE FROM project_dependencies WHERE created_at < ?",
499 # [cutoff_date]
500 # ).rowcount
501 # )
503 self.reflection_db.conn.commit()
505 # Clear caches
506 self.session_links_cache.clear()
507 # self.dependency_cache.clear()
509 return {
510 "deleted_session_links": deleted_links,
511 # 'deleted_dependencies': deleted_deps
512 }