Coverage for session_buddy / advanced_features.py: 52.30%
359 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
1"""Advanced Feature Hub for MCP Tools.
3This module provides advanced MCP tools for multi-project coordination,
4git worktree management, natural language scheduling, and enhanced search.
6Extracted from server.py Phase 2.4 - Contains 17 MCP tool implementations:
7- Natural language reminder tools (5 MCP tools)
8- Interruption management tools (1 MCP tool)
9- Multi-project coordination (4 MCP tools)
10- Advanced search capabilities (3 MCP tools)
11- Git worktree management (3 MCP tools)
12- Session welcome tool (1 MCP tool)
13"""
15from __future__ import annotations
17import typing as t
18from pathlib import Path
20if t.TYPE_CHECKING:
21 from session_buddy.utils.logging import SessionLogger
class AdvancedFeaturesHub:
    """Coordinator for the optional advanced MCP feature tools.

    Keeps one "initialized" flag per optional subsystem (multi-project
    coordination, advanced search, application monitoring). The
    ``initialize_*`` coroutines are placeholders: each one currently raises
    ``NotImplementedError``.
    """

    def __init__(self, logger: SessionLogger) -> None:
        """Store the logger and mark every subsystem as not initialized.

        Args:
            logger: Session logger for feature events.

        """
        self.logger = logger
        self._multi_project_initialized = False
        self._advanced_search_initialized = False
        self._app_monitor_initialized = False

    async def initialize_multi_project(self) -> bool:
        """Initialize multi-project coordination features.

        Returns:
            True if initialized successfully.

        Raises:
            NotImplementedError: Always; the feature is not implemented yet.

        """
        raise NotImplementedError("initialize_multi_project not yet implemented")

    async def initialize_advanced_search(self) -> bool:
        """Initialize advanced search capabilities.

        Returns:
            True if initialized successfully.

        Raises:
            NotImplementedError: Always; the feature is not implemented yet.

        """
        raise NotImplementedError("initialize_advanced_search not yet implemented")

    async def initialize_app_monitor(self) -> bool:
        """Initialize application monitoring.

        Returns:
            True if initialized successfully.

        Raises:
            NotImplementedError: Always; the feature is not implemented yet.

        """
        raise NotImplementedError("initialize_app_monitor not yet implemented")
74# ================================
75# Natural Language Scheduling Tools
76# ================================
async def create_natural_reminder(
    title: str,
    time_expression: str,
    description: str = "",
    user_id: str = "default",
    project_id: str | None = None,
    notification_method: str = "session",
) -> str:
    """Create a reminder from a natural-language time expression.

    Delegates to ``natural_scheduler.create_natural_reminder`` and turns the
    outcome into a human-readable message.

    Returns:
        A multi-line confirmation when the scheduler returns a truthy
        reminder ID, a parsing hint when it returns a falsy one, or an error
        message when the scheduler cannot be imported or raises.

    """
    try:
        from .natural_scheduler import (
            create_natural_reminder as _schedule_reminder,
        )

        reminder_id = await _schedule_reminder(
            title,
            time_expression,
            description,
            user_id,
            project_id,
            notification_method,
        )

        # A falsy ID is reported as an unparseable time expression.
        if not reminder_id:
            return (
                f"❌ Failed to parse time expression: '{time_expression}'\n"
                "💡 Try formats like 'in 30 minutes', 'tomorrow at 9am', 'every day at 5pm'"
            )

        lines = [
            "⏰ Natural reminder created successfully!",
            f"🆔 Reminder ID: {reminder_id}",
            f"📝 Title: {title}",
            f"📄 Description: {description}",
            f"🕐 When: {time_expression}",
            f"👤 User: {user_id}",
        ]
        if project_id:
            lines.append(f"📁 Project: {project_id}")
        lines += [
            f"📢 Notification: {notification_method}",
            "🎯 Reminder will trigger automatically at the scheduled time",
        ]
        return "\n".join(lines)

    except ImportError:
        return "❌ Natural scheduling tools not available. Install: pip install python-dateutil schedule python-crontab"
    except Exception as e:
        return f"❌ Error creating reminder: {e}"
async def list_user_reminders(
    user_id: str = "default",
    project_id: str | None = None,
) -> str:
    """List pending reminders for a user, optionally scoped to a project.

    Returns:
        The formatted reminder list, the "no reminders" message when the
        scheduler returns nothing, or an error message when the scheduling
        or formatting helpers cannot be imported or raise.

    """
    try:
        from .natural_scheduler import list_user_reminders as _fetch_reminders

        # Formatting helpers live with the other server helpers.
        from .utils.server_helpers import (
            _format_no_reminders_message,
            _format_reminders_list,
        )

        pending = await _fetch_reminders(user_id, project_id)

        if pending:
            lines = _format_reminders_list(pending, user_id, project_id)
        else:
            lines = _format_no_reminders_message(user_id, project_id)
        return "\n".join(lines)

    except ImportError:
        return "❌ Natural scheduling tools not available"
    except Exception as e:
        return f"❌ Error listing reminders: {e}"
async def cancel_user_reminder(reminder_id: str) -> str:
    """Cancel one reminder by ID and report the outcome as text.

    Returns:
        A confirmation when the scheduler reports success, a failure message
        when it reports otherwise, or an error message when the scheduler
        cannot be imported or raises.

    """
    try:
        from .natural_scheduler import cancel_user_reminder as _cancel_reminder

        cancelled = await _cancel_reminder(reminder_id)

        if not cancelled:
            return f"❌ Failed to cancel reminder {reminder_id}. Check that the ID is correct and the reminder exists"

        return "\n".join(
            [
                "❌ Reminder cancelled successfully!",
                f"🆔 Reminder ID: {reminder_id}",
                "🚫 The reminder will no longer trigger",
                "💡 You can create a new reminder if needed",
            ]
        )

    except ImportError:
        return "❌ Natural scheduling tools not available"
    except Exception as e:
        return f"❌ Error cancelling reminder: {e}"
186def _calculate_overdue_time(scheduled_for: str) -> str:
187 """Calculate and format overdue time."""
188 try:
189 from datetime import datetime
191 scheduled = datetime.fromisoformat(scheduled_for)
192 now = datetime.now()
193 overdue = now - scheduled
195 if overdue.total_seconds() > 0: 195 ↛ 201line 195 didn't jump to line 201 because the condition on line 195 was always true
196 hours = int(overdue.total_seconds() // 3600)
197 minutes = int((overdue.total_seconds() % 3600) // 60)
198 if hours > 0: 198 ↛ 200line 198 didn't jump to line 200 because the condition on line 198 was always true
199 return f"⏱️ Overdue: {hours}h {minutes}m"
200 return f"⏱️ Overdue: {minutes}m"
201 return "⏱️ Not yet due"
202 except Exception as e:
203 return f"❌ Error checking due reminders: {e}"
async def start_reminder_service() -> str:
    """Start the background reminder service and describe what is running.

    Registers the session notifications first, then starts the scheduler.

    Returns:
        A multi-line status message, or an error message when the scheduler
        cannot be imported or raises.

    """
    try:
        from .natural_scheduler import (
            register_session_notifications,
            start_reminder_service as _launch_service,
        )

        # Notifications are registered before the scheduler starts.
        register_session_notifications()
        _launch_service()

        status_lines = (
            "🚀 Natural reminder service started!",
            "⏰ Background scheduler is now active",
            "🔍 Checking for due reminders every minute",
            "📢 Session notifications are registered",
            "💡 Reminders will automatically trigger at their scheduled times",
            "🛑 Use 'stop_reminder_service' to stop the background service",
        )
        return "\n".join(status_lines)

    except ImportError:
        return "❌ Natural scheduling tools not available"
    except Exception as e:
        return f"❌ Error starting reminder service: {e}"
async def stop_reminder_service() -> str:
    """Stop the background reminder service and describe the consequences.

    Returns:
        A multi-line status message, or an error message when the scheduler
        cannot be imported or raises.

    """
    try:
        from .natural_scheduler import stop_reminder_service as _halt_service

        _halt_service()

        status_lines = (
            "🛑 Natural reminder service stopped",
            "❌ Background scheduler is no longer active",
            "⚠️ Existing reminders will not trigger automatically",
            "🚀 Use 'start_reminder_service' to restart the service",
            "💡 You can still check due reminders manually with 'check_due_reminders'",
        )
        return "\n".join(status_lines)

    except ImportError:
        return "❌ Natural scheduling tools not available"
    except Exception as e:
        return f"❌ Error stopping reminder service: {e}"
268# ================================
269# Interruption Management Tools
270# ================================
async def get_interruption_statistics(user_id: str) -> str:
    """Report interruption and context-preservation statistics for a user.

    Fetches the statistics from the interruption manager and formats the
    session, interruption, snapshot and efficiency sections. If none of the
    three data sections holds anything, the assembled report is replaced by
    the "no data" message.

    Returns:
        The formatted report, or an error message when the interruption
        tools cannot be imported or raise.

    """
    try:
        from .interruption_manager import (
            get_interruption_statistics as _fetch_statistics,
        )

        # Formatting helpers
        from .utils import (
            _format_efficiency_metrics,
            _format_no_data_message,
            _format_statistics_header,
        )
        from .utils.server_helpers import (
            _format_interruption_statistics,
            _format_snapshot_statistics,
        )

        stats = await _fetch_statistics(user_id)
        report = _format_statistics_header(user_id)

        # Pull out each section, defaulting to empty when absent.
        sessions = stats.get("sessions", {})
        interruptions = stats.get("interruptions", {})
        snapshots = stats.get("snapshots", {})
        by_type = interruptions.get("by_type", [])

        report.extend(_format_session_statistics(sessions))
        report.extend(_format_interruption_statistics(interruptions))
        report.extend(_format_snapshot_statistics(snapshots))
        report.extend(_format_efficiency_metrics(sessions, interruptions, by_type))

        # With no data at all, show the dedicated message instead.
        if not _has_statistics_data(sessions, interruptions, snapshots):
            report = _format_no_data_message(user_id)

        return "\n".join(report)

    except ImportError:
        return "❌ Interruption management tools not available"
    except Exception as e:
        return f"❌ Error getting statistics: {e}"
318def _format_session_statistics(sessions: dict[str, t.Any]) -> list[str]:
319 """Format session statistics section."""
320 output = []
321 if sessions: 321 ↛ 329line 321 didn't jump to line 329 because the condition on line 321 was always true
322 output.append("\n📊 Session Statistics:")
323 if "total_sessions" in sessions: 323 ↛ 325line 323 didn't jump to line 325 because the condition on line 323 was always true
324 output.append(f" • Total sessions: {sessions['total_sessions']}")
325 if "active_sessions" in sessions: 325 ↛ 327line 325 didn't jump to line 327 because the condition on line 325 was always true
326 output.append(f" • Active sessions: {sessions['active_sessions']}")
327 if "avg_duration" in sessions: 327 ↛ 329line 327 didn't jump to line 329 because the condition on line 327 was always true
328 output.append(f" • Average duration: {sessions['avg_duration']}")
329 return output
332def _has_statistics_data(
333 sessions: t.Any,
334 interruptions: t.Any,
335 snapshots: t.Any,
336) -> bool:
337 """Check if we have any statistics data to display."""
338 return bool(sessions or interruptions or snapshots)
341# ================================
342# Multi-Project Coordination Tools
343# ================================
async def create_project_group(
    name: str,
    projects: list[str],
    description: str = "",
) -> str:
    """Create a project group via the multi-project coordinator.

    Returns:
        A markdown summary of the created group, a "not available" message
        when no coordinator can be obtained, or a failure message when the
        coordinator raises.

    """
    # The coordinator is built on demand.
    coordinator = await _get_multi_project_coordinator()
    if not coordinator:
        return "❌ Multi-project coordination not available"

    try:
        group = await coordinator.create_project_group(
            name=name,
            projects=projects,
            description=description,
        )

        summary = (
            "✅ **Project Group Created**",
            "",
            f"**Group:** {group.name}",
            f"**Projects:** {', '.join(group.projects)}",
            f"**Description:** {group.description or 'None'}",
            f"**ID:** {group.id}",
            "",
            "The project group is now available for cross-project coordination and knowledge sharing.",
        )
        return "\n".join(summary)

    except Exception as e:
        return f"❌ Failed to create project group: {e}"
async def add_project_dependency(
    source_project: str,
    target_project: str,
    dependency_type: t.Literal["uses", "extends", "references", "shares_code"],
    description: str = "",
) -> str:
    """Record a dependency between two projects via the coordinator.

    Returns:
        A markdown summary of the stored dependency, a "not available"
        message when no coordinator can be obtained, or a failure message
        when the coordinator raises.

    """
    coordinator = await _get_multi_project_coordinator()
    if not coordinator:
        return "❌ Multi-project coordination not available"

    try:
        dependency = await coordinator.add_project_dependency(
            source_project=source_project,
            target_project=target_project,
            dependency_type=dependency_type,
            description=description,
        )

        summary = (
            "✅ **Project Dependency Added**",
            "",
            f"**Source:** {dependency.source_project}",
            f"**Target:** {dependency.target_project}",
            f"**Type:** {dependency.dependency_type}",
            f"**Description:** {dependency.description or 'None'}",
            "",
            "This relationship will be used for cross-project search and coordination.",
        )
        return "\n".join(summary)

    except Exception as e:
        return f"❌ Failed to add project dependency: {e}"
async def search_across_projects(
    query: str,
    current_project: str,
    limit: int = 10,
) -> str:
    """Search conversations across projects related to the current one.

    Returns:
        A markdown list of hits (content previews are cut at 200 characters),
        a "no results" message, a "not available" message when no coordinator
        can be obtained, or a failure message when the search raises.

    """
    coordinator = await _get_multi_project_coordinator()
    if not coordinator:
        return "❌ Multi-project coordination not available"

    try:
        hits = await coordinator.find_related_conversations(
            current_project=current_project,
            query=query,
            limit=limit,
        )

        if not hits:
            return f"🔍 No results found for '{query}' across related projects"

        sections = [f"🔍 **Cross-Project Search Results** ({len(hits)} found)\n"]

        for rank, hit in enumerate(hits, 1):
            if hit["is_current_project"]:
                origin = "📍 Current"
            else:
                origin = f"🔗 {hit['source_project']}"

            score = hit["score"]
            content = hit["content"]
            ellipsis = "..." if len(content) > 200 else ""
            timestamp = hit.get("timestamp", "Unknown")

            sections.append(
                "\n".join(
                    (
                        f"**{rank}.** {origin}",
                        f"**Score:** {score:.3f}",
                        f"**Content:** {content[:200]}{ellipsis}",
                        f"**Timestamp:** {timestamp}",
                        "---",
                    )
                )
            )

        return "\n".join(sections)

    except Exception as e:
        return f"❌ Search failed: {e}"
async def get_project_insights(projects: list[str], time_range_days: int = 30) -> str:
    """Get cross-project insights for the given projects and time window.

    Returns:
        The insights formatted by ``_format_project_insights``, a "not
        available" message when no coordinator can be obtained, or a failure
        message when fetching or formatting raises.

    """
    coordinator = await _get_multi_project_coordinator()
    if not coordinator:
        return "❌ Multi-project coordination not available"

    try:
        from .utils.server_helpers import _format_project_insights

        cross_project_insights = await coordinator.get_cross_project_insights(
            projects=projects,
            time_range_days=time_range_days,
        )
        return _format_project_insights(cross_project_insights, time_range_days)

    except Exception as e:
        return f"❌ Failed to get insights: {e}"
470async def _get_multi_project_coordinator() -> t.Any:
471 """Get or initialize multi-project coordinator."""
472 try:
473 from session_buddy.multi_project_coordinator import MultiProjectCoordinator
474 from session_buddy.reflection_tools import get_reflection_database
476 # Type ignore: get_reflection_database returns ReflectionDatabaseAdapter
477 # which is compatible with ReflectionDatabaseProtocol
478 db = await get_reflection_database() # type: ignore[arg-type]
479 return MultiProjectCoordinator(db)
480 except Exception:
481 return None
484# ================================
485# Advanced Search Tools
486# ================================
489async def advanced_search(
490 query: str,
491 content_type: str | None = None,
492 project: str | None = None,
493 timeframe: str | None = None,
494 sort_by: str = "relevance",
495 limit: int = 10,
496) -> str:
497 """Perform advanced search with faceted filtering."""
498 advanced_search_engine = await _get_advanced_search_engine()
499 if not advanced_search_engine: 499 ↛ 502line 499 didn't jump to line 502 because the condition on line 499 was always true
500 return "❌ Advanced search not available"
502 try:
503 from .utils.server_helpers import _format_advanced_search_results
505 filters = _build_advanced_search_filters(content_type, project, timeframe)
506 search_results = await advanced_search_engine.search(
507 query=query,
508 filters=filters,
509 sort_by=sort_by,
510 limit=limit,
511 include_highlights=True,
512 )
514 results = search_results["results"]
515 if not results:
516 return f"🔍 No results found for '{query}'"
518 return _format_advanced_search_results(results)
520 except Exception as e:
521 return f"❌ Advanced search failed: {e}"
524def _build_advanced_search_filters(
525 content_type: str | None,
526 project: str | None,
527 timeframe: str | None,
528) -> list[t.Any]:
529 """Build search filters from parameters."""
530 filters = []
532 if content_type:
533 from session_buddy.advanced_search import SearchFilter
535 filters.append(
536 SearchFilter(field="content_type", operator="eq", value=content_type),
537 )
539 if project:
540 from session_buddy.advanced_search import SearchFilter
542 filters.append(SearchFilter(field="project", operator="eq", value=project))
544 if timeframe:
545 from session_buddy.advanced_search import SearchFilter
547 # Get engine for timeframe parsing
548 advanced_search_engine = _get_advanced_search_engine_sync()
549 if advanced_search_engine:
550 start_time, end_time = advanced_search_engine._parse_timeframe(timeframe)
551 filters.append(
552 SearchFilter(
553 field="timestamp",
554 operator="range",
555 value=(start_time, end_time),
556 ),
557 )
559 return filters
async def search_suggestions(query: str, field: str = "content", limit: int = 5) -> str:
    """Get search completion suggestions for a partial query.

    Returns:
        A numbered list of suggestions with their frequencies, a "no
        suggestions" message, a "not available" message when no engine can
        be obtained, or a failure message when the engine raises.

    """
    engine = await _get_advanced_search_engine()
    if not engine:
        return "❌ Advanced search not available"

    try:
        suggestions = await engine.suggest_completions(
            query=query,
            field=field,
            limit=limit,
        )

        if not suggestions:
            return f"💡 No suggestions found for '{query}'"

        lines = [f"💡 **Search Suggestions** for '{query}':\n"]
        lines.extend(
            f"{rank}. {entry['text']} (frequency: {entry['frequency']})"
            for rank, entry in enumerate(suggestions, 1)
        )
        return "\n".join(lines)

    except Exception as e:
        return f"❌ Failed to get suggestions: {e}"
async def get_search_metrics(metric_type: str, timeframe: str = "30d") -> str:
    """Get aggregated search and activity metrics.

    Returns:
        Up to ten ``key: value`` bullets under a heading (or a "no data"
        line when the data is empty), the engine's own error when the
        metrics contain an ``"error"`` key, a "not available" message when
        no engine can be obtained, or a failure message when the engine
        raises.

    """
    engine = await _get_advanced_search_engine()
    if not engine:
        return "❌ Advanced search not available"

    try:
        metrics = await engine.aggregate_metrics(
            metric_type=metric_type,
            timeframe=timeframe,
        )

        if "error" in metrics:
            return f"❌ {metrics['error']}"

        lines = [f"📊 **{metric_type.title()} Metrics** ({timeframe})\n"]
        data = metrics["data"]

        # Only the first ten entries are shown.
        lines.extend(f"• **{entry['key']}:** {entry['value']}" for entry in data[:10])

        if not data:
            lines.append("No data available for the specified timeframe")

        return "\n".join(lines)

    except Exception as e:
        return f"❌ Failed to get metrics: {e}"
620async def _get_advanced_search_engine() -> t.Any:
621 """Get or initialize advanced search engine."""
622 try:
623 from session_buddy.advanced_search import AdvancedSearchEngine
624 from session_buddy.reflection_tools import get_reflection_database
626 # Type ignore: get_reflection_database returns ReflectionDatabaseAdapter
627 # which is compatible with AdvancedSearchEngine's expected type
628 db = await get_reflection_database() # type: ignore[arg-type]
629 return AdvancedSearchEngine(db) # type: ignore[arg-type]
630 except Exception:
631 return None
634def _get_advanced_search_engine_sync() -> t.Any:
635 """Synchronous helper to get advanced search engine."""
636 try:
637 import asyncio
639 return asyncio.run(_get_advanced_search_engine())
640 except Exception:
641 return None
644# ================================
645# Git Worktree Management Tools
646# ================================
649def _get_worktree_indicators(is_main: bool, is_detached: bool) -> tuple[str, str]:
650 """Get the main and detached indicators for a worktree."""
651 main_indicator = " (main)" if is_main else ""
652 detached_indicator = " (detached)" if is_detached else ""
653 return main_indicator, detached_indicator
656def _resolve_worktree_working_dir(working_directory: str | None) -> Path:
657 """Resolve a safe working directory for git worktree operations."""
658 if working_directory: 658 ↛ 659line 658 didn't jump to line 659 because the condition on line 658 was never true
659 return Path(working_directory)
660 try:
661 return Path.cwd()
662 except FileNotFoundError:
663 return Path.home()
async def git_worktree_add(
    branch: str,
    path: str,
    working_directory: str | None = None,
    create_branch: bool = False,
) -> str:
    """Create a new git worktree and describe the result.

    A relative ``path`` is resolved against the parent of the working
    directory, not the working directory itself.

    Returns:
        A success summary, the manager's own error when it reports failure,
        or a failure message (also logged) when the manager raises.

    """
    from .utils.logging import get_session_logger
    from .worktree_manager import WorktreeManager

    # The session logger comes from the DI container via this helper.
    session_logger = get_session_logger()

    repo_dir = _resolve_worktree_working_dir(working_directory)
    target_path = Path(path)
    if not target_path.is_absolute():
        target_path = repo_dir.parent / path

    manager = WorktreeManager(session_logger=session_logger)

    try:
        result = await manager.create_worktree(
            repository_path=repo_dir,
            new_path=target_path,
            branch=branch,
            create_branch=create_branch,
        )

        if not result["success"]:
            return f"❌ {result['error']}"

        created_label = "Yes" if create_branch else "No"
        lines = [
            "🎉 **Worktree Created Successfully!**\n",
            f"🌿 Branch: {result['branch']}",
            f"📁 Path: {result['worktree_path']}",
            f"🎯 Created new branch: {created_label}",
        ]

        if result.get("output"):
            lines.append(f"\n📝 Git output: {result['output']}")

        lines += [
            f"\n💡 To start working: cd {result['worktree_path']}",
            "💡 Use `git_worktree_list` to see all worktrees",
        ]

        return "\n".join(lines)

    except Exception as e:
        session_logger.exception(f"git_worktree_add failed: {e}")
        return f"❌ Failed to create worktree: {e}"
async def git_worktree_remove(
    path: str,
    working_directory: str | None = None,
    force: bool = False,
) -> str:
    """Remove an existing git worktree and describe the result.

    A relative ``path`` is resolved against the parent of the working
    directory, not the working directory itself.

    Returns:
        A success summary, the manager's own error when it reports failure,
        or a failure message (also logged) when the manager raises.

    """
    from .utils.logging import get_session_logger
    from .worktree_manager import WorktreeManager

    # The session logger comes from the DI container via this helper.
    session_logger = get_session_logger()

    repo_dir = _resolve_worktree_working_dir(working_directory)
    target_path = Path(path)
    if not target_path.is_absolute():
        target_path = repo_dir.parent / path

    manager = WorktreeManager(session_logger=session_logger)

    try:
        result = await manager.remove_worktree(
            repository_path=repo_dir,
            worktree_path=target_path,
            force=force,
        )

        if not result["success"]:
            return f"❌ {result['error']}"

        lines = [
            "🗑️ **Worktree Removed Successfully!**\n",
            f"📁 Removed path: {result['removed_path']}",
        ]

        if result.get("output"):
            lines.append(f"📝 Git output: {result['output']}")

        forced_label = "Yes" if force else "No"
        lines += [
            f"\n💡 Used force removal: {forced_label}",
            "💡 Use `git_worktree_list` to see remaining worktrees",
        ]

        return "\n".join(lines)

    except Exception as e:
        session_logger.exception(f"git_worktree_remove failed: {e}")
        return f"❌ Failed to remove worktree: {e}"
def _format_worktree_switch_result(result: dict[str, t.Any]) -> str:
    """Render a worktree switch result as human-readable text.

    Shows the source and target worktree (branch and path), followed by the
    context-preservation details for whichever outcome
    ``result["context_preserved"]`` indicates.
    """
    source = result["from_worktree"]
    lines = [
        "**Worktree Context Switch Complete**\n",
        f" From: {source['branch']} ({source['path']})",
    ]
    target = result["to_worktree"]
    lines.append(f" To: {target['branch']} ({target['path']})")

    describe_context = (
        _format_context_preserved
        if result["context_preserved"]
        else _format_context_failed
    )
    lines.extend(describe_context(result))

    return "\n".join(lines)
790def _format_context_preserved(result: dict[str, t.Any]) -> list[str]:
791 """Format preserved context information."""
792 messages = [" Session context preserved during switch"]
794 if result.get("session_state_saved"): 794 ↛ 795line 794 didn't jump to line 795 because the condition on line 794 was never true
795 messages.append(" Current session state saved")
796 if result.get("session_state_restored"): 796 ↛ 797line 796 didn't jump to line 797 because the condition on line 796 was never true
797 messages.append(" Session state restored for target worktree")
799 return messages
802def _format_context_failed(result: dict[str, t.Any]) -> list[str]:
803 """Format failed context information."""
804 messages = [" Session context preservation failed (basic switch performed)"]
806 if result.get("session_error"):
807 messages.append(f" Error: {result['session_error']}")
809 return messages
async def git_worktree_switch(from_path: str, to_path: str) -> str:
    """Switch context between git worktrees with session preservation.

    Args:
        from_path: Path of the worktree being left.
        to_path: Path of the worktree being switched to.

    Returns:
        The formatted switch summary, the manager's own error (prefixed with
        ``❌``) when it reports failure, or a failure message (also logged)
        when the manager raises.

    """
    from .utils.logging import get_session_logger
    from .worktree_manager import WorktreeManager

    # The session logger comes from the DI container via this helper.
    session_logger = get_session_logger()

    manager = WorktreeManager(session_logger=session_logger)

    try:
        result = await manager.switch_worktree_context(Path(from_path), Path(to_path))

        if not result["success"]:
            # Mark failures the same way git_worktree_add/remove do, so the
            # message is recognisable as an error.
            return f"❌ {result['error']}"

        return _format_worktree_switch_result(result)

    except Exception as e:
        session_logger.exception(f"git_worktree_switch failed: {e}")
        return f"❌ Failed to switch worktree context: {e}"
835# ================================
836# Session Welcome Tool
837# ================================
# Connection details shown by the session welcome tool; set by the server
# lifecycle and None while nothing is pending.
_connection_info: dict[str, t.Any] | None = None


def set_connection_info(info: dict[str, t.Any]) -> None:
    """Store the connection info for the next session welcome.

    Called from the server lifespan; replaces any previously stored value.
    """
    global _connection_info
    _connection_info = info
async def session_welcome() -> str:
    """Display session connection information and previous session details.

    Reads the module-level connection info. When it is unset or empty, a
    short "not available" notice is returned. Otherwise the welcome text is
    built from it and the stored info is cleared, so a later call returns
    the notice again.
    """
    global _connection_info

    info = _connection_info
    if not info:
        return "ℹ️ Session information not available (may not be a git repository)"

    lines = [
        "🚀 Session Management Connected!",
        "=" * 40,
        # Current session
        f"📁 Project: {info['project']}",
        f"📊 Current quality score: {info['quality_score']}/100",
        f"🔗 Connection status: {info['connected_at']}",
    ]

    # Previous session
    previous = info.get("previous_session")
    if not previous:
        lines += [
            "\n🌟 This is your first session in this project!",
            "💡 Session data will be preserved for future continuity",
        ]
    else:
        lines += ["\n📋 Previous Session Summary:", "-" * 30]
        labelled_fields = (
            ("ended_at", "⏰ Last session ended: "),
            ("quality_score", "📈 Final score: "),
            ("top_recommendation", "💡 Key recommendation: "),
        )
        for key, label in labelled_fields:
            if key in previous:
                lines.append(f"{label}{previous[key]}")
        lines.append("\n✨ Session continuity restored - your progress is preserved!")

    # Current recommendations: at most the first three are listed.
    recommendations = info.get("recommendations", [])
    if recommendations:
        lines.append("\n🎯 Current Recommendations:")
        lines.extend(
            f" {rank}. {text}" for rank, text in enumerate(recommendations[:3], 1)
        )

    lines += [
        "\n🔧 Use other session-mgmt tools for:",
        " • /session-buddy:status - Detailed project health",
        " • /session-buddy:checkpoint - Mid-session quality check",
        " • /session-buddy:end - Graceful session cleanup",
    ]

    # The info is shown once, then cleared.
    _connection_info = None

    return "\n".join(lines)