Coverage for session_buddy / core / session_manager.py: 79.72%
331 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
1#!/usr/bin/env python3
2"""Session lifecycle management for session-buddy.
4This module handles session initialization, quality assessment, checkpoints,
5and cleanup operations.
6"""
8import asyncio
9import importlib
10import logging
11import os
12import shutil
13import sys
14import typing as t
15from contextlib import suppress
16from datetime import datetime
17from pathlib import Path
19from session_buddy.utils.git_operations import (
20 create_checkpoint_commit,
21 is_git_repository,
22)
25def get_session_logger() -> logging.Logger:
26 """Get the session logger instance.
28 This function is used in tests for mocking purposes.
29 """
30 return logging.getLogger(__name__)
33class SessionLifecycleManager:
34 """Manages session lifecycle operations."""
36 def __init__(self, logger: logging.Logger | None = None) -> None:
37 """Initialize session lifecycle manager.
39 Args:
40 logger: Logger instance (injected by DI container or standard logger)
42 """
43 if logger is None:
44 logger = logging.getLogger(__name__)
46 self.logger = logger
47 self.current_project: str | None = None
48 self._quality_history: dict[str, list[int]] = {} # project -> [scores]
50 # Initialize templates renderer for handoff documentation
51 self.templates: t.Any | None = None
52 self._initialize_templates()
54 def _initialize_templates(self) -> None:
55 """Initialize Jinja2 environment for handoff documentation."""
56 try:
57 from jinja2 import Environment, FileSystemLoader, select_autoescape
59 templates_dir = Path(__file__).parent.parent.parent / "templates"
60 self.templates = Environment(
61 loader=FileSystemLoader(str(templates_dir)),
62 autoescape=select_autoescape(["html", "xml"]),
63 )
64 self.logger.info(
65 "Templates environment initialized, templates_dir=%s",
66 str(templates_dir),
67 )
68 except Exception as e:
69 self.logger.warning(
70 "Templates environment initialization failed, using fallback, error=%s",
71 str(e),
72 )
73 self.templates = None
75 async def calculate_quality_score(
76 self,
77 project_dir: Path | None = None,
78 ) -> dict[str, t.Any]:
79 """Calculate session quality score using V2 algorithm.
81 Delegates to the centralized quality scoring in server.py to avoid
82 code duplication and ensure consistent scoring across the system.
84 Args:
85 project_dir: Path to the project directory. If not provided, will use current directory.
87 """
88 if project_dir is None:
89 project_dir = Path.cwd()
91 if "session_buddy.server" in sys.modules:
92 server = sys.modules["session_buddy.server"]
93 else:
94 server = await asyncio.to_thread(
95 importlib.import_module,
96 "session_buddy.server",
97 )
99 return t.cast(
100 "dict[str, t.Any]",
101 await server.calculate_quality_score(project_dir=project_dir),
102 )
104 def _calculate_project_score(self, project_context: dict[str, bool]) -> float:
105 """Calculate project health score (40% of total)."""
106 return (
107 sum(1 for detected in project_context.values() if detected)
108 / len(project_context)
109 ) * 40
111 def _calculate_permissions_score(self) -> int:
112 """Calculate permissions health score (20% of total)."""
113 try:
114 from session_buddy.server import permissions_manager
116 if hasattr(permissions_manager, "trusted_operations"):
117 trusted_count = len(permissions_manager.trusted_operations)
118 return min(
119 trusted_count * 4,
120 20,
121 ) # 4 points per trusted operation, max 20
122 return 10 # Basic score if we can't access trusted operations
123 except (ImportError, AttributeError):
124 return 10 # Fallback score
126 def _calculate_session_score(self) -> int:
127 """Calculate session management score (20% of total)."""
128 return 20 # Always available in this refactored version
130 def _calculate_tool_score(self) -> int:
131 """Calculate tool availability score (20% of total)."""
132 uv_available = shutil.which("uv") is not None
133 return 20 if uv_available else 10
135 def _format_quality_score_result(
136 self,
137 total_score: int,
138 project_score: float,
139 permissions_score: int,
140 session_score: int,
141 tool_score: int,
142 project_context: dict[str, bool],
143 uv_available: bool,
144 ) -> dict[str, t.Any]:
145 """Format the quality score calculation result."""
146 return {
147 "total_score": total_score,
148 "breakdown": {
149 "project_health": project_score,
150 "permissions": permissions_score,
151 "session_management": session_score,
152 "tools": tool_score,
153 },
154 "recommendations": self._generate_quality_recommendations(
155 total_score,
156 project_context,
157 uv_available,
158 ),
159 }
161 def _generate_quality_recommendations(
162 self,
163 score: int,
164 project_context: dict[str, t.Any],
165 uv_available: bool,
166 ) -> list[str]:
167 """Generate quality improvement recommendations based on score factors."""
168 recommendations = []
170 if score < 50:
171 recommendations.append(
172 "Session needs attention - multiple areas for improvement",
173 )
175 if not project_context.get("has_pyproject_toml"):
176 recommendations.append(
177 "Consider adding pyproject.toml for modern Python project structure",
178 )
180 if not project_context.get("has_git_repo"):
181 recommendations.append("Initialize git repository for version control")
183 if not uv_available:
184 recommendations.append(
185 "Install UV package manager for improved dependency management",
186 )
188 if not project_context.get("has_tests"):
189 recommendations.append("Add test suite to improve code quality")
191 if score >= 80:
192 recommendations.append("Excellent session setup! Keep up the good work")
193 elif score >= 60:
194 recommendations.append("Good session quality with room for optimization")
196 return recommendations[:5] # Limit to top 5 recommendations
198 async def perform_quality_assessment(
199 self,
200 project_dir: Path | None = None,
201 ) -> tuple[int, dict[str, t.Any]]:
202 """Perform quality assessment and return score and data."""
203 quality_data = await self.calculate_quality_score(project_dir=project_dir)
204 quality_score = quality_data["total_score"]
205 return quality_score, quality_data
207 def _format_trust_score(self, trust: t.Any) -> list[str]:
208 """Format trust score section (helper to reduce complexity). Target complexity: ≤5."""
209 output = []
210 # Defensive check: trust_score may be a dict or object with total attribute
211 if hasattr(trust, "total"): 211 ↛ 212line 211 didn't jump to line 212 because the condition on line 211 was never true
212 total_score = trust.total
213 elif isinstance(trust, dict) and "total" in trust:
214 total_score = trust["total"]
215 else:
216 total_score = 0
218 if total_score > 0:
219 output.append(f"\n🔐 Trust score: {total_score:.0f}/100 (separate metric)")
220 # Handle both dict and object-based trust score
221 if isinstance(trust, dict): 221 ↛ 224line 221 didn't jump to line 224 because the condition on line 221 was always true
222 details = trust.get("details", {})
223 else:
224 details = getattr(trust, "details", {})
225 if not isinstance(details, dict):
226 details = {}
228 # Only show breakdown if available
229 if details: 229 ↛ 230line 229 didn't jump to line 230 because the condition on line 229 was never true
230 output.extend(
231 (
232 f" • Trusted operations: {details.get('permissions_count', 0)}/40",
233 f" • Session features: {details.get('session_available', False)} (available)",
234 f" • Tool ecosystem: {details.get('tool_count', 0)} tools",
235 )
236 )
237 return output
239 def format_quality_results(
240 self,
241 quality_score: int,
242 quality_data: dict[str, t.Any],
243 checkpoint_result: dict[str, t.Any] | None = None,
244 ) -> list[str]:
245 """Format quality assessment results for display. Target complexity: ≤10."""
246 output = []
248 # Quality status
249 if quality_score >= 80:
250 output.append(f"✅ Session quality: EXCELLENT (Score: {quality_score}/100)")
251 elif quality_score >= 60:
252 output.append(f"✅ Session quality: GOOD (Score: {quality_score}/100)")
253 else:
254 output.append(
255 f"⚠️ Session quality: NEEDS ATTENTION (Score: {quality_score}/100)",
256 )
258 # Quality breakdown - V2 format (actual code quality metrics)
259 output.append("\n📈 Quality breakdown (code health metrics):")
260 breakdown = quality_data["breakdown"]
261 output.extend(
262 (
263 f" • Code quality: {breakdown['code_quality']:.1f}/40",
264 f" • Project health: {breakdown['project_health']:.1f}/30",
265 f" • Dev velocity: {breakdown['dev_velocity']:.1f}/20",
266 f" • Security: {breakdown['security']:.1f}/10",
267 )
268 )
270 # Trust score (separate from quality) - extracted to helper
271 if "trust_score" in quality_data:
272 output.extend(self._format_trust_score(quality_data["trust_score"]))
274 # Recommendations
275 recommendations = quality_data["recommendations"]
276 if recommendations:
277 output.append("\n💡 Recommendations:")
278 for rec in recommendations[:3]:
279 output.append(f" • {rec}")
281 # Session management specific results
282 if checkpoint_result:
283 strengths = checkpoint_result.get("strengths", [])
284 if strengths: 284 ↛ 289line 284 didn't jump to line 289 because the condition on line 284 was always true
285 output.append("\n🌟 Session strengths:")
286 for strength in strengths[:3]:
287 output.append(f" • {strength}")
289 session_stats = checkpoint_result.get("session_stats", {})
290 if session_stats: 290 ↛ 300line 290 didn't jump to line 300 because the condition on line 290 was always true
291 output.extend(
292 (
293 "\n⏱️ Session progress:",
294 f" • Duration: {session_stats.get('duration_minutes', 0)} minutes",
295 f" • Checkpoints: {session_stats.get('total_checkpoints', 0)}",
296 f" • Success rate: {session_stats.get('success_rate', 0):.1f}%",
297 )
298 )
300 return output
302 async def perform_git_checkpoint(
303 self,
304 current_dir: Path,
305 quality_score: int,
306 ) -> list[str]:
307 """Handle git operations for checkpoint commit using the new git utilities."""
308 output = []
309 output.extend(("\n" + "=" * 50, "📦 Git Checkpoint Commit", "=" * 50))
311 try:
312 # Use the new git utilities
313 success, result, git_output = create_checkpoint_commit(
314 current_dir,
315 self.current_project or "Unknown",
316 quality_score,
317 )
319 output.extend(git_output)
321 if success and result != "clean":
322 self.logger.info(
323 "Checkpoint commit created, project=%s, commit_hash=%s, quality_score=%d",
324 self.current_project,
325 result,
326 quality_score,
327 )
329 except Exception as e:
330 output.append(f"\n⚠️ Git operations error: {e}")
331 self.logger.exception(
332 "Git checkpoint error occurred, error=%s, project=%s",
333 str(e),
334 self.current_project,
335 )
337 return output
339 def _setup_working_directory(self, working_directory: str | None) -> Path:
340 """Set up working directory and project name."""
341 if working_directory:
342 os.chdir(working_directory)
344 current_dir = Path.cwd()
345 self.current_project = current_dir.name
346 return current_dir
348 def _setup_claude_directories(self) -> Path:
349 """Create .claude directory structure."""
350 claude_dir = Path.home() / ".claude"
351 claude_dir.mkdir(exist_ok=True)
352 (claude_dir / "data").mkdir(exist_ok=True)
353 (claude_dir / "logs").mkdir(exist_ok=True)
354 return claude_dir
356 def _discover_session_files(self, current_dir: Path) -> list[Path]:
357 """Discover session files in the current directory and subdirectories."""
358 return [
359 file_path
360 for file_path in current_dir.rglob("*.session.json")
361 if file_path.is_file()
362 ]
364 async def _read_previous_session_info(
365 self, file_path: Path
366 ) -> dict[str, t.Any] | None:
367 """Read previous session information from a file - handles both JSON and markdown files."""
368 try:
369 content = file_path.read_text(encoding="utf-8")
371 # Try JSON first
372 import json
374 try:
375 data = json.loads(content)
376 # Ensure the return type is properly typed as dict[str, t.Any] | None
377 if isinstance(data, dict):
378 return data # type: ignore[return-value]
379 return None
380 except json.JSONDecodeError:
381 # If not JSON, try to parse as markdown handoff file
382 from session_buddy.core.lifecycle.session_info import (
383 parse_session_file,
384 )
386 # Parse the markdown file content
387 session_info = await parse_session_file(file_path)
389 # Convert SessionInfo to dictionary format expected by the system
390 if session_info.is_complete(): 390 ↛ 399line 390 didn't jump to line 399 because the condition on line 390 was always true
391 return {
392 "ended_at": session_info.ended_at,
393 "quality_score": session_info.quality_score,
394 "working_directory": session_info.working_directory,
395 "top_recommendation": session_info.top_recommendation,
396 "session_id": session_info.session_id,
397 }
399 return None
400 except OSError:
401 return None
403 def _find_latest_handoff_file(self, current_dir: Path) -> Path | None:
404 """Find the latest handoff file in the project - supports both JSON and markdown files."""
405 # Look for markdown handoff files in the current directory (legacy format)
406 legacy_handoff_files = list(current_dir.glob("session_handoff_*.md"))
407 latest_legacy = None
408 if legacy_handoff_files:
409 latest_legacy = max(legacy_handoff_files, key=lambda f: f.stat().st_mtime)
411 # Look in the .crackerjack/session/handoff directory for newer markdown files
412 crackerjack_handoff_dir = current_dir / ".crackerjack" / "session" / "handoff"
413 if crackerjack_handoff_dir.exists():
414 handoff_files = list(crackerjack_handoff_dir.glob("session_handoff_*.md"))
415 if handoff_files: 415 ↛ 429line 415 didn't jump to line 429 because the condition on line 415 was always true
416 latest_nested = max(handoff_files, key=lambda f: f.stat().st_mtime)
417 # Compare with legacy files if present and return the most recent
418 if ( 418 ↛ 422line 418 didn't jump to line 422 because the condition on line 418 was never true
419 latest_legacy
420 and latest_nested.stat().st_mtime < latest_legacy.stat().st_mtime
421 ):
422 return latest_legacy
423 return latest_nested
424 # If the nested directory doesn't exist, return the legacy file if found
425 elif latest_legacy:
426 return latest_legacy
428 # Next, look for JSON handoff files anywhere in the directory
429 handoff_files = list(current_dir.rglob("*.handoff.json"))
430 if handoff_files: 430 ↛ 431line 430 didn't jump to line 431 because the condition on line 430 was never true
431 return max(handoff_files, key=lambda f: f.stat().st_mtime)
433 # Finally, fall back to any session-related JSON files
434 session_files = list(current_dir.rglob("*.session.json"))
435 if session_files: 435 ↛ 436line 435 didn't jump to line 436 because the condition on line 435 was never true
436 return max(session_files, key=lambda f: f.stat().st_mtime)
438 return None
440 async def _get_previous_session_info(
441 self,
442 current_dir: Path,
443 ) -> dict[str, t.Any] | None:
444 """Get previous session information if available. Target complexity: ≤5."""
445 session_files = self._discover_session_files(current_dir)
447 for file_path in session_files: 447 ↛ 448line 447 didn't jump to line 448 because the loop on line 447 never started
448 session_info = await self._read_previous_session_info(file_path)
449 if session_info:
450 return session_info
452 # Fallback to old method
453 latest_handoff = self._find_latest_handoff_file(current_dir)
454 if latest_handoff:
455 return await self._read_previous_session_info(latest_handoff)
457 return None
459 async def analyze_project_context(self, current_dir: Path) -> dict[str, bool]:
460 """Analyze project context and return relevant information."""
461 # Ensure current_dir is a Path object
462 current_dir = Path(current_dir)
464 def _safe_any_glob(pattern: str) -> bool:
465 try:
466 return any(current_dir.glob(pattern))
467 except (OSError, PermissionError):
468 return False
470 # This is a basic implementation; could be expanded based on requirements
471 has_git_repo = is_git_repository(
472 current_dir
473 ) # Use the function from git_operations
474 has_readme = _safe_any_glob("README*")
475 has_pyproject_toml = (current_dir / "pyproject.toml").is_file()
476 has_setup_py = (current_dir / "setup.py").is_file()
477 has_requirements_txt = (current_dir / "requirements.txt").is_file()
478 has_src_structure = (current_dir / "src").is_dir()
479 has_tests = _safe_any_glob("test*") or _safe_any_glob("**/test*")
480 has_docs = _safe_any_glob("docs/**") or _safe_any_glob("**/*.md")
481 has_ci_cd = (
482 (current_dir / ".github").exists()
483 or (current_dir / ".gitlab").exists()
484 or (current_dir / ".circleci").exists()
485 )
486 has_venv = (current_dir / ".venv").exists() or (current_dir / "venv").exists()
487 has_python_files = _safe_any_glob("**/*.py")
489 # Detect commonly used Python web frameworks and libraries
490 requirements_content = ""
491 with suppress(OSError, PermissionError):
492 if (current_dir / "requirements.txt").is_file():
493 requirements_content += (current_dir / "requirements.txt").read_text()
494 with suppress(OSError, PermissionError):
495 if (current_dir / "pyproject.toml").is_file():
496 requirements_content += (current_dir / "pyproject.toml").read_text()
498 # Scan Python files for framework imports (first 10 files as suggested by test)
499 try:
500 python_files = list(current_dir.glob("**/*.py"))[:10]
501 except (OSError, PermissionError):
502 python_files = []
503 for py_file in python_files:
504 try:
505 content = py_file.read_text()
506 requirements_content += content # Add file content to check for imports
507 except (OSError, UnicodeDecodeError):
508 # Skip files that can't be read
509 continue
511 uses_fastapi = "fastapi" in requirements_content.lower()
512 uses_django = "django" in requirements_content.lower()
513 uses_flask = "flask" in requirements_content.lower()
515 return {
516 "has_git_repo": has_git_repo,
517 "has_readme": has_readme,
518 "has_pyproject_toml": has_pyproject_toml,
519 "has_setup_py": has_setup_py,
520 "has_requirements_txt": has_requirements_txt,
521 "has_src_structure": has_src_structure,
522 "has_tests": has_tests,
523 "has_docs": has_docs,
524 "has_ci_cd": has_ci_cd,
525 "has_venv": has_venv,
526 "has_python_files": has_python_files,
527 "uses_fastapi": uses_fastapi,
528 "uses_django": uses_django,
529 "uses_flask": uses_flask,
530 }
532 async def _generate_handoff_documentation(
533 self, summary: dict[str, t.Any], quality_data: dict[str, t.Any]
534 ) -> str:
535 """Generate handoff documentation based on session summary and quality data."""
536 from datetime import datetime
538 # Format as markdown document
539 markdown_content = []
540 markdown_content.append(
541 f"# Session Handoff Report - {summary.get('project', 'unknown')}"
542 )
543 markdown_content.append(
544 f"\n**Session ended:** {summary.get('session_end_time', datetime.now().isoformat())}"
545 )
546 markdown_content.append(
547 f"**Final quality score:** {summary.get('final_quality_score', 0)}/100"
548 )
549 markdown_content.append(
550 f"**Working directory:** {summary.get('working_directory', 'N/A')}"
551 )
552 markdown_content.append("")
554 if summary.get("recommendations"): 554 ↛ 561line 554 didn't jump to line 561 because the condition on line 554 was always true
555 markdown_content.append("## Recommendations")
556 for rec in summary["recommendations"]:
557 markdown_content.append(f"- {rec}")
558 markdown_content.append("")
560 # Add quality details
561 breakdown = quality_data.get("breakdown", {})
562 if breakdown: 562 ↛ 568line 562 didn't jump to line 568 because the condition on line 562 was always true
563 markdown_content.append("## Quality Breakdown")
564 for key, value in breakdown.items():
565 markdown_content.append(f"- {key}: {value}")
566 markdown_content.append("")
568 return "\n".join(markdown_content)
570 def _save_handoff_documentation(
571 self, content: str, current_dir: Path
572 ) -> Path | None:
573 """Save handoff documentation to a file."""
574 from datetime import datetime
576 try:
577 # Save to .claude/handoff/ directory instead of project root
578 handoff_dir = current_dir / ".claude" / "handoff"
579 handoff_dir.mkdir(parents=True, exist_ok=True)
581 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
582 handoff_file = handoff_dir / f"session_handoff_{timestamp}.md"
583 handoff_file.write_text(content)
584 return handoff_file
585 except Exception:
586 # Return None on any failure to save
587 return None
589 async def initialize_session(
590 self,
591 working_directory: str | None = None,
592 ) -> dict[str, t.Any]:
593 """Initialize a new session with comprehensive setup."""
594 try:
595 # Setup directories and project
596 current_dir = self._setup_working_directory(working_directory)
597 claude_dir = self._setup_claude_directories()
599 # Analyze project and assess quality
600 project_context = await self.analyze_project_context(current_dir)
601 quality_score, quality_data = await self.perform_quality_assessment(
602 project_dir=current_dir,
603 )
605 # Get previous session info
606 previous_session_info = await self._get_previous_session_info(current_dir)
608 self.logger.info(
609 "Session initialized, project=%s, quality_score=%d, working_directory=%s, has_previous_session=%s",
610 self.current_project,
611 quality_score,
612 str(current_dir),
613 previous_session_info is not None,
614 )
616 return {
617 "success": True,
618 "project": self.current_project,
619 "working_directory": str(current_dir),
620 "quality_score": quality_score,
621 "quality_data": quality_data,
622 "project_context": project_context,
623 "claude_directory": str(claude_dir),
624 "previous_session": previous_session_info,
625 }
627 except Exception as e:
628 self.logger.exception("Session initialization failed: %s", str(e))
629 return {"success": False, "error": str(e)}
631 def get_previous_quality_score(self, project: str) -> int | None:
632 """Get the most recent quality score for a project."""
633 scores = self._quality_history.get(project, [])
634 return scores[-1] if scores else None
636 def record_quality_score(self, project: str, score: int) -> None:
637 """Record a quality score for quality trend tracking."""
638 if project not in self._quality_history:
639 self._quality_history[project] = []
640 self._quality_history[project].append(score)
641 # Keep only last 10 scores to prevent unbounded growth
642 if len(self._quality_history[project]) > 10:
643 self._quality_history[project] = self._quality_history[project][-10:]
645 async def checkpoint_session(
646 self,
647 working_directory: str | None = None,
648 is_manual: bool = False,
649 ) -> dict[str, t.Any]:
650 """Perform a comprehensive session checkpoint.
652 Args:
653 working_directory: Optional working directory override
654 is_manual: Whether this is a manually-triggered checkpoint
656 Returns:
657 Dictionary containing checkpoint results and auto-store decision
659 """
660 try:
661 current_dir = Path(working_directory) if working_directory else Path.cwd()
662 self.current_project = current_dir.name
664 # Quality assessment
665 quality_score, quality_data = await self.perform_quality_assessment(
666 project_dir=current_dir,
667 )
669 # Get previous score for trend analysis
670 previous_score = self.get_previous_quality_score(self.current_project)
672 # Record this score for future comparisons
673 self.record_quality_score(self.current_project, quality_score)
675 # Determine if reflection should be auto-stored
676 from session_buddy.utils.reflection_utils import (
677 format_auto_store_summary,
678 should_auto_store_checkpoint,
679 )
681 auto_store_decision = should_auto_store_checkpoint(
682 quality_score=quality_score,
683 previous_score=previous_score,
684 is_manual=is_manual,
685 session_phase="checkpoint",
686 )
688 # Git checkpoint
689 git_output = await self.perform_git_checkpoint(current_dir, quality_score)
691 # Format results
692 quality_output = self.format_quality_results(quality_score, quality_data)
694 self.logger.info(
695 "Session checkpoint completed, project=%s, quality_score=%d, auto_store_decision=%s, auto_store_reason=%s",
696 self.current_project,
697 quality_score,
698 auto_store_decision.should_store,
699 auto_store_decision.reason.value,
700 )
702 return {
703 "success": True,
704 "quality_score": quality_score,
705 "quality_output": quality_output,
706 "git_output": git_output,
707 "timestamp": datetime.now().isoformat(),
708 "auto_store_decision": auto_store_decision,
709 "auto_store_summary": format_auto_store_summary(auto_store_decision),
710 }
712 except Exception as e:
713 self.logger.exception("Session checkpoint failed, error=%s", str(e))
714 return {"success": False, "error": str(e)}
716 async def end_session(
717 self,
718 working_directory: str | None = None,
719 ) -> dict[str, t.Any]:
720 """End the current session with cleanup and summary."""
721 try:
722 current_dir = Path(working_directory) if working_directory else Path.cwd()
723 self.current_project = current_dir.name
725 # Final quality assessment
726 quality_score, quality_data = await self.perform_quality_assessment(
727 project_dir=current_dir,
728 )
730 # Create session summary
731 summary = {
732 "project": self.current_project,
733 "final_quality_score": quality_score,
734 "session_end_time": datetime.now().isoformat(),
735 "working_directory": str(current_dir),
736 "recommendations": quality_data.get("recommendations", []),
737 }
739 # Generate handoff documentation
740 handoff_content = await self._generate_handoff_documentation(
741 summary,
742 quality_data,
743 )
745 # Save handoff documentation
746 handoff_path = self._save_handoff_documentation(
747 handoff_content,
748 current_dir,
749 )
751 self.logger.info(
752 "Session ended, project=%s, final_quality_score=%d",
753 self.current_project,
754 quality_score,
755 )
757 summary["handoff_documentation"] = (
758 str(handoff_path) if handoff_path else None
759 )
761 return {"success": True, "summary": summary}
763 except Exception as e:
764 self.logger.exception("Session end failed, error=%s", str(e))
765 return {"success": False, "error": str(e)}
767 async def get_session_status(
768 self,
769 working_directory: str | None = None,
770 ) -> dict[str, t.Any]:
771 """Get current session status and health information."""
772 try:
773 current_dir = Path(working_directory) if working_directory else Path.cwd()
775 self.current_project = current_dir.name
777 # Get comprehensive status
778 project_context = await self.analyze_project_context(current_dir)
779 quality_score, quality_data = await self.perform_quality_assessment(
780 project_dir=current_dir,
781 )
783 # Check system health
784 uv_available = shutil.which("uv") is not None
785 git_available = is_git_repository(current_dir)
786 claude_dir = Path.home() / ".claude"
787 claude_dir_exists = claude_dir.exists()
789 return {
790 "success": True,
791 "project": self.current_project,
792 "working_directory": str(current_dir),
793 "quality_score": quality_score,
794 "quality_breakdown": quality_data["breakdown"],
795 "recommendations": quality_data["recommendations"],
796 "project_context": project_context,
797 "system_health": {
798 "uv_available": uv_available,
799 "git_repository": git_available,
800 "claude_directory": claude_dir_exists,
801 },
802 "timestamp": datetime.now().isoformat(),
803 }
805 except Exception as e:
806 self.logger.exception("Failed to get session status, error=%s", str(e))
807 return {"success": False, "error": str(e)}