Coverage for session_buddy / multi_project_coordinator.py: 86.18%
234 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
1#!/usr/bin/env python3
2"""Multi-Project Session Coordination.
4Manages relationships and coordination between multiple projects and their sessions.
5"""
7import asyncio
8import hashlib
9import json
10import time
11from datetime import UTC, datetime, timedelta
12from typing import Any, Literal, Protocol
14from pydantic import BaseModel, Field, ValidationInfo, field_validator
class ReflectionDatabaseProtocol(Protocol):
    """Protocol for reflection database implementations.

    Structural interface consumed by MultiProjectCoordinator: a semantic
    conversation search plus access to the raw database connection.
    """

    async def search_conversations(
        self,
        query: str,
        limit: int = 10,
        threshold: float = 0.7,
        project: str | None = None,
        min_score: float | None = None,
    ) -> list[dict[str, Any]]:
        """Search stored conversations, optionally scoped to one project.

        Returns a list of result dicts; callers in this module read a
        "score" key from each result when sorting (see
        MultiProjectCoordinator.find_related_conversations).
        """
        ...

    @property
    def conn(self) -> Any:
        # Underlying DB-API-style connection object. May be None before the
        # database is initialized (MultiProjectCoordinator._get_conn checks
        # for exactly that and raises RuntimeError).
        ...
class ProjectGroup(BaseModel):
    """A named collection of related projects.

    Carries a unique identifier, the member project names (validated and
    whitespace-trimmed), free-form metadata, and a UTC creation timestamp.
    """

    id: str = Field(description="Unique identifier for the project group")
    name: str = Field(
        min_length=1,
        max_length=200,
        description="Name of the project group",
    )
    description: str = Field(
        default="",
        max_length=1000,
        description="Description of the project group",
    )
    projects: list[str] = Field(
        min_length=1,
        description="List of project identifiers in this group",
    )
    metadata: dict[str, Any] = Field(
        default_factory=dict,
        description="Additional metadata for the project group",
    )
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(UTC),
        description="When the project group was created",
    )

    @field_validator("projects")
    @classmethod
    def validate_projects(cls, v: list[str]) -> list[str]:
        """Ensure all project names are non-empty."""
        if not v:
            msg = "Project group must contain at least one project"
            raise ValueError(msg)
        cleaned: list[str] = []
        for name in v:
            trimmed = name.strip()
            if not trimmed:
                msg = "Project names cannot be empty"
                raise ValueError(msg)
            cleaned.append(trimmed)
        return cleaned
class ProjectDependency(BaseModel):
    """A directed dependency edge between two distinct projects.

    ``source_project`` depends on ``target_project``; self-dependencies are
    rejected at validation time.
    """

    id: str = Field(description="Unique identifier for the project dependency")
    source_project: str = Field(
        min_length=1,
        description="The project that depends on another",
    )
    target_project: str = Field(
        min_length=1,
        description="The project that is depended upon",
    )
    dependency_type: Literal["uses", "extends", "references", "shares_code"] = Field(
        description="Type of dependency relationship",
    )
    description: str = Field(
        default="",
        max_length=1000,
        description="Description of the dependency relationship",
    )
    metadata: dict[str, Any] = Field(
        default_factory=dict,
        description="Additional metadata for the dependency",
    )
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(UTC),
        description="When the dependency was created",
    )

    @field_validator("source_project", "target_project")
    @classmethod
    def validate_project_names(cls, v: str) -> str:
        """Reject blank project names and trim surrounding whitespace."""
        trimmed = v.strip()
        if not trimmed:
            msg = "Project names cannot be empty"
            raise ValueError(msg)
        return trimmed

    @field_validator("target_project")
    @classmethod
    def validate_not_self_dependency(cls, v: str, info: ValidationInfo) -> str:
        """Reject dependencies whose source and target are the same project."""
        data = getattr(info, "data", None)
        if data and data.get("source_project") == v:
            msg = "Project cannot depend on itself"
            raise ValueError(msg)
        return v
class SessionLink(BaseModel):
    """A directed link between two distinct sessions, possibly across projects.

    ``source_session_id`` links to ``target_session_id``; self-links are
    rejected at validation time.
    """

    id: str = Field(description="Unique identifier for the session link")
    source_session_id: str = Field(
        min_length=1,
        description="The session that links to another",
    )
    target_session_id: str = Field(
        min_length=1,
        description="The session that is linked to",
    )
    link_type: Literal["related", "continuation", "reference", "dependency"] = Field(
        description="Type of relationship between sessions",
    )
    context: str = Field(
        default="",
        max_length=2000,
        description="Context or reason for the session link",
    )
    metadata: dict[str, Any] = Field(
        default_factory=dict,
        description="Additional metadata for the session link",
    )
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(UTC),
        description="When the session link was created",
    )

    @field_validator("source_session_id", "target_session_id")
    @classmethod
    def validate_session_ids(cls, v: str) -> str:
        """Reject blank session IDs and trim surrounding whitespace."""
        trimmed = v.strip()
        if not trimmed:
            msg = "Session IDs cannot be empty"
            raise ValueError(msg)
        return trimmed

    @field_validator("target_session_id")
    @classmethod
    def validate_not_self_link(cls, v: str, info: ValidationInfo) -> str:
        """Reject links whose source and target are the same session."""
        data = getattr(info, "data", None)
        if data and data.get("source_session_id") == v:
            msg = "Session cannot link to itself"
            raise ValueError(msg)
        return v
class MultiProjectCoordinator:
    """Coordinates sessions and knowledge across multiple projects.

    Persists project groups, inter-project dependencies, and cross-project
    session links through the reflection database connection, keeping small
    in-memory caches to avoid repeated lookups. Blocking database calls are
    pushed onto the default executor so coroutines do not block the event
    loop. The SQL appears to target DuckDB (``list_contains``, ``= ANY(?)``,
    ``EXCLUDED`` upserts) — confirm against the actual backend.
    """

    def __init__(self, reflection_db: ReflectionDatabaseProtocol) -> None:
        """Initialize multi-project coordinator with database connection."""
        self.reflection_db = reflection_db
        self._initialize_caches()

    def _initialize_caches(self) -> None:
        """Initialize internal cache dictionaries."""
        # Group id -> ProjectGroup; refreshed by get_project_groups().
        self.active_project_groups: dict[str, ProjectGroup] = {}
        # "<project>_<direction>" -> dependency list (see get_project_dependencies).
        self.dependency_cache: dict[str, list[ProjectDependency]] = {}
        # Session id -> links where the session is source or target.
        self.session_links_cache: dict[str, list[SessionLink]] = {}

    def _get_conn(self) -> Any:
        """Get database connection, raising RuntimeError if not initialized."""
        if self.reflection_db.conn is None:
            msg = "Database connection not initialized"
            raise RuntimeError(msg)
        return self.reflection_db.conn

    async def create_project_group(
        self,
        name: str,
        projects: list[str],
        description: str = "",
        metadata: dict[str, Any] | None = None,
    ) -> ProjectGroup:
        """Create a new project group and persist it.

        Args:
            name: Human-readable group name.
            projects: Member project identifiers (validated by ProjectGroup).
            description: Optional group description.
            metadata: Optional extra metadata, stored as JSON.

        Returns:
            The newly created ProjectGroup, also cached in memory.
        """
        # md5 of name + wall-clock time yields a fixed-length, unique-enough
        # id; explicitly not security-sensitive (usedforsecurity=False).
        group_id = hashlib.md5(
            f"{name}_{time.time()}".encode(),
            usedforsecurity=False,
        ).hexdigest()

        group = ProjectGroup(
            id=group_id,
            name=name,
            description=description,
            projects=projects,
            metadata=metadata or {},
        )

        # Store in database. get_running_loop() is the supported API inside
        # coroutines; get_event_loop() is deprecated here since Python 3.10.
        await asyncio.get_running_loop().run_in_executor(
            None,
            lambda: self._get_conn().execute(
                """
                INSERT INTO project_groups (id, name, description, projects, created_at, metadata)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                [
                    group.id,
                    group.name,
                    group.description,
                    group.projects,
                    group.created_at,
                    json.dumps(group.metadata),
                ],
            ),
        )

        self._get_conn().commit()
        self.active_project_groups[group_id] = group

        return group

    async def add_project_dependency(
        self,
        source_project: str,
        target_project: str,
        dependency_type: Literal["uses", "extends", "references", "shares_code"],
        description: str = "",
        metadata: dict[str, Any] | None = None,
    ) -> ProjectDependency:
        """Add (or update) a dependency relationship between projects.

        The id is derived from (source, target, type), so re-adding the same
        logical dependency upserts the existing row rather than duplicating it.

        Raises:
            pydantic.ValidationError: for blank names or self-dependencies.
        """
        dep_id = hashlib.md5(
            f"{source_project}_{target_project}_{dependency_type}".encode(),
            usedforsecurity=False,
        ).hexdigest()

        dependency = ProjectDependency(
            id=dep_id,
            source_project=source_project,
            target_project=target_project,
            dependency_type=dependency_type,
            description=description,
            metadata=metadata or {},
        )

        # Store in database (upsert keyed on id).
        await asyncio.get_running_loop().run_in_executor(
            None,
            lambda: self._get_conn().execute(
                """
                INSERT INTO project_dependencies
                (id, source_project, target_project, dependency_type, description, created_at, metadata)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT (id) DO UPDATE SET
                    source_project = EXCLUDED.source_project,
                    target_project = EXCLUDED.target_project,
                    dependency_type = EXCLUDED.dependency_type,
                    description = EXCLUDED.description,
                    created_at = EXCLUDED.created_at,
                    metadata = EXCLUDED.metadata
                """,
                [
                    dependency.id,
                    dependency.source_project,
                    dependency.target_project,
                    dependency.dependency_type,
                    dependency.description,
                    dependency.created_at,
                    json.dumps(dependency.metadata),
                ],
            ),
        )

        self._get_conn().commit()

        # Clear cache for affected projects.
        self._clear_dependency_cache(source_project)
        self._clear_dependency_cache(target_project)

        return dependency

    async def link_sessions(
        self,
        source_session_id: str,
        target_session_id: str,
        link_type: Literal["related", "continuation", "reference", "dependency"],
        context: str = "",
        metadata: dict[str, Any] | None = None,
    ) -> SessionLink:
        """Create (or update) a link between two sessions across projects.

        The id is derived from (source, target, type), so re-linking the same
        pair upserts the existing row.

        Raises:
            pydantic.ValidationError: for blank session ids or self-links.
        """
        link_id = hashlib.md5(
            f"{source_session_id}_{target_session_id}_{link_type}".encode(),
            usedforsecurity=False,
        ).hexdigest()

        link = SessionLink(
            id=link_id,
            source_session_id=source_session_id,
            target_session_id=target_session_id,
            link_type=link_type,
            context=context,
            metadata=metadata or {},
        )

        # Store in database (upsert keyed on id).
        await asyncio.get_running_loop().run_in_executor(
            None,
            lambda: self._get_conn().execute(
                """
                INSERT INTO session_links
                (id, source_session_id, target_session_id, link_type, context, created_at, metadata)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT (id) DO UPDATE SET
                    source_session_id = EXCLUDED.source_session_id,
                    target_session_id = EXCLUDED.target_session_id,
                    link_type = EXCLUDED.link_type,
                    context = EXCLUDED.context,
                    created_at = EXCLUDED.created_at,
                    metadata = EXCLUDED.metadata
                """,
                [
                    link.id,
                    link.source_session_id,
                    link.target_session_id,
                    link.link_type,
                    link.context,
                    link.created_at,
                    json.dumps(link.metadata),
                ],
            ),
        )

        self._get_conn().commit()

        # Clear cache for affected sessions.
        self._clear_session_links_cache(source_session_id)
        self._clear_session_links_cache(target_session_id)

        return link

    async def get_project_groups(
        self,
        project: str | None = None,
    ) -> list[ProjectGroup]:
        """Get project groups, optionally filtered to groups containing `project`.

        Results are ordered newest-first and cached in active_project_groups.
        """
        sql = "SELECT id, name, description, projects, created_at, metadata FROM project_groups"
        params: list[Any] = []

        if project:
            sql += " WHERE list_contains(projects, ?)"
            params.append(project)

        sql += " ORDER BY created_at DESC"

        results = await asyncio.get_running_loop().run_in_executor(
            None,
            lambda: self._get_conn().execute(sql, params).fetchall(),
        )

        groups = []
        for row in results:
            # Column order: id, name, description, projects, created_at, metadata.
            group = ProjectGroup(
                id=row[0],
                name=row[1],
                description=row[2],
                projects=row[3],
                metadata=json.loads(row[5]) if row[5] else {},
                created_at=row[4],
            )
            groups.append(group)
            self.active_project_groups[group.id] = group

        return groups

    async def get_project_dependencies(
        self,
        project: str,
        direction: str = "both",  # "outbound", "inbound", "both"
    ) -> list[ProjectDependency]:
        """Get dependencies for a project.

        Args:
            project: Project identifier to look up.
            direction: "outbound" (project depends on others), "inbound"
                (others depend on project), or "both" (default).

        Returns:
            Dependencies ordered newest-first; cached per (project, direction).
        """
        # Cache key includes the direction so the three views don't collide.
        cache_key = f"{project}_{direction}"
        if cache_key in self.dependency_cache:
            return self.dependency_cache[cache_key]

        conditions = []
        params = []

        if direction == "outbound":
            conditions.append("source_project = ?")
            params.append(project)
        elif direction == "inbound":
            conditions.append("target_project = ?")
            params.append(project)
        else:  # both
            conditions.append("(source_project = ? OR target_project = ?)")
            params.extend([project, project])

        # Build SQL safely - all user input is parameterized via params list.
        sql = (
            "SELECT id, source_project, target_project, dependency_type, "
            "description, created_at, metadata FROM project_dependencies WHERE "
            + " OR ".join(conditions)
            + " ORDER BY created_at DESC"
        )

        results = await asyncio.get_running_loop().run_in_executor(
            None,
            lambda: self._get_conn().execute(sql, params).fetchall(),
        )

        dependencies = []
        for row in results:
            dep = ProjectDependency(
                id=row[0],
                source_project=row[1],
                target_project=row[2],
                dependency_type=row[3],
                description=row[4],
                metadata=json.loads(row[6]) if row[6] else {},
                created_at=row[5],
            )
            dependencies.append(dep)

        # Cache with the direction-specific key.
        self.dependency_cache[cache_key] = dependencies
        return dependencies

    async def get_session_links(self, session_id: str) -> list[SessionLink]:
        """Get all links where `session_id` is source or target, newest-first."""
        if session_id in self.session_links_cache:
            return self.session_links_cache[session_id]

        sql = """
            SELECT id, source_session_id, target_session_id, link_type, context, created_at, metadata
            FROM session_links
            WHERE source_session_id = ? OR target_session_id = ?
            ORDER BY created_at DESC
        """

        results = await asyncio.get_running_loop().run_in_executor(
            None,
            lambda: self._get_conn()
            .execute(
                sql,
                [session_id, session_id],
            )
            .fetchall(),
        )

        links = []
        for row in results:
            link = SessionLink(
                id=row[0],
                source_session_id=row[1],
                target_session_id=row[2],
                link_type=row[3],
                context=row[4],
                metadata=json.loads(row[6]) if row[6] else {},
                created_at=row[5],
            )
            links.append(link)

        self.session_links_cache[session_id] = links
        return links

    async def find_related_conversations(
        self,
        current_project: str,
        query: str,
        limit: int = 10,
    ) -> list[dict[str, Any]]:
        """Find conversations across the current project and its dependencies.

        Searches every project directly related to `current_project` (either
        direction), tags each result with its source project, and returns the
        top `limit` results by score.
        """
        # Get project dependencies to find related projects.
        dependencies = await self.get_project_dependencies(current_project)
        related_projects = {current_project}

        for dep in dependencies:
            if dep.source_project == current_project:
                related_projects.add(dep.target_project)
            if dep.target_project == current_project:
                related_projects.add(dep.source_project)

        # Search conversations in all related projects.
        results = []

        for project in related_projects:
            project_results = await self.reflection_db.search_conversations(
                query=query,
                limit=limit,
                project=project,
            )

            for result in project_results:
                result["source_project"] = project
                result["is_current_project"] = project == current_project
                results.append(result)

        # Sort by relevance score and return top results. Results lacking a
        # "score" key sort as 0.
        results.sort(key=lambda x: x.get("score", 0), reverse=True)
        return results[:limit]

    async def get_cross_project_insights(
        self,
        projects: list[str],
        time_range_days: int = 30,
    ) -> dict[str, Any]:
        """Get insights across multiple projects for the recent time window.

        Currently populates "project_activity" and "common_patterns";
        "knowledge_gaps" and "collaboration_opportunities" are placeholders.
        """
        since_date = datetime.now(UTC) - timedelta(days=time_range_days)

        insights = self._initialize_insights_structure()
        insights["project_activity"] = await self._analyze_project_activity(
            projects,
            since_date,
        )
        insights["common_patterns"] = await self._find_common_patterns(
            projects,
            since_date,
        )

        return insights

    def _initialize_insights_structure(self) -> dict[str, Any]:
        """Initialize the insights data structure."""
        return {
            "project_activity": {},
            "common_patterns": [],
            "knowledge_gaps": [],
            "collaboration_opportunities": [],
        }

    async def _analyze_project_activity(
        self,
        projects: list[str],
        since_date: datetime,
    ) -> dict[str, Any]:
        """Collect per-project activity stats, skipping projects with none."""
        activity_data = {}

        for project in projects:
            project_stats = await self._get_project_stats(project, since_date)
            if project_stats:
                activity_data[project] = project_stats

        return activity_data

    async def _get_project_stats(
        self,
        project: str,
        since_date: datetime,
    ) -> dict[str, Any] | None:
        """Get conversation statistics for a single project since `since_date`."""
        sql = """
            SELECT COUNT(*) as conversation_count,
                   MAX(timestamp) as last_activity,
                   AVG(LENGTH(content)) as avg_content_length
            FROM conversations
            WHERE project = ? AND timestamp >= ?
        """

        result = await asyncio.get_running_loop().run_in_executor(
            None,
            lambda: self._get_conn().execute(sql, [project, since_date]).fetchone(),
        )

        if result:
            return {
                "conversation_count": result[0],
                "last_activity": result[1],
                "avg_content_length": result[2],
            }
        return None

    async def _find_common_patterns(
        self,
        projects: list[str],
        since_date: datetime,
    ) -> list[dict[str, Any]]:
        """Find keyword patterns shared by multiple projects."""
        conversation_data = await self._get_conversation_data(projects, since_date)
        project_keywords = self._extract_project_keywords(conversation_data)
        return self._identify_common_patterns(project_keywords)

    async def _get_conversation_data(
        self,
        projects: list[str],
        since_date: datetime,
    ) -> list[tuple[str, str]]:
        """Get (project, content) rows for pattern analysis."""
        sql = """
            SELECT project, content
            FROM conversations
            WHERE project = ANY(?) AND timestamp >= ?
        """

        return await asyncio.get_running_loop().run_in_executor(
            None,
            lambda: self._get_conn().execute(sql, [projects, since_date]).fetchall(),
        )

    def _extract_project_keywords(
        self,
        conversation_data: list[tuple[str, str]],
    ) -> dict[str, dict[str, int]]:
        """Count per-project word frequencies (lowercased, words > 4 chars)."""
        project_keywords: dict[str, dict[str, int]] = {}

        for project, content in conversation_data:
            if project not in project_keywords:
                project_keywords[project] = {}

            words = content.lower().split()
            for word in words:
                if len(word) > 4:  # Skip short words
                    project_keywords[project][word] = (
                        project_keywords[project].get(word, 0) + 1
                    )

        return project_keywords

    def _identify_common_patterns(
        self,
        project_keywords: dict[str, dict[str, int]],
    ) -> list[dict[str, Any]]:
        """Identify keywords appearing in at least two projects, top 10 by frequency."""
        common_keywords: dict[str, list[tuple[str, int]]] = {}
        # Each pattern dict: {"pattern": word, "projects": [...], "frequency": int}.
        patterns: list[dict[str, Any]] = []

        # Group keywords by occurrence across projects.
        for project, keywords in project_keywords.items():
            for word, count in keywords.items():
                if word not in common_keywords:
                    common_keywords[word] = []
                common_keywords[word].append((project, count))

        # Filter to keywords present in multiple projects.
        for word, project_counts in common_keywords.items():
            if len(project_counts) >= 2:  # Present in at least 2 projects
                patterns.append(
                    {
                        "pattern": word,
                        "projects": [p[0] for p in project_counts],
                        "frequency": sum(p[1] for p in project_counts),
                    },
                )

        # Sort by total frequency, highest first.
        patterns.sort(key=lambda x: x["frequency"], reverse=True)
        return patterns[:10]  # Return top 10 patterns

    def _clear_dependency_cache(self, project: str) -> None:
        """Clear cached dependency lookups for a project.

        Removes the exact direction-specific keys written by
        get_project_dependencies. Exact-key removal (rather than the previous
        ``startswith(f"{project}_")`` prefix scan) avoids clobbering cache
        entries of other projects whose names merely begin with this
        project's name plus an underscore.
        """
        for direction in ("outbound", "inbound", "both"):
            self.dependency_cache.pop(f"{project}_{direction}", None)

    def _clear_session_links_cache(self, session_id: str) -> None:
        """Clear session links cache for a session."""
        if session_id in self.session_links_cache:
            del self.session_links_cache[session_id]

    async def cleanup_old_links(self, max_age_days: int = 365) -> dict[str, Any]:
        """Delete session links older than `max_age_days` days.

        Returns:
            {"deleted_session_links": count} where count is the number of
            rows older than the cutoff at the time of the pre-delete count.
        """
        cutoff_date = datetime.now(UTC) - timedelta(days=max_age_days)

        # Count old links before deletion (count and delete are two separate
        # statements; a concurrent writer could change the true number).
        count_before = await asyncio.get_running_loop().run_in_executor(
            None,
            lambda: self._get_conn()
            .execute(
                "SELECT COUNT(*) FROM session_links WHERE created_at < ?",
                [cutoff_date],
            )
            .fetchone()[0],
        )

        # Clean up old session links.
        await asyncio.get_running_loop().run_in_executor(
            None,
            lambda: self._get_conn().execute(
                "DELETE FROM session_links WHERE created_at < ?",
                [cutoff_date],
            ),
        )

        self._get_conn().commit()

        # Clear caches (deleted rows may be cached under any session id).
        self.session_links_cache.clear()

        return {
            "deleted_session_links": count_before,
        }