Coverage for session_buddy / core / session_manager.py: 79.72%

331 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1#!/usr/bin/env python3 

2"""Session lifecycle management for session-buddy. 

3 

4This module handles session initialization, quality assessment, checkpoints, 

5and cleanup operations. 

6""" 

7 

8import asyncio 

9import importlib 

10import logging 

11import os 

12import shutil 

13import sys 

14import typing as t 

15from contextlib import suppress 

16from datetime import datetime 

17from pathlib import Path 

18 

19from session_buddy.utils.git_operations import ( 

20 create_checkpoint_commit, 

21 is_git_repository, 

22) 

23 

24 

25def get_session_logger() -> logging.Logger: 

26 """Get the session logger instance. 

27 

28 This function is used in tests for mocking purposes. 

29 """ 

30 return logging.getLogger(__name__) 

31 

32 

33class SessionLifecycleManager: 

34 """Manages session lifecycle operations.""" 

35 

36 def __init__(self, logger: logging.Logger | None = None) -> None: 

37 """Initialize session lifecycle manager. 

38 

39 Args: 

40 logger: Logger instance (injected by DI container or standard logger) 

41 

42 """ 

43 if logger is None: 

44 logger = logging.getLogger(__name__) 

45 

46 self.logger = logger 

47 self.current_project: str | None = None 

48 self._quality_history: dict[str, list[int]] = {} # project -> [scores] 

49 

50 # Initialize templates renderer for handoff documentation 

51 self.templates: t.Any | None = None 

52 self._initialize_templates() 

53 

54 def _initialize_templates(self) -> None: 

55 """Initialize Jinja2 environment for handoff documentation.""" 

56 try: 

57 from jinja2 import Environment, FileSystemLoader, select_autoescape 

58 

59 templates_dir = Path(__file__).parent.parent.parent / "templates" 

60 self.templates = Environment( 

61 loader=FileSystemLoader(str(templates_dir)), 

62 autoescape=select_autoescape(["html", "xml"]), 

63 ) 

64 self.logger.info( 

65 "Templates environment initialized, templates_dir=%s", 

66 str(templates_dir), 

67 ) 

68 except Exception as e: 

69 self.logger.warning( 

70 "Templates environment initialization failed, using fallback, error=%s", 

71 str(e), 

72 ) 

73 self.templates = None 

74 

75 async def calculate_quality_score( 

76 self, 

77 project_dir: Path | None = None, 

78 ) -> dict[str, t.Any]: 

79 """Calculate session quality score using V2 algorithm. 

80 

81 Delegates to the centralized quality scoring in server.py to avoid 

82 code duplication and ensure consistent scoring across the system. 

83 

84 Args: 

85 project_dir: Path to the project directory. If not provided, will use current directory. 

86 

87 """ 

88 if project_dir is None: 

89 project_dir = Path.cwd() 

90 

91 if "session_buddy.server" in sys.modules: 

92 server = sys.modules["session_buddy.server"] 

93 else: 

94 server = await asyncio.to_thread( 

95 importlib.import_module, 

96 "session_buddy.server", 

97 ) 

98 

99 return t.cast( 

100 "dict[str, t.Any]", 

101 await server.calculate_quality_score(project_dir=project_dir), 

102 ) 

103 

104 def _calculate_project_score(self, project_context: dict[str, bool]) -> float: 

105 """Calculate project health score (40% of total).""" 

106 return ( 

107 sum(1 for detected in project_context.values() if detected) 

108 / len(project_context) 

109 ) * 40 

110 

111 def _calculate_permissions_score(self) -> int: 

112 """Calculate permissions health score (20% of total).""" 

113 try: 

114 from session_buddy.server import permissions_manager 

115 

116 if hasattr(permissions_manager, "trusted_operations"): 

117 trusted_count = len(permissions_manager.trusted_operations) 

118 return min( 

119 trusted_count * 4, 

120 20, 

121 ) # 4 points per trusted operation, max 20 

122 return 10 # Basic score if we can't access trusted operations 

123 except (ImportError, AttributeError): 

124 return 10 # Fallback score 

125 

126 def _calculate_session_score(self) -> int: 

127 """Calculate session management score (20% of total).""" 

128 return 20 # Always available in this refactored version 

129 

130 def _calculate_tool_score(self) -> int: 

131 """Calculate tool availability score (20% of total).""" 

132 uv_available = shutil.which("uv") is not None 

133 return 20 if uv_available else 10 

134 

135 def _format_quality_score_result( 

136 self, 

137 total_score: int, 

138 project_score: float, 

139 permissions_score: int, 

140 session_score: int, 

141 tool_score: int, 

142 project_context: dict[str, bool], 

143 uv_available: bool, 

144 ) -> dict[str, t.Any]: 

145 """Format the quality score calculation result.""" 

146 return { 

147 "total_score": total_score, 

148 "breakdown": { 

149 "project_health": project_score, 

150 "permissions": permissions_score, 

151 "session_management": session_score, 

152 "tools": tool_score, 

153 }, 

154 "recommendations": self._generate_quality_recommendations( 

155 total_score, 

156 project_context, 

157 uv_available, 

158 ), 

159 } 

160 

161 def _generate_quality_recommendations( 

162 self, 

163 score: int, 

164 project_context: dict[str, t.Any], 

165 uv_available: bool, 

166 ) -> list[str]: 

167 """Generate quality improvement recommendations based on score factors.""" 

168 recommendations = [] 

169 

170 if score < 50: 

171 recommendations.append( 

172 "Session needs attention - multiple areas for improvement", 

173 ) 

174 

175 if not project_context.get("has_pyproject_toml"): 

176 recommendations.append( 

177 "Consider adding pyproject.toml for modern Python project structure", 

178 ) 

179 

180 if not project_context.get("has_git_repo"): 

181 recommendations.append("Initialize git repository for version control") 

182 

183 if not uv_available: 

184 recommendations.append( 

185 "Install UV package manager for improved dependency management", 

186 ) 

187 

188 if not project_context.get("has_tests"): 

189 recommendations.append("Add test suite to improve code quality") 

190 

191 if score >= 80: 

192 recommendations.append("Excellent session setup! Keep up the good work") 

193 elif score >= 60: 

194 recommendations.append("Good session quality with room for optimization") 

195 

196 return recommendations[:5] # Limit to top 5 recommendations 

197 

198 async def perform_quality_assessment( 

199 self, 

200 project_dir: Path | None = None, 

201 ) -> tuple[int, dict[str, t.Any]]: 

202 """Perform quality assessment and return score and data.""" 

203 quality_data = await self.calculate_quality_score(project_dir=project_dir) 

204 quality_score = quality_data["total_score"] 

205 return quality_score, quality_data 

206 

207 def _format_trust_score(self, trust: t.Any) -> list[str]: 

208 """Format trust score section (helper to reduce complexity). Target complexity: ≤5.""" 

209 output = [] 

210 # Defensive check: trust_score may be a dict or object with total attribute 

211 if hasattr(trust, "total"): 211 ↛ 212line 211 didn't jump to line 212 because the condition on line 211 was never true

212 total_score = trust.total 

213 elif isinstance(trust, dict) and "total" in trust: 

214 total_score = trust["total"] 

215 else: 

216 total_score = 0 

217 

218 if total_score > 0: 

219 output.append(f"\n🔐 Trust score: {total_score:.0f}/100 (separate metric)") 

220 # Handle both dict and object-based trust score 

221 if isinstance(trust, dict): 221 ↛ 224line 221 didn't jump to line 224 because the condition on line 221 was always true

222 details = trust.get("details", {}) 

223 else: 

224 details = getattr(trust, "details", {}) 

225 if not isinstance(details, dict): 

226 details = {} 

227 

228 # Only show breakdown if available 

229 if details: 229 ↛ 230line 229 didn't jump to line 230 because the condition on line 229 was never true

230 output.extend( 

231 ( 

232 f" • Trusted operations: {details.get('permissions_count', 0)}/40", 

233 f" • Session features: {details.get('session_available', False)} (available)", 

234 f" • Tool ecosystem: {details.get('tool_count', 0)} tools", 

235 ) 

236 ) 

237 return output 

238 

239 def format_quality_results( 

240 self, 

241 quality_score: int, 

242 quality_data: dict[str, t.Any], 

243 checkpoint_result: dict[str, t.Any] | None = None, 

244 ) -> list[str]: 

245 """Format quality assessment results for display. Target complexity: ≤10.""" 

246 output = [] 

247 

248 # Quality status 

249 if quality_score >= 80: 

250 output.append(f"✅ Session quality: EXCELLENT (Score: {quality_score}/100)") 

251 elif quality_score >= 60: 

252 output.append(f"✅ Session quality: GOOD (Score: {quality_score}/100)") 

253 else: 

254 output.append( 

255 f"⚠️ Session quality: NEEDS ATTENTION (Score: {quality_score}/100)", 

256 ) 

257 

258 # Quality breakdown - V2 format (actual code quality metrics) 

259 output.append("\n📈 Quality breakdown (code health metrics):") 

260 breakdown = quality_data["breakdown"] 

261 output.extend( 

262 ( 

263 f" • Code quality: {breakdown['code_quality']:.1f}/40", 

264 f" • Project health: {breakdown['project_health']:.1f}/30", 

265 f" • Dev velocity: {breakdown['dev_velocity']:.1f}/20", 

266 f" • Security: {breakdown['security']:.1f}/10", 

267 ) 

268 ) 

269 

270 # Trust score (separate from quality) - extracted to helper 

271 if "trust_score" in quality_data: 

272 output.extend(self._format_trust_score(quality_data["trust_score"])) 

273 

274 # Recommendations 

275 recommendations = quality_data["recommendations"] 

276 if recommendations: 

277 output.append("\n💡 Recommendations:") 

278 for rec in recommendations[:3]: 

279 output.append(f"{rec}") 

280 

281 # Session management specific results 

282 if checkpoint_result: 

283 strengths = checkpoint_result.get("strengths", []) 

284 if strengths: 284 ↛ 289line 284 didn't jump to line 289 because the condition on line 284 was always true

285 output.append("\n🌟 Session strengths:") 

286 for strength in strengths[:3]: 

287 output.append(f"{strength}") 

288 

289 session_stats = checkpoint_result.get("session_stats", {}) 

290 if session_stats: 290 ↛ 300line 290 didn't jump to line 300 because the condition on line 290 was always true

291 output.extend( 

292 ( 

293 "\n⏱️ Session progress:", 

294 f" • Duration: {session_stats.get('duration_minutes', 0)} minutes", 

295 f" • Checkpoints: {session_stats.get('total_checkpoints', 0)}", 

296 f" • Success rate: {session_stats.get('success_rate', 0):.1f}%", 

297 ) 

298 ) 

299 

300 return output 

301 

302 async def perform_git_checkpoint( 

303 self, 

304 current_dir: Path, 

305 quality_score: int, 

306 ) -> list[str]: 

307 """Handle git operations for checkpoint commit using the new git utilities.""" 

308 output = [] 

309 output.extend(("\n" + "=" * 50, "📦 Git Checkpoint Commit", "=" * 50)) 

310 

311 try: 

312 # Use the new git utilities 

313 success, result, git_output = create_checkpoint_commit( 

314 current_dir, 

315 self.current_project or "Unknown", 

316 quality_score, 

317 ) 

318 

319 output.extend(git_output) 

320 

321 if success and result != "clean": 

322 self.logger.info( 

323 "Checkpoint commit created, project=%s, commit_hash=%s, quality_score=%d", 

324 self.current_project, 

325 result, 

326 quality_score, 

327 ) 

328 

329 except Exception as e: 

330 output.append(f"\n⚠️ Git operations error: {e}") 

331 self.logger.exception( 

332 "Git checkpoint error occurred, error=%s, project=%s", 

333 str(e), 

334 self.current_project, 

335 ) 

336 

337 return output 

338 

339 def _setup_working_directory(self, working_directory: str | None) -> Path: 

340 """Set up working directory and project name.""" 

341 if working_directory: 

342 os.chdir(working_directory) 

343 

344 current_dir = Path.cwd() 

345 self.current_project = current_dir.name 

346 return current_dir 

347 

348 def _setup_claude_directories(self) -> Path: 

349 """Create .claude directory structure.""" 

350 claude_dir = Path.home() / ".claude" 

351 claude_dir.mkdir(exist_ok=True) 

352 (claude_dir / "data").mkdir(exist_ok=True) 

353 (claude_dir / "logs").mkdir(exist_ok=True) 

354 return claude_dir 

355 

356 def _discover_session_files(self, current_dir: Path) -> list[Path]: 

357 """Discover session files in the current directory and subdirectories.""" 

358 return [ 

359 file_path 

360 for file_path in current_dir.rglob("*.session.json") 

361 if file_path.is_file() 

362 ] 

363 

364 async def _read_previous_session_info( 

365 self, file_path: Path 

366 ) -> dict[str, t.Any] | None: 

367 """Read previous session information from a file - handles both JSON and markdown files.""" 

368 try: 

369 content = file_path.read_text(encoding="utf-8") 

370 

371 # Try JSON first 

372 import json 

373 

374 try: 

375 data = json.loads(content) 

376 # Ensure the return type is properly typed as dict[str, t.Any] | None 

377 if isinstance(data, dict): 

378 return data # type: ignore[return-value] 

379 return None 

380 except json.JSONDecodeError: 

381 # If not JSON, try to parse as markdown handoff file 

382 from session_buddy.core.lifecycle.session_info import ( 

383 parse_session_file, 

384 ) 

385 

386 # Parse the markdown file content 

387 session_info = await parse_session_file(file_path) 

388 

389 # Convert SessionInfo to dictionary format expected by the system 

390 if session_info.is_complete(): 390 ↛ 399line 390 didn't jump to line 399 because the condition on line 390 was always true

391 return { 

392 "ended_at": session_info.ended_at, 

393 "quality_score": session_info.quality_score, 

394 "working_directory": session_info.working_directory, 

395 "top_recommendation": session_info.top_recommendation, 

396 "session_id": session_info.session_id, 

397 } 

398 

399 return None 

400 except OSError: 

401 return None 

402 

403 def _find_latest_handoff_file(self, current_dir: Path) -> Path | None: 

404 """Find the latest handoff file in the project - supports both JSON and markdown files.""" 

405 # Look for markdown handoff files in the current directory (legacy format) 

406 legacy_handoff_files = list(current_dir.glob("session_handoff_*.md")) 

407 latest_legacy = None 

408 if legacy_handoff_files: 

409 latest_legacy = max(legacy_handoff_files, key=lambda f: f.stat().st_mtime) 

410 

411 # Look in the .crackerjack/session/handoff directory for newer markdown files 

412 crackerjack_handoff_dir = current_dir / ".crackerjack" / "session" / "handoff" 

413 if crackerjack_handoff_dir.exists(): 

414 handoff_files = list(crackerjack_handoff_dir.glob("session_handoff_*.md")) 

415 if handoff_files: 415 ↛ 429line 415 didn't jump to line 429 because the condition on line 415 was always true

416 latest_nested = max(handoff_files, key=lambda f: f.stat().st_mtime) 

417 # Compare with legacy files if present and return the most recent 

418 if ( 418 ↛ 422line 418 didn't jump to line 422 because the condition on line 418 was never true

419 latest_legacy 

420 and latest_nested.stat().st_mtime < latest_legacy.stat().st_mtime 

421 ): 

422 return latest_legacy 

423 return latest_nested 

424 # If the nested directory doesn't exist, return the legacy file if found 

425 elif latest_legacy: 

426 return latest_legacy 

427 

428 # Next, look for JSON handoff files anywhere in the directory 

429 handoff_files = list(current_dir.rglob("*.handoff.json")) 

430 if handoff_files: 430 ↛ 431line 430 didn't jump to line 431 because the condition on line 430 was never true

431 return max(handoff_files, key=lambda f: f.stat().st_mtime) 

432 

433 # Finally, fall back to any session-related JSON files 

434 session_files = list(current_dir.rglob("*.session.json")) 

435 if session_files: 435 ↛ 436line 435 didn't jump to line 436 because the condition on line 435 was never true

436 return max(session_files, key=lambda f: f.stat().st_mtime) 

437 

438 return None 

439 

440 async def _get_previous_session_info( 

441 self, 

442 current_dir: Path, 

443 ) -> dict[str, t.Any] | None: 

444 """Get previous session information if available. Target complexity: ≤5.""" 

445 session_files = self._discover_session_files(current_dir) 

446 

447 for file_path in session_files: 447 ↛ 448line 447 didn't jump to line 448 because the loop on line 447 never started

448 session_info = await self._read_previous_session_info(file_path) 

449 if session_info: 

450 return session_info 

451 

452 # Fallback to old method 

453 latest_handoff = self._find_latest_handoff_file(current_dir) 

454 if latest_handoff: 

455 return await self._read_previous_session_info(latest_handoff) 

456 

457 return None 

458 

459 async def analyze_project_context(self, current_dir: Path) -> dict[str, bool]: 

460 """Analyze project context and return relevant information.""" 

461 # Ensure current_dir is a Path object 

462 current_dir = Path(current_dir) 

463 

464 def _safe_any_glob(pattern: str) -> bool: 

465 try: 

466 return any(current_dir.glob(pattern)) 

467 except (OSError, PermissionError): 

468 return False 

469 

470 # This is a basic implementation; could be expanded based on requirements 

471 has_git_repo = is_git_repository( 

472 current_dir 

473 ) # Use the function from git_operations 

474 has_readme = _safe_any_glob("README*") 

475 has_pyproject_toml = (current_dir / "pyproject.toml").is_file() 

476 has_setup_py = (current_dir / "setup.py").is_file() 

477 has_requirements_txt = (current_dir / "requirements.txt").is_file() 

478 has_src_structure = (current_dir / "src").is_dir() 

479 has_tests = _safe_any_glob("test*") or _safe_any_glob("**/test*") 

480 has_docs = _safe_any_glob("docs/**") or _safe_any_glob("**/*.md") 

481 has_ci_cd = ( 

482 (current_dir / ".github").exists() 

483 or (current_dir / ".gitlab").exists() 

484 or (current_dir / ".circleci").exists() 

485 ) 

486 has_venv = (current_dir / ".venv").exists() or (current_dir / "venv").exists() 

487 has_python_files = _safe_any_glob("**/*.py") 

488 

489 # Detect commonly used Python web frameworks and libraries 

490 requirements_content = "" 

491 with suppress(OSError, PermissionError): 

492 if (current_dir / "requirements.txt").is_file(): 

493 requirements_content += (current_dir / "requirements.txt").read_text() 

494 with suppress(OSError, PermissionError): 

495 if (current_dir / "pyproject.toml").is_file(): 

496 requirements_content += (current_dir / "pyproject.toml").read_text() 

497 

498 # Scan Python files for framework imports (first 10 files as suggested by test) 

499 try: 

500 python_files = list(current_dir.glob("**/*.py"))[:10] 

501 except (OSError, PermissionError): 

502 python_files = [] 

503 for py_file in python_files: 

504 try: 

505 content = py_file.read_text() 

506 requirements_content += content # Add file content to check for imports 

507 except (OSError, UnicodeDecodeError): 

508 # Skip files that can't be read 

509 continue 

510 

511 uses_fastapi = "fastapi" in requirements_content.lower() 

512 uses_django = "django" in requirements_content.lower() 

513 uses_flask = "flask" in requirements_content.lower() 

514 

515 return { 

516 "has_git_repo": has_git_repo, 

517 "has_readme": has_readme, 

518 "has_pyproject_toml": has_pyproject_toml, 

519 "has_setup_py": has_setup_py, 

520 "has_requirements_txt": has_requirements_txt, 

521 "has_src_structure": has_src_structure, 

522 "has_tests": has_tests, 

523 "has_docs": has_docs, 

524 "has_ci_cd": has_ci_cd, 

525 "has_venv": has_venv, 

526 "has_python_files": has_python_files, 

527 "uses_fastapi": uses_fastapi, 

528 "uses_django": uses_django, 

529 "uses_flask": uses_flask, 

530 } 

531 

532 async def _generate_handoff_documentation( 

533 self, summary: dict[str, t.Any], quality_data: dict[str, t.Any] 

534 ) -> str: 

535 """Generate handoff documentation based on session summary and quality data.""" 

536 from datetime import datetime 

537 

538 # Format as markdown document 

539 markdown_content = [] 

540 markdown_content.append( 

541 f"# Session Handoff Report - {summary.get('project', 'unknown')}" 

542 ) 

543 markdown_content.append( 

544 f"\n**Session ended:** {summary.get('session_end_time', datetime.now().isoformat())}" 

545 ) 

546 markdown_content.append( 

547 f"**Final quality score:** {summary.get('final_quality_score', 0)}/100" 

548 ) 

549 markdown_content.append( 

550 f"**Working directory:** {summary.get('working_directory', 'N/A')}" 

551 ) 

552 markdown_content.append("") 

553 

554 if summary.get("recommendations"): 554 ↛ 561line 554 didn't jump to line 561 because the condition on line 554 was always true

555 markdown_content.append("## Recommendations") 

556 for rec in summary["recommendations"]: 

557 markdown_content.append(f"- {rec}") 

558 markdown_content.append("") 

559 

560 # Add quality details 

561 breakdown = quality_data.get("breakdown", {}) 

562 if breakdown: 562 ↛ 568line 562 didn't jump to line 568 because the condition on line 562 was always true

563 markdown_content.append("## Quality Breakdown") 

564 for key, value in breakdown.items(): 

565 markdown_content.append(f"- {key}: {value}") 

566 markdown_content.append("") 

567 

568 return "\n".join(markdown_content) 

569 

570 def _save_handoff_documentation( 

571 self, content: str, current_dir: Path 

572 ) -> Path | None: 

573 """Save handoff documentation to a file.""" 

574 from datetime import datetime 

575 

576 try: 

577 # Save to .claude/handoff/ directory instead of project root 

578 handoff_dir = current_dir / ".claude" / "handoff" 

579 handoff_dir.mkdir(parents=True, exist_ok=True) 

580 

581 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 

582 handoff_file = handoff_dir / f"session_handoff_{timestamp}.md" 

583 handoff_file.write_text(content) 

584 return handoff_file 

585 except Exception: 

586 # Return None on any failure to save 

587 return None 

588 

589 async def initialize_session( 

590 self, 

591 working_directory: str | None = None, 

592 ) -> dict[str, t.Any]: 

593 """Initialize a new session with comprehensive setup.""" 

594 try: 

595 # Setup directories and project 

596 current_dir = self._setup_working_directory(working_directory) 

597 claude_dir = self._setup_claude_directories() 

598 

599 # Analyze project and assess quality 

600 project_context = await self.analyze_project_context(current_dir) 

601 quality_score, quality_data = await self.perform_quality_assessment( 

602 project_dir=current_dir, 

603 ) 

604 

605 # Get previous session info 

606 previous_session_info = await self._get_previous_session_info(current_dir) 

607 

608 self.logger.info( 

609 "Session initialized, project=%s, quality_score=%d, working_directory=%s, has_previous_session=%s", 

610 self.current_project, 

611 quality_score, 

612 str(current_dir), 

613 previous_session_info is not None, 

614 ) 

615 

616 return { 

617 "success": True, 

618 "project": self.current_project, 

619 "working_directory": str(current_dir), 

620 "quality_score": quality_score, 

621 "quality_data": quality_data, 

622 "project_context": project_context, 

623 "claude_directory": str(claude_dir), 

624 "previous_session": previous_session_info, 

625 } 

626 

627 except Exception as e: 

628 self.logger.exception("Session initialization failed: %s", str(e)) 

629 return {"success": False, "error": str(e)} 

630 

631 def get_previous_quality_score(self, project: str) -> int | None: 

632 """Get the most recent quality score for a project.""" 

633 scores = self._quality_history.get(project, []) 

634 return scores[-1] if scores else None 

635 

636 def record_quality_score(self, project: str, score: int) -> None: 

637 """Record a quality score for quality trend tracking.""" 

638 if project not in self._quality_history: 

639 self._quality_history[project] = [] 

640 self._quality_history[project].append(score) 

641 # Keep only last 10 scores to prevent unbounded growth 

642 if len(self._quality_history[project]) > 10: 

643 self._quality_history[project] = self._quality_history[project][-10:] 

644 

645 async def checkpoint_session( 

646 self, 

647 working_directory: str | None = None, 

648 is_manual: bool = False, 

649 ) -> dict[str, t.Any]: 

650 """Perform a comprehensive session checkpoint. 

651 

652 Args: 

653 working_directory: Optional working directory override 

654 is_manual: Whether this is a manually-triggered checkpoint 

655 

656 Returns: 

657 Dictionary containing checkpoint results and auto-store decision 

658 

659 """ 

660 try: 

661 current_dir = Path(working_directory) if working_directory else Path.cwd() 

662 self.current_project = current_dir.name 

663 

664 # Quality assessment 

665 quality_score, quality_data = await self.perform_quality_assessment( 

666 project_dir=current_dir, 

667 ) 

668 

669 # Get previous score for trend analysis 

670 previous_score = self.get_previous_quality_score(self.current_project) 

671 

672 # Record this score for future comparisons 

673 self.record_quality_score(self.current_project, quality_score) 

674 

675 # Determine if reflection should be auto-stored 

676 from session_buddy.utils.reflection_utils import ( 

677 format_auto_store_summary, 

678 should_auto_store_checkpoint, 

679 ) 

680 

681 auto_store_decision = should_auto_store_checkpoint( 

682 quality_score=quality_score, 

683 previous_score=previous_score, 

684 is_manual=is_manual, 

685 session_phase="checkpoint", 

686 ) 

687 

688 # Git checkpoint 

689 git_output = await self.perform_git_checkpoint(current_dir, quality_score) 

690 

691 # Format results 

692 quality_output = self.format_quality_results(quality_score, quality_data) 

693 

694 self.logger.info( 

695 "Session checkpoint completed, project=%s, quality_score=%d, auto_store_decision=%s, auto_store_reason=%s", 

696 self.current_project, 

697 quality_score, 

698 auto_store_decision.should_store, 

699 auto_store_decision.reason.value, 

700 ) 

701 

702 return { 

703 "success": True, 

704 "quality_score": quality_score, 

705 "quality_output": quality_output, 

706 "git_output": git_output, 

707 "timestamp": datetime.now().isoformat(), 

708 "auto_store_decision": auto_store_decision, 

709 "auto_store_summary": format_auto_store_summary(auto_store_decision), 

710 } 

711 

712 except Exception as e: 

713 self.logger.exception("Session checkpoint failed, error=%s", str(e)) 

714 return {"success": False, "error": str(e)} 

715 

716 async def end_session( 

717 self, 

718 working_directory: str | None = None, 

719 ) -> dict[str, t.Any]: 

720 """End the current session with cleanup and summary.""" 

721 try: 

722 current_dir = Path(working_directory) if working_directory else Path.cwd() 

723 self.current_project = current_dir.name 

724 

725 # Final quality assessment 

726 quality_score, quality_data = await self.perform_quality_assessment( 

727 project_dir=current_dir, 

728 ) 

729 

730 # Create session summary 

731 summary = { 

732 "project": self.current_project, 

733 "final_quality_score": quality_score, 

734 "session_end_time": datetime.now().isoformat(), 

735 "working_directory": str(current_dir), 

736 "recommendations": quality_data.get("recommendations", []), 

737 } 

738 

739 # Generate handoff documentation 

740 handoff_content = await self._generate_handoff_documentation( 

741 summary, 

742 quality_data, 

743 ) 

744 

745 # Save handoff documentation 

746 handoff_path = self._save_handoff_documentation( 

747 handoff_content, 

748 current_dir, 

749 ) 

750 

751 self.logger.info( 

752 "Session ended, project=%s, final_quality_score=%d", 

753 self.current_project, 

754 quality_score, 

755 ) 

756 

757 summary["handoff_documentation"] = ( 

758 str(handoff_path) if handoff_path else None 

759 ) 

760 

761 return {"success": True, "summary": summary} 

762 

763 except Exception as e: 

764 self.logger.exception("Session end failed, error=%s", str(e)) 

765 return {"success": False, "error": str(e)} 

766 

767 async def get_session_status( 

768 self, 

769 working_directory: str | None = None, 

770 ) -> dict[str, t.Any]: 

771 """Get current session status and health information.""" 

772 try: 

773 current_dir = Path(working_directory) if working_directory else Path.cwd() 

774 

775 self.current_project = current_dir.name 

776 

777 # Get comprehensive status 

778 project_context = await self.analyze_project_context(current_dir) 

779 quality_score, quality_data = await self.perform_quality_assessment( 

780 project_dir=current_dir, 

781 ) 

782 

783 # Check system health 

784 uv_available = shutil.which("uv") is not None 

785 git_available = is_git_repository(current_dir) 

786 claude_dir = Path.home() / ".claude" 

787 claude_dir_exists = claude_dir.exists() 

788 

789 return { 

790 "success": True, 

791 "project": self.current_project, 

792 "working_directory": str(current_dir), 

793 "quality_score": quality_score, 

794 "quality_breakdown": quality_data["breakdown"], 

795 "recommendations": quality_data["recommendations"], 

796 "project_context": project_context, 

797 "system_health": { 

798 "uv_available": uv_available, 

799 "git_repository": git_available, 

800 "claude_directory": claude_dir_exists, 

801 }, 

802 "timestamp": datetime.now().isoformat(), 

803 } 

804 

805 except Exception as e: 

806 self.logger.exception("Failed to get session status, error=%s", str(e)) 

807 return {"success": False, "error": str(e)}