Coverage for session_buddy / advanced_features.py: 52.30%

359 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1"""Advanced Feature Hub for MCP Tools. 

2 

3This module provides advanced MCP tools for multi-project coordination, 

4git worktree management, natural language scheduling, and enhanced search. 

5 

6Extracted from server.py Phase 2.4 - Contains 17 MCP tool implementations: 

7- Natural language reminder tools (5 MCP tools) 

8- Interruption management tools (1 MCP tool) 

9- Multi-project coordination (4 MCP tools) 

10- Advanced search capabilities (3 MCP tools) 

11- Git worktree management (3 MCP tools) 

12- Session welcome tool (1 MCP tool) 

13""" 

14 

15from __future__ import annotations 

16 

17import typing as t 

18from pathlib import Path 

19 

20if t.TYPE_CHECKING: 

21 from session_buddy.utils.logging import SessionLogger 

22 

23 

class AdvancedFeaturesHub:
    """Coordinator for the optional advanced MCP feature tools.

    Holds the session logger plus one "initialized" flag per feature area
    (multi-project, advanced search, app monitor). The ``initialize_*``
    coroutines are placeholders: each one currently raises
    ``NotImplementedError`` and none of them touches its flag.
    """

    def __init__(self, logger: SessionLogger) -> None:
        """Remember the logger and start with every feature uninitialized.

        Args:
            logger: Session logger for feature events

        """
        self.logger = logger
        self._multi_project_initialized = False
        self._advanced_search_initialized = False
        self._app_monitor_initialized = False

    @staticmethod
    def _not_yet_implemented(method_name: str) -> NotImplementedError:
        """Build the error raised by the placeholder ``initialize_*`` methods."""
        return NotImplementedError(f"{method_name} not yet implemented")

    async def initialize_multi_project(self) -> bool:
        """Initialize multi-project coordination features (placeholder).

        Returns:
            True if initialized successfully

        Raises:
            NotImplementedError: Always, until the feature is implemented.

        """
        raise self._not_yet_implemented("initialize_multi_project")

    async def initialize_advanced_search(self) -> bool:
        """Initialize advanced search capabilities (placeholder).

        Returns:
            True if initialized successfully

        Raises:
            NotImplementedError: Always, until the feature is implemented.

        """
        raise self._not_yet_implemented("initialize_advanced_search")

    async def initialize_app_monitor(self) -> bool:
        """Initialize application monitoring (placeholder).

        Returns:
            True if initialized successfully

        Raises:
            NotImplementedError: Always, until the feature is implemented.

        """
        raise self._not_yet_implemented("initialize_app_monitor")

72 

73 

74# ================================ 

75# Natural Language Scheduling Tools 

76# ================================ 

77 

78 

79async def create_natural_reminder( 

80 title: str, 

81 time_expression: str, 

82 description: str = "", 

83 user_id: str = "default", 

84 project_id: str | None = None, 

85 notification_method: str = "session", 

86) -> str: 

87 """Create reminder from natural language time expression.""" 

88 try: 

89 from .natural_scheduler import ( 

90 create_natural_reminder as _create_natural_reminder, 

91 ) 

92 

93 reminder_id = await _create_natural_reminder( 

94 title, 

95 time_expression, 

96 description, 

97 user_id, 

98 project_id, 

99 notification_method, 

100 ) 

101 

102 if reminder_id: 102 ↛ 123line 102 didn't jump to line 123 because the condition on line 102 was always true

103 output = [] 

104 output.extend( 

105 ( 

106 "⏰ Natural reminder created successfully!", 

107 f"🆔 Reminder ID: {reminder_id}", 

108 f"📝 Title: {title}", 

109 f"📄 Description: {description}", 

110 f"🕐 When: {time_expression}", 

111 f"👤 User: {user_id}", 

112 ) 

113 ) 

114 if project_id: 114 ↛ 115line 114 didn't jump to line 115 because the condition on line 114 was never true

115 output.append(f"📁 Project: {project_id}") 

116 output.extend( 

117 ( 

118 f"📢 Notification: {notification_method}", 

119 "🎯 Reminder will trigger automatically at the scheduled time", 

120 ) 

121 ) 

122 return "\n".join(output) 

123 return f"❌ Failed to parse time expression: '{time_expression}'\n💡 Try formats like 'in 30 minutes', 'tomorrow at 9am', 'every day at 5pm'" 

124 

125 except ImportError: 

126 return "❌ Natural scheduling tools not available. Install: pip install python-dateutil schedule python-crontab" 

127 except Exception as e: 

128 return f"❌ Error creating reminder: {e}" 

129 

130 

131async def list_user_reminders( 

132 user_id: str = "default", 

133 project_id: str | None = None, 

134) -> str: 

135 """List pending reminders for user/project.""" 

136 try: 

137 from .natural_scheduler import list_user_reminders as _list_user_reminders 

138 

139 # Import formatting functions 

140 from .utils.server_helpers import ( 

141 _format_no_reminders_message, 

142 _format_reminders_list, 

143 ) 

144 

145 reminders = await _list_user_reminders(user_id, project_id) 

146 

147 if not reminders: 147 ↛ 148line 147 didn't jump to line 148 because the condition on line 147 was never true

148 output = _format_no_reminders_message(user_id, project_id) 

149 return "\n".join(output) 

150 

151 output = _format_reminders_list(reminders, user_id, project_id) 

152 return "\n".join(output) 

153 

154 except ImportError: 

155 return "❌ Natural scheduling tools not available" 

156 except Exception as e: 

157 return f"❌ Error listing reminders: {e}" 

158 

159 

async def cancel_user_reminder(reminder_id: str) -> str:
    """Cancel one reminder by id and describe the outcome.

    Delegates to ``natural_scheduler.cancel_user_reminder`` (imported lazily).
    Errors are reported in the returned text rather than raised.

    Returns:
        A multi-line confirmation when the scheduler reports success,
        otherwise a "❌ ..." message.

    """
    try:
        from .natural_scheduler import cancel_user_reminder as _cancel

        cancelled = await _cancel(reminder_id)
        if not cancelled:
            return f"❌ Failed to cancel reminder {reminder_id}. Check that the ID is correct and the reminder exists"

        return "\n".join(
            [
                "❌ Reminder cancelled successfully!",
                f"🆔 Reminder ID: {reminder_id}",
                "🚫 The reminder will no longer trigger",
                "💡 You can create a new reminder if needed",
            ]
        )

    except ImportError:
        return "❌ Natural scheduling tools not available"
    except Exception as e:
        return f"❌ Error cancelling reminder: {e}"

184 

185 

def _calculate_overdue_time(scheduled_for: str) -> str:
    """Describe how long ago a scheduled ISO-8601 time passed.

    Args:
        scheduled_for: Timestamp in a format accepted by
            ``datetime.fromisoformat``; may be naive or carry a UTC offset.

    Returns:
        ``"⏱️ Overdue: {h}h {m}m"`` (or ``"⏱️ Overdue: {m}m"`` under one
        hour) when the time is in the past, ``"⏱️ Not yet due"`` otherwise.
        If the timestamp cannot be processed, a "❌ ..." message is returned
        instead of raising.

    """
    from datetime import datetime

    try:
        scheduled = datetime.fromisoformat(scheduled_for)
        # Match "now" to the timestamp's awareness: subtracting a naive
        # datetime from an offset-aware one raises TypeError. With
        # tzinfo=None this is the same naive local time as a bare now().
        now = datetime.now(tz=scheduled.tzinfo)
        overdue_seconds = (now - scheduled).total_seconds()
    except Exception as e:
        # Formatting helper: report the problem in the text, never raise.
        return f"❌ Error checking due reminders: {e}"

    if overdue_seconds <= 0:
        return "⏱️ Not yet due"

    hours, remainder = divmod(int(overdue_seconds), 3600)
    minutes = remainder // 60
    if hours > 0:
        return f"⏱️ Overdue: {hours}h {minutes}m"
    return f"⏱️ Overdue: {minutes}m"

204 

205 

async def start_reminder_service() -> str:
    """Start the background reminder service and describe the result.

    Registers the session notifications first, then starts the service; both
    helpers come from ``natural_scheduler`` (imported lazily). Errors are
    reported in the returned text rather than raised.

    Returns:
        A multi-line status message, or a "❌ ..." error message.

    """
    try:
        from .natural_scheduler import (
            register_session_notifications,
            start_reminder_service as _launch_service,
        )

        # Notifications are registered before the service starts.
        register_session_notifications()
        _launch_service()

        return "\n".join(
            [
                "🚀 Natural reminder service started!",
                "⏰ Background scheduler is now active",
                "🔍 Checking for due reminders every minute",
                "📢 Session notifications are registered",
                "💡 Reminders will automatically trigger at their scheduled times",
                "🛑 Use 'stop_reminder_service' to stop the background service",
            ]
        )

    except ImportError:
        return "❌ Natural scheduling tools not available"
    except Exception as e:
        return f"❌ Error starting reminder service: {e}"

240 

241 

async def stop_reminder_service() -> str:
    """Stop the background reminder service and describe the result.

    Delegates to ``natural_scheduler.stop_reminder_service`` (imported
    lazily). Errors are reported in the returned text rather than raised.

    Returns:
        A multi-line status message, or a "❌ ..." error message.

    """
    try:
        from .natural_scheduler import stop_reminder_service as _halt_service

        _halt_service()

        return "\n".join(
            [
                "🛑 Natural reminder service stopped",
                "❌ Background scheduler is no longer active",
                "⚠️ Existing reminders will not trigger automatically",
                "🚀 Use 'start_reminder_service' to restart the service",
                "💡 You can still check due reminders manually with 'check_due_reminders'",
            ]
        )

    except ImportError:
        return "❌ Natural scheduling tools not available"
    except Exception as e:
        return f"❌ Error stopping reminder service: {e}"

266 

267 

268# ================================ 

269# Interruption Management Tools 

270# ================================ 

271 

272 

async def get_interruption_statistics(user_id: str) -> str:
    """Build the interruption / context-preservation statistics report.

    Fetches the raw statistics from ``interruption_manager`` and renders the
    session, interruption, snapshot and efficiency sections. When the
    session, interruption and snapshot sections are all empty, the report is
    replaced by the "no data" message. Errors are reported in the returned
    text rather than raised.

    Returns:
        The formatted report, or a "❌ ..." error message.

    """
    try:
        from .interruption_manager import (
            get_interruption_statistics as _fetch_statistics,
        )
        from .utils import (
            _format_efficiency_metrics,
            _format_no_data_message,
            _format_statistics_header,
        )
        from .utils.server_helpers import (
            _format_interruption_statistics,
            _format_snapshot_statistics,
        )

        stats = await _fetch_statistics(user_id)
        report = _format_statistics_header(user_id)

        # Pull out the individual sections, defaulting to empty ones.
        sessions = stats.get("sessions", {})
        interruptions = stats.get("interruptions", {})
        snapshots = stats.get("snapshots", {})
        by_type = interruptions.get("by_type", [])

        for section in (
            _format_session_statistics(sessions),
            _format_interruption_statistics(interruptions),
            _format_snapshot_statistics(snapshots),
            _format_efficiency_metrics(sessions, interruptions, by_type),
        ):
            report.extend(section)

        # With nothing to show, the "no data" message replaces the report.
        if not _has_statistics_data(sessions, interruptions, snapshots):
            report = _format_no_data_message(user_id)

        return "\n".join(report)

    except ImportError:
        return "❌ Interruption management tools not available"
    except Exception as e:
        return f"❌ Error getting statistics: {e}"

316 

317 

def _format_session_statistics(sessions: dict[str, t.Any]) -> list[str]:
    """Render the session statistics section as a list of output lines.

    Returns an empty list for an empty mapping. Otherwise returns the section
    heading followed by one bullet for each of ``total_sessions``,
    ``active_sessions`` and ``avg_duration`` that is present, in that order.
    """
    if not sessions:
        return []

    labelled_keys = (
        ("total_sessions", "Total sessions"),
        ("active_sessions", "Active sessions"),
        ("avg_duration", "Average duration"),
    )
    lines = ["\n📊 Session Statistics:"]
    lines.extend(
        f" • {label}: {sessions[key]}"
        for key, label in labelled_keys
        if key in sessions
    )
    return lines

330 

331 

def _has_statistics_data(
    sessions: t.Any,
    interruptions: t.Any,
    snapshots: t.Any,
) -> bool:
    """Return True when at least one of the three sections is truthy."""
    return any((sessions, interruptions, snapshots))

339 

340 

341# ================================ 

342# Multi-Project Coordination Tools 

343# ================================ 

344 

345 

async def create_project_group(
    name: str,
    projects: list[str],
    description: str = "",
) -> str:
    """Create a project group and return a markdown confirmation.

    The coordinator is obtained on demand. If it is unavailable, or if group
    creation raises, a "❌ ..." message is returned instead of raising.
    """
    coordinator = await _get_multi_project_coordinator()
    if not coordinator:
        return "❌ Multi-project coordination not available"

    try:
        group = await coordinator.create_project_group(
            name=name,
            projects=projects,
            description=description,
        )

        return "\n".join(
            (
                "✅ **Project Group Created**",
                "",
                f"**Group:** {group.name}",
                f"**Projects:** {', '.join(group.projects)}",
                f"**Description:** {group.description or 'None'}",
                f"**ID:** {group.id}",
                "",
                "The project group is now available for cross-project coordination and knowledge sharing.",
            )
        )

    except Exception as e:
        return f"❌ Failed to create project group: {e}"

376 

377 

async def add_project_dependency(
    source_project: str,
    target_project: str,
    dependency_type: t.Literal["uses", "extends", "references", "shares_code"],
    description: str = "",
) -> str:
    """Record a dependency between two projects and return a markdown confirmation.

    The coordinator is obtained on demand. If it is unavailable, or if the
    coordinator call raises, a "❌ ..." message is returned instead of
    raising.
    """
    coordinator = await _get_multi_project_coordinator()
    if not coordinator:
        return "❌ Multi-project coordination not available"

    try:
        dependency = await coordinator.add_project_dependency(
            source_project=source_project,
            target_project=target_project,
            dependency_type=dependency_type,
            description=description,
        )

        return "\n".join(
            (
                "✅ **Project Dependency Added**",
                "",
                f"**Source:** {dependency.source_project}",
                f"**Target:** {dependency.target_project}",
                f"**Type:** {dependency.dependency_type}",
                f"**Description:** {dependency.description or 'None'}",
                "",
                "This relationship will be used for cross-project search and coordination.",
            )
        )

    except Exception as e:
        return f"❌ Failed to add project dependency: {e}"

408 

409 

async def search_across_projects(
    query: str,
    current_project: str,
    limit: int = 10,
) -> str:
    """Search conversations across related projects and format the hits.

    Each hit is rendered with its origin (current project or source project),
    score, up to 200 characters of content, and timestamp. If the coordinator
    is unavailable or the search raises, a "❌ ..." message is returned
    instead of raising.
    """
    coordinator = await _get_multi_project_coordinator()
    if not coordinator:
        return "❌ Multi-project coordination not available"

    try:
        results = await coordinator.find_related_conversations(
            current_project=current_project,
            query=query,
            limit=limit,
        )

        if not results:
            return f"🔍 No results found for '{query}' across related projects"

        sections = [f"🔍 **Cross-Project Search Results** ({len(results)} found)\n"]

        for index, hit in enumerate(results, 1):
            if hit["is_current_project"]:
                origin = "📍 Current"
            else:
                origin = f"🔗 {hit['source_project']}"

            score_text = f"{hit['score']:.3f}"
            # Long content is cut at 200 characters and marked with an ellipsis.
            preview = hit["content"][:200]
            ellipsis = "..." if len(hit["content"]) > 200 else ""

            sections.append(
                "\n".join(
                    (
                        f"**{index}.** {origin}",
                        f"**Score:** {score_text}",
                        f"**Content:** {preview}{ellipsis}",
                        f"**Timestamp:** {hit.get('timestamp', 'Unknown')}",
                        "---",
                    )
                )
            )

        return "\n".join(sections)

    except Exception as e:
        return f"❌ Search failed: {e}"

449 

450 

async def get_project_insights(projects: list[str], time_range_days: int = 30) -> str:
    """Fetch cross-project insights for ``projects`` and format them.

    If the coordinator is unavailable, or if fetching or formatting raises, a
    "❌ ..." message is returned instead of raising.
    """
    coordinator = await _get_multi_project_coordinator()
    if not coordinator:
        return "❌ Multi-project coordination not available"

    try:
        from .utils.server_helpers import _format_project_insights

        cross_project_insights = await coordinator.get_cross_project_insights(
            projects=projects,
            time_range_days=time_range_days,
        )
    except Exception as e:
        return f"❌ Failed to get insights: {e}"
    else:
        try:
            return _format_project_insights(cross_project_insights, time_range_days)
        except Exception as e:
            return f"❌ Failed to get insights: {e}"

468 

469 

async def _get_multi_project_coordinator() -> t.Any:
    """Build a ``MultiProjectCoordinator`` on top of the reflection database.

    Returns:
        A new coordinator, or ``None`` when the imports, the database lookup
        or the constructor raise. Callers treat ``None`` as "feature not
        available".

    """
    try:
        from session_buddy.multi_project_coordinator import MultiProjectCoordinator
        from session_buddy.reflection_tools import get_reflection_database

        # Type ignore: get_reflection_database returns ReflectionDatabaseAdapter
        # which is compatible with ReflectionDatabaseProtocol
        reflection_db = await get_reflection_database()  # type: ignore[arg-type]
        coordinator = MultiProjectCoordinator(reflection_db)
    except Exception:
        return None
    return coordinator

482 

483 

484# ================================ 

485# Advanced Search Tools 

486# ================================ 

487 

488 

489async def advanced_search( 

490 query: str, 

491 content_type: str | None = None, 

492 project: str | None = None, 

493 timeframe: str | None = None, 

494 sort_by: str = "relevance", 

495 limit: int = 10, 

496) -> str: 

497 """Perform advanced search with faceted filtering.""" 

498 advanced_search_engine = await _get_advanced_search_engine() 

499 if not advanced_search_engine: 499 ↛ 502line 499 didn't jump to line 502 because the condition on line 499 was always true

500 return "❌ Advanced search not available" 

501 

502 try: 

503 from .utils.server_helpers import _format_advanced_search_results 

504 

505 filters = _build_advanced_search_filters(content_type, project, timeframe) 

506 search_results = await advanced_search_engine.search( 

507 query=query, 

508 filters=filters, 

509 sort_by=sort_by, 

510 limit=limit, 

511 include_highlights=True, 

512 ) 

513 

514 results = search_results["results"] 

515 if not results: 

516 return f"🔍 No results found for '{query}'" 

517 

518 return _format_advanced_search_results(results) 

519 

520 except Exception as e: 

521 return f"❌ Advanced search failed: {e}" 

522 

523 

524def _build_advanced_search_filters( 

525 content_type: str | None, 

526 project: str | None, 

527 timeframe: str | None, 

528) -> list[t.Any]: 

529 """Build search filters from parameters.""" 

530 filters = [] 

531 

532 if content_type: 

533 from session_buddy.advanced_search import SearchFilter 

534 

535 filters.append( 

536 SearchFilter(field="content_type", operator="eq", value=content_type), 

537 ) 

538 

539 if project: 

540 from session_buddy.advanced_search import SearchFilter 

541 

542 filters.append(SearchFilter(field="project", operator="eq", value=project)) 

543 

544 if timeframe: 

545 from session_buddy.advanced_search import SearchFilter 

546 

547 # Get engine for timeframe parsing 

548 advanced_search_engine = _get_advanced_search_engine_sync() 

549 if advanced_search_engine: 

550 start_time, end_time = advanced_search_engine._parse_timeframe(timeframe) 

551 filters.append( 

552 SearchFilter( 

553 field="timestamp", 

554 operator="range", 

555 value=(start_time, end_time), 

556 ), 

557 ) 

558 

559 return filters 

560 

561 

async def search_suggestions(query: str, field: str = "content", limit: int = 5) -> str:
    """Return search completion suggestions for ``query`` as numbered lines.

    If the engine is unavailable or the lookup raises, a "❌ ..." message is
    returned instead of raising.
    """
    engine = await _get_advanced_search_engine()
    if not engine:
        return "❌ Advanced search not available"

    try:
        suggestions = await engine.suggest_completions(
            query=query,
            field=field,
            limit=limit,
        )

        if not suggestions:
            return f"💡 No suggestions found for '{query}'"

        lines = [f"💡 **Search Suggestions** for '{query}':\n"]
        lines.extend(
            f"{rank}. {entry['text']} (frequency: {entry['frequency']})"
            for rank, entry in enumerate(suggestions, 1)
        )
        return "\n".join(lines)

    except Exception as e:
        return f"❌ Failed to get suggestions: {e}"

589 

590 

async def get_search_metrics(metric_type: str, timeframe: str = "30d") -> str:
    """Return aggregated search/activity metrics as a bullet list.

    At most the first ten entries of the engine's ``data`` list are shown. An
    ``error`` entry in the engine response is returned verbatim. If the
    engine is unavailable or the aggregation raises, a "❌ ..." message is
    returned instead of raising.
    """
    engine = await _get_advanced_search_engine()
    if not engine:
        return "❌ Advanced search not available"

    try:
        metrics = await engine.aggregate_metrics(
            metric_type=metric_type,
            timeframe=timeframe,
        )

        if "error" in metrics:
            return f"{metrics['error']}"

        lines = [f"📊 **{metric_type.title()} Metrics** ({timeframe})\n"]
        lines.extend(
            f"• **{entry['key']}:** {entry['value']}"
            for entry in metrics["data"][:10]  # Top 10
        )

        if not metrics["data"]:
            lines.append("No data available for the specified timeframe")

        return "\n".join(lines)

    except Exception as e:
        return f"❌ Failed to get metrics: {e}"

618 

619 

async def _get_advanced_search_engine() -> t.Any:
    """Build an ``AdvancedSearchEngine`` on top of the reflection database.

    Returns:
        A new engine, or ``None`` when the imports, the database lookup or
        the constructor raise. Callers treat ``None`` as "feature not
        available".

    """
    try:
        from session_buddy.advanced_search import AdvancedSearchEngine
        from session_buddy.reflection_tools import get_reflection_database

        # Type ignore: get_reflection_database returns ReflectionDatabaseAdapter
        # which is compatible with AdvancedSearchEngine's expected type
        reflection_db = await get_reflection_database()  # type: ignore[arg-type]
        engine = AdvancedSearchEngine(reflection_db)  # type: ignore[arg-type]
    except Exception:
        return None
    return engine

632 

633 

def _get_advanced_search_engine_sync() -> t.Any:
    """Synchronous helper to get the advanced search engine.

    Runs the async engine factory to completion with ``asyncio.run``. That is
    only possible when no event loop is running in the current thread, so
    this helper returns ``None`` when called from inside a running loop.

    Returns:
        The engine, or ``None`` when a loop is already running or the factory
        fails.

    """
    import asyncio

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run() is usable.
        pass
    else:
        # asyncio.run() raises inside a running loop. Bail out before the
        # coroutine object is created so it is never left un-awaited.
        return None

    try:
        return asyncio.run(_get_advanced_search_engine())
    except Exception:
        return None

642 

643 

644# ================================ 

645# Git Worktree Management Tools 

646# ================================ 

647 

648 

def _get_worktree_indicators(is_main: bool, is_detached: bool) -> tuple[str, str]:
    """Return the ``(main, detached)`` label suffixes for a worktree.

    Each suffix is an empty string when the corresponding flag is falsy.
    """
    main_label = ""
    if is_main:
        main_label = " (main)"

    detached_label = ""
    if is_detached:
        detached_label = " (detached)"

    return main_label, detached_label

654 

655 

656def _resolve_worktree_working_dir(working_directory: str | None) -> Path: 

657 """Resolve a safe working directory for git worktree operations.""" 

658 if working_directory: 658 ↛ 659line 658 didn't jump to line 659 because the condition on line 658 was never true

659 return Path(working_directory) 

660 try: 

661 return Path.cwd() 

662 except FileNotFoundError: 

663 return Path.home() 

664 

665 

async def git_worktree_add(
    branch: str,
    path: str,
    working_directory: str | None = None,
    create_branch: bool = False,
) -> str:
    """Create a new git worktree and describe the result.

    A relative ``path`` is resolved against the parent of the working
    directory. If the manager reports failure, its ``error`` text is returned
    unchanged. An exception is logged and returned as a "❌ ..." message.
    """
    from .utils.logging import get_session_logger
    from .worktree_manager import WorktreeManager

    # Get session logger from DI container (using helper to avoid type conflicts)
    session_logger = get_session_logger()

    repo_dir = _resolve_worktree_working_dir(working_directory)
    target_path = Path(path)
    if not target_path.is_absolute():
        target_path = repo_dir.parent / path

    manager = WorktreeManager(session_logger=session_logger)
    created_branch_label = "Yes" if create_branch else "No"

    try:
        result = await manager.create_worktree(
            repository_path=repo_dir,
            new_path=target_path,
            branch=branch,
            create_branch=create_branch,
        )

        if not result["success"]:
            return f"{result['error']}"

        lines = [
            "🎉 **Worktree Created Successfully!**\n",
            f"🌿 Branch: {result['branch']}",
            f"📁 Path: {result['worktree_path']}",
            f"🎯 Created new branch: {created_branch_label}",
        ]

        if result.get("output"):
            lines.append(f"\n📝 Git output: {result['output']}")

        lines += [
            f"\n💡 To start working: cd {result['worktree_path']}",
            "💡 Use `git_worktree_list` to see all worktrees",
        ]
        return "\n".join(lines)

    except Exception as e:
        session_logger.exception(f"git_worktree_add failed: {e}")
        return f"❌ Failed to create worktree: {e}"

720 

721 

async def git_worktree_remove(
    path: str,
    working_directory: str | None = None,
    force: bool = False,
) -> str:
    """Remove an existing git worktree and describe the result.

    A relative ``path`` is resolved against the parent of the working
    directory. If the manager reports failure, its ``error`` text is returned
    unchanged. An exception is logged and returned as a "❌ ..." message.
    """
    from .utils.logging import get_session_logger
    from .worktree_manager import WorktreeManager

    # Get session logger from DI container (using helper to avoid type conflicts)
    session_logger = get_session_logger()

    repo_dir = _resolve_worktree_working_dir(working_directory)
    target_path = Path(path)
    if not target_path.is_absolute():
        target_path = repo_dir.parent / path

    manager = WorktreeManager(session_logger=session_logger)
    forced_label = "Yes" if force else "No"

    try:
        result = await manager.remove_worktree(
            repository_path=repo_dir,
            worktree_path=target_path,
            force=force,
        )

        if not result["success"]:
            return f"{result['error']}"

        lines = [
            "🗑️ **Worktree Removed Successfully!**\n",
            f"📁 Removed path: {result['removed_path']}",
        ]

        if result.get("output"):
            lines.append(f"📝 Git output: {result['output']}")

        lines += [
            f"\n💡 Used force removal: {forced_label}",
            "💡 Use `git_worktree_list` to see remaining worktrees",
        ]
        return "\n".join(lines)

    except Exception as e:
        session_logger.exception(f"git_worktree_remove failed: {e}")
        return f"❌ Failed to remove worktree: {e}"

772 

773 

def _format_worktree_switch_result(result: dict[str, t.Any]) -> str:
    """Render a worktree switch result as human-readable text.

    Shows the source and target worktrees (branch and path), followed by the
    context-preservation details chosen by ``result["context_preserved"]``.
    """
    source = result["from_worktree"]
    lines = [
        "**Worktree Context Switch Complete**\n",
        f" From: {source['branch']} ({source['path']})",
    ]

    destination = result["to_worktree"]
    lines.append(f" To: {destination['branch']} ({destination['path']})")

    context_formatter = (
        _format_context_preserved
        if result["context_preserved"]
        else _format_context_failed
    )
    lines.extend(context_formatter(result))

    return "\n".join(lines)

788 

789 

def _format_context_preserved(result: dict[str, t.Any]) -> list[str]:
    """List the messages shown when session context survived the switch.

    Always starts with the "preserved" line. Adds a line for each of
    ``session_state_saved`` and ``session_state_restored`` that is truthy.
    """
    optional_notes = (
        ("session_state_saved", " Current session state saved"),
        ("session_state_restored", " Session state restored for target worktree"),
    )
    notes = [" Session context preserved during switch"]
    notes.extend(text for flag, text in optional_notes if result.get(flag))
    return notes

800 

801 

def _format_context_failed(result: dict[str, t.Any]) -> list[str]:
    """List the messages shown when session context was not preserved.

    Always starts with the "failed" line. Adds the ``session_error`` text
    when that entry is truthy.
    """
    notes = [" Session context preservation failed (basic switch performed)"]
    if not result.get("session_error"):
        return notes
    return [*notes, f" Error: {result['session_error']}"]

810 

811 

async def git_worktree_switch(from_path: str, to_path: str) -> str:
    """Switch context between two git worktrees and describe the result.

    If the manager reports failure, its ``error`` text is returned. An
    exception is logged and returned as a "❌ ..." message.
    """
    from .utils.logging import get_session_logger
    from .worktree_manager import WorktreeManager

    # Get session logger from DI container (using helper to avoid type conflicts)
    session_logger = get_session_logger()
    manager = WorktreeManager(session_logger=session_logger)

    try:
        outcome = await manager.switch_worktree_context(
            Path(from_path),
            Path(to_path),
        )
        if outcome["success"]:
            return _format_worktree_switch_result(outcome)
        return f" {outcome['error']}"

    except Exception as e:
        session_logger.exception(f"git_worktree_switch failed: {e}")
        return f"❌ Failed to switch worktree context: {e}"

833 

834 

835# ================================ 

836# Session Welcome Tool 

837# ================================ 

838 

839# Global connection info (will be set by server lifecycle) 

840_connection_info: dict[str, t.Any] | None = None 

841 

842 

843def set_connection_info(info: dict[str, t.Any]) -> None: 

844 """Set connection info for session welcome (called from server lifespan).""" 

845 global _connection_info 

846 _connection_info = info 

847 

848 

async def session_welcome() -> str:
    """Render the one-time welcome message from the stored connection info.

    Shows the current project, quality score and connection status, then a
    previous-session summary (or a first-session note), up to three current
    recommendations, and a short list of related commands. The stored
    connection info is reset to ``None`` after the message is built.

    Returns:
        The welcome text, or an "ℹ️ ..." notice when no connection info has
        been stored.

    """
    global _connection_info

    info = _connection_info
    if not info:
        return "ℹ️ Session information not available (may not be a git repository)"

    # Current session info
    lines = [
        "🚀 Session Management Connected!",
        "=" * 40,
        f"📁 Project: {info['project']}",
        f"📊 Current quality score: {info['quality_score']}/100",
        f"🔗 Connection status: {info['connected_at']}",
    ]

    # Previous session info
    previous = info.get("previous_session")
    if previous:
        lines += ["\n📋 Previous Session Summary:", "-" * 30]
        summary_fields = (
            ("ended_at", "⏰ Last session ended"),
            ("quality_score", "📈 Final score"),
            ("top_recommendation", "💡 Key recommendation"),
        )
        lines.extend(
            f"{label}: {previous[key]}"
            for key, label in summary_fields
            if key in previous
        )
        lines.append("\n✨ Session continuity restored - your progress is preserved!")
    else:
        lines += [
            "\n🌟 This is your first session in this project!",
            "💡 Session data will be preserved for future continuity",
        ]

    # Current recommendations (first three only)
    recommendations = info.get("recommendations", [])
    if recommendations:
        lines.append("\n🎯 Current Recommendations:")
        lines.extend(
            f" {rank}. {rec}" for rank, rec in enumerate(recommendations[:3], 1)
        )

    lines += [
        "\n🔧 Use other session-mgmt tools for:",
        " • /session-buddy:status - Detailed project health",
        " • /session-buddy:checkpoint - Mid-session quality check",
        " • /session-buddy:end - Graceful session cleanup",
    ]

    # Clear the connection info after display
    _connection_info = None

    return "\n".join(lines)