Coverage for session_buddy / utils / server_helpers.py: 29.41%
193 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
1"""Server Utility and Formatting Functions.
3This module provides helper functions extracted from server.py including:
4- Display formatting functions (26 functions)
5- Session initialization helpers (6 functions)
6- Additional helper functions (8 functions)
8Phase 2.2 Migration: Extracted utility functions for modularity.
9Total: 40 functions, ~900 lines
10"""
12from __future__ import annotations
14import os
15import subprocess # nosec B404
16from contextlib import suppress
17from pathlib import Path
18from typing import Any
20# Feature availability flags - set based on module availability
21TOKEN_OPTIMIZER_AVAILABLE = False
22CONFIG_AVAILABLE = False
23CRACKERJACK_INTEGRATION_AVAILABLE = False
25with suppress(ImportError):
26 from session_buddy.token_optimizer import TokenOptimizer
28 TOKEN_OPTIMIZER_AVAILABLE = True
30with suppress(ImportError):
31 from session_buddy.di.config import SessionPaths
33 CONFIG_AVAILABLE = True
35with suppress(ImportError):
36 from session_buddy.crackerjack_integration import (
37 CrackerjackIntegration,
38 )
40 CRACKERJACK_INTEGRATION_AVAILABLE = True
42# ============================================================================
43# Display Formatting Functions (26 functions)
44# ============================================================================
47def _format_metrics_summary(session_stats: dict[str, Any]) -> str:
48 """Format session metrics summary."""
49 detail_summary = (
50 f"Session metrics - Duration: {session_stats.get('duration_minutes', 0)}min, "
51 )
52 detail_summary += f"Success rate: {session_stats.get('success_rate', 0):.1f}%, "
53 detail_summary += f"Checkpoints: {session_stats.get('total_checkpoints', 0)}"
54 return detail_summary
57def _format_project_maturity_section(context_score: int, max_score: int) -> list[str]:
58 """Format the project maturity section."""
59 return [f"\n\x75 Project maturity: {context_score}/{max_score}"]
62def _format_git_worktree_header() -> str:
63 """Format the git worktree information header."""
64 return "\n\x72 Git Worktree Information:"
67def _format_current_worktree_info(worktree_info: Any) -> list[str]:
68 """Format current worktree information."""
69 output = []
70 if worktree_info.is_main_worktree:
71 output.append(
72 f" \x73 Current: Main repository on '{worktree_info.branch}'",
73 )
74 else:
75 output.extend(
76 (
77 f" s Current: Worktree on '{worktree_info.branch}'",
78 f" r Path: {worktree_info.path}",
79 )
80 )
81 return output
84def _format_worktree_count_info(all_worktrees: list[Any]) -> list[str]:
85 """Format worktree count information."""
86 output = []
87 if len(all_worktrees) > 1:
88 output.append(f" \x74 Total worktrees: {len(all_worktrees)}")
89 return output
92def _format_other_branches_info(
93 all_worktrees: list[Any],
94 worktree_info: Any,
95) -> list[str]:
96 """Format information about other branches."""
97 output = []
98 other_branches = [
99 wt.branch for wt in all_worktrees if wt.path != worktree_info.path
100 ]
101 if other_branches:
102 output.append(
103 f" \x75 Other branches: {', '.join(other_branches[:3])}",
104 )
105 if len(other_branches) > 3:
106 output.append(f" ... and {len(other_branches) - 3} more")
107 return output
110def _format_worktree_suggestions(all_worktrees: list[Any]) -> list[str]:
111 """Format worktree-related suggestions."""
112 output = []
113 if len(all_worktrees) > 1:
114 output.append(" \x74 Use 'git_worktree_list' to see all worktrees")
115 else:
116 output.append(
117 " \x74 Use 'git_worktree_add <branch> <path>' to create parallel worktrees",
118 )
119 return output
122def _format_detached_head_warning(worktree_info: Any) -> list[str]:
123 """Format detached HEAD warning if applicable."""
124 output = []
125 if worktree_info.is_detached:
126 output.append(" \x77 Detached HEAD - consider checking out a branch")
127 return output
130def _format_no_reminders_message(user_id: str, project_id: str | None) -> list[str]:
131 """Format message when no reminders are found."""
132 output = []
133 output.extend(("📋 No pending reminders found", f"👤 User: {user_id}"))
134 if project_id:
135 output.append(f"📁 Project: {project_id}")
136 output.append(
137 "💡 Use 'create_natural_reminder' to set up time-based reminders",
138 )
139 return output
142def _format_reminders_header(
143 reminders: list[dict[str, Any]],
144 user_id: str,
145 project_id: str | None,
146) -> list[str]:
147 """Format header for reminders list."""
148 output = []
149 output.extend(
150 (
151 f"⏰ Found {len(reminders)} pending reminders",
152 f"👤 User: {user_id}",
153 )
154 )
155 if project_id:
156 output.append(f"📁 Project: {project_id}")
157 output.append("=" * 50)
158 return output
161def _format_single_reminder(reminder: dict[str, Any], index: int) -> list[str]:
162 """Format a single reminder for display."""
163 output = []
164 output.extend(
165 (
166 f"\n#{index}",
167 f"🆔 ID: {reminder['id']}",
168 f"📝 Title: {reminder['title']}",
169 )
170 )
171 return output
def _format_reminders_list(
    reminders: list[dict[str, Any]],
    user_id: str,
    project_id: str | None,
) -> list[str]:
    """Return the display lines for a reminders listing.

    NOTE(review): as written this produces only the header lines from
    ``_format_reminders_header``; individual reminders are not rendered
    here - confirm whether callers append them separately.
    """
    header_lines = _format_reminders_header(reminders, user_id, project_id)
    return header_lines
183def _format_reminder_basic_info(reminder: dict[str, Any], index: int) -> list[str]:
184 """Format basic reminder information."""
185 return [
186 f"\n🔥 #{index} OVERDUE",
187 f"🆔 ID: {reminder['id']}",
188 f"📝 Title: {reminder['title']}",
189 ]
192def _format_project_insights(insights: dict[str, Any], time_range_days: int) -> str:
193 """Format project insights for display."""
194 return f"Project insights over {time_range_days} days: {len(insights)} items"
197def _format_project_activity_section(project_activity: dict[str, Any]) -> list[str]:
198 """Format project activity section."""
199 output = ["**📈 Project Activity:**"]
200 for project, stats in project_activity.items():
201 output.append(
202 f"• **{project}:** {stats['conversation_count']} conversations, last active: {stats.get('last_activity', 'Unknown')}",
203 )
204 output.append("")
205 return output
208def _format_common_patterns_section(common_patterns: list[dict[str, Any]]) -> list[str]:
209 """Format common patterns section."""
210 output = ["**🔍 Common Patterns:**"]
211 for pattern in common_patterns[:5]: # Top 5
212 projects_str = ", ".join(pattern["projects"])
213 output.append(
214 f"• **{pattern['pattern']}** across {projects_str} (frequency: {pattern['frequency']})",
215 )
216 output.append("")
217 return output
220def _format_advanced_search_results(results: list[Any]) -> str:
221 """Format advanced search results for display."""
222 return f"🔍 **Advanced Search Results** ({len(results)} found)\n"
225def _format_worktree_status(wt: dict[str, Any]) -> str:
226 """Format worktree status items."""
227 status_items = []
228 if wt["locked"]: 228 ↛ 230line 228 didn't jump to line 230 because the condition on line 228 was always true
229 status_items.append("🔒 locked")
230 if wt["prunable"]: 230 ↛ 231line 230 didn't jump to line 231 because the condition on line 230 was never true
231 status_items.append("🗑️ prunable")
232 if not wt["exists"]: 232 ↛ 233line 232 didn't jump to line 233 because the condition on line 232 was never true
233 status_items.append("❌ missing")
234 if wt["has_session"]: 234 ↛ 236line 234 didn't jump to line 236 because the condition on line 234 was always true
235 status_items.append("🧠 has session")
236 return ", ".join(status_items) if status_items else "✓ normal"
239def _format_worktree_list_header(
240 total_count: int,
241 repo_name: str,
242 current_worktree: str,
243) -> list[str]:
244 """Format the header for the worktree list output."""
245 return [
246 f"🌿 **Git Worktrees** ({total_count} total)\\n",
247 f"📂 Repository: {repo_name}",
248 f"🎯 Current: {current_worktree}\\n",
249 ]
def _format_single_worktree(wt: dict[str, Any]) -> list[str]:
    """Render one worktree as a branch bullet, its path and an optional status.

    The status line is added only when ``_format_worktree_status`` reports
    something other than ``"✓ normal"``.
    """
    lines = [f"• {wt['branch']}", f" Path: {wt['path']}"]
    status_text = _format_worktree_status(wt)
    if status_text == "✓ normal":
        return lines
    lines.append(f" Status: {status_text}")
    return lines
264def _format_session_summary(result: dict[str, Any]) -> list[str]:
265 """Format session summary across all worktrees."""
266 session_summary = result["session_summary"]
267 return [
268 "📊 **Multi-Worktree Summary:**",
269 f"• Total worktrees: {result['total_worktrees']}",
270 f"• Active sessions: {session_summary['active_sessions']}",
271 f"• Unique branches: {session_summary['unique_branches']}",
272 f"• Branches: {', '.join(session_summary['branches'])}\n",
273 ]
def _format_worktree_status_display(
    status_info: dict[str, Any],
    working_dir: Path,
) -> str:
    """Combine basic worktree details and optional session details into one string.

    Lines come from ``_format_basic_worktree_info`` followed by
    ``_format_session_info`` (fed ``status_info.get("session_info")``) and
    are joined with newlines.
    """
    lines = _format_basic_worktree_info(status_info, working_dir)
    lines.extend(_format_session_info(status_info.get("session_info")))
    return "\n".join(lines)
287def _format_basic_worktree_info(
288 status_info: dict[str, Any],
289 working_dir: Path,
290) -> list[str]:
291 """Format basic worktree information."""
292 return [
293 f"📂 Repository: {working_dir.name}",
294 f"🎯 Current worktree: {status_info['branch']}",
295 f"📁 Path: {status_info['path']}",
296 f"🧠 Has session: {'Yes' if status_info['has_session'] else 'No'}",
297 f"🔸 Detached HEAD: {'Yes' if status_info['is_detached'] else 'No'}\n",
298 ]
301def _format_session_info(session_info: dict[str, Any] | None) -> list[str]:
302 """Format session information if available."""
303 if not session_info:
304 return []
305 return [
306 "📊 Session Information:",
307 f" ID: {session_info.get('id', 'N/A')}",
308 f" Status: {session_info.get('status', 'unknown')}",
309 ]
312def _format_interruption_statistics(interruptions: list[dict[str, Any]]) -> list[str]:
313 """Format interruption statistics for display."""
314 if not interruptions:
315 return ["📊 **Interruption Patterns**: No recent interruptions"]
316 output = [
317 f"📊 **Interruption Patterns**: {len(interruptions)} interruptions",
318 ]
319 for i, interruption in enumerate(interruptions[:5], 1):
320 output.append(
321 f" {i}. {interruption.get('type', 'unknown')}: {interruption.get('timestamp', 'N/A')}",
322 )
323 return output
326def _format_snapshot_statistics(snapshots: list[dict[str, Any]]) -> list[str]:
327 """Format snapshot statistics for display."""
328 if not snapshots:
329 return ["💾 **Context Snapshots**: No snapshots available"]
330 output = [
331 f"💾 **Context Snapshots**: {len(snapshots)} snapshots",
332 ]
333 for i, snapshot in enumerate(snapshots[:5], 1):
334 output.append(
335 f" {i}. {snapshot.get('type', 'unknown')}: {snapshot.get('timestamp', 'N/A')}",
336 )
337 return output
340# ============================================================================
341# Session Setup & Helper Functions (14 functions)
342# ============================================================================
345def _setup_claude_directory(output: list[str]) -> dict[str, Any]:
346 """Setup Claude directory and return validation results."""
347 output.append("\n📋 Phase 1: Claude directory setup...")
348 # Placeholder implementation - actual logic should be in server.py
349 return {"status": "success", "directories_created": []}
352def _setup_uv_dependencies(output: list[str], current_dir: Path) -> None:
353 """Setup UV dependencies and package management."""
354 output.append("\n🔧 Phase 2: UV dependency management & session setup...")
357def _handle_uv_operations(
358 output: list[str],
359 current_dir: Path,
360 uv_trusted: bool,
361) -> None:
362 """Handle UV operations for dependency management."""
363 (current_dir / "pyproject.toml").exists()
366def _run_uv_sync_and_compile(output: list[str], current_dir: Path) -> None:
367 """Run UV sync and compile operations."""
368 # Sync dependencies
369 sync_result = subprocess.run(
370 ["uv", "sync"],
371 check=False,
372 capture_output=True,
373 text=True,
374 )
375 if sync_result.returncode == 0:
376 output.append("✅ UV sync completed successfully")
379def _setup_session_management(output: list[str]) -> None:
380 """Setup session management functionality."""
381 output.extend(
382 (
383 "\n🔧 Phase 3: Session management setup...",
384 "✅ Session management functionality ready",
385 " 📊 Conversation memory system enabled",
386 " 🔍 Semantic search capabilities available",
387 )
388 )
391def _add_final_summary(
392 output: list[str],
393 current_project: str,
394 context_score: int,
395 project_context: dict[str, Any],
396 claude_validation: dict[str, Any],
397) -> None:
398 """Add final summary information to output."""
399 output.extend(
400 (
401 "\n" + "=" * 60,
402 f"🎯 {current_project.upper()} SESSION INITIALIZATION COMPLETE",
403 "=" * 60,
404 )
405 )
408def _add_permissions_and_tools_summary(
409 output: list[str],
410 current_project: str,
411 permissions_manager: Any | None = None,
412) -> None:
413 """Add permissions summary and available tools."""
414 # Permissions Summary
415 if permissions_manager is not None:
416 permissions_status = permissions_manager.get_permission_status()
417 output.extend(
418 (
419 "\n🔐 Session Permissions Summary:",
420 f" 📊 Trusted operations: {permissions_status['trusted_operations_count']}",
421 )
422 )
423 else:
424 output.extend(
425 (
426 "\n🔐 Session Permissions Summary:",
427 " ⚠️ Permissions manager not available",
428 )
429 )
432def _add_session_health_insights(insights: list[str], quality_score: float) -> None:
433 """Add session health indicators to insights."""
434 if quality_score >= 80:
435 insights.append("Excellent session progress with optimal workflow patterns")
436 elif quality_score >= 60:
437 insights.append("Good session progress with minor optimization opportunities")
438 else:
439 insights.append(
440 "Session requires attention - potential workflow improvements needed",
441 )
444def _add_current_session_context(summary: dict[str, Any]) -> None:
445 """Add current session context to summary."""
446 current_dir = Path(os.environ.get("PWD", Path.cwd()))
447 if (current_dir / "session_buddy").exists():
448 summary["key_topics"].append("session-mgmt-mcp development")
451def _add_permissions_info(
452 output: list[str],
453 permissions_manager: Any | None = None,
454) -> None:
455 """Add permissions information to output."""
456 if permissions_manager is not None:
457 permissions_status = permissions_manager.get_permission_status()
458 output.extend(
459 (
460 "\n🔐 Session Permissions:",
461 f" 📊 Trusted operations: {permissions_status['trusted_operations_count']}",
462 )
463 )
464 if permissions_status["trusted_operations"]:
465 for op in permissions_status["trusted_operations"]:
466 output.append(f" ✅ {op.replace('_', ' ').title()}")
467 else:
468 output.append(
469 " ⚠️ No trusted operations yet - will prompt for permissions",
470 )
471 else:
472 output.extend(
473 (
474 "\n🔐 Session Permissions:",
475 " ⚠️ Permissions manager not available",
476 )
477 )
480def _add_basic_tools_info(output: list[str]) -> None:
481 """Add basic MCP tools information to output."""
482 output.extend(
483 (
484 "\n🛠️ Available MCP Tools:",
485 "• init - Full session initialization",
486 "• checkpoint - Quality monitoring",
487 "• end - Complete cleanup",
488 "• status - This status report with health checks",
489 "• permissions - Manage trusted operations",
490 "• git_worktree_list - List all git worktrees",
491 "• git_worktree_add - Create new worktrees",
492 "• git_worktree_remove - Remove worktrees",
493 "• git_worktree_status - Comprehensive worktree status",
494 "• git_worktree_prune - Clean up stale references",
495 )
496 )
def _add_feature_status_info(output: list[str]) -> None:
    """Append the token-optimization tool list when that feature is available.

    Nothing is added when ``TOKEN_OPTIMIZER_AVAILABLE`` is false.
    """
    if not TOKEN_OPTIMIZER_AVAILABLE:
        return

    token_lines = [
        "\n⚡ Token Optimization:",
        "• get_cached_chunk - Retrieve chunked response data",
        "• get_token_usage_stats - Token usage and savings metrics",
        "• optimize_memory_usage - Consolidate old conversations",
        "• Built-in response chunking and truncation",
    ]
    output.extend(token_lines)
def _add_configuration_info(output: list[str]) -> None:
    """Append the configuration capability list when configuration is available.

    Nothing is added when ``CONFIG_AVAILABLE`` is false.
    """
    if not CONFIG_AVAILABLE:
        return

    config_lines = [
        "\n⚙️ Configuration:",
        "• pyproject.toml configuration support",
        "• Environment variable overrides",
        "• Configurable database, search, and optimization settings",
    ]
    output.extend(config_lines)
def _add_crackerjack_integration_info(output: list[str]) -> None:
    """Append the Crackerjack command overview when the integration is available.

    Nothing is added when ``CRACKERJACK_INTEGRATION_AVAILABLE`` is false.
    """
    if not CRACKERJACK_INTEGRATION_AVAILABLE:
        return

    crackerjack_lines = [
        "\n🔧 Crackerjack Integration (Enhanced):",
        "\n🎯 RECOMMENDED COMMANDS (Enhanced with Memory & Analytics):",
        "• /session-buddy:crackerjack-run <command> - Smart execution with insights",
        "• /session-buddy:crackerjack-history - View trends and patterns",
        "• /session-buddy:crackerjack-metrics - Quality metrics over time",
        "• /session-buddy:crackerjack-patterns - Test failure analysis",
        "• /session-buddy:crackerjack-help - Complete command guide",
    ]
    output.extend(crackerjack_lines)