Coverage for session_buddy / crackerjack_integration.py: 58.72%

417 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1"""Crackerjack Integration module for progress tracking and test monitoring. 

2 

3This module provides deep integration with Crackerjack for: 

4- Progress tracking output parsing for memory enrichment 

5- Test result monitoring for context enhancement 

6- Command execution with result capture 

7- Quality metrics integration 

8""" 

9 

10import asyncio 

11import json 

12import logging 

13import operator 

14import sqlite3 

15import tempfile 

16import threading 

17import time 

18from dataclasses import asdict, dataclass 

19from datetime import datetime, timedelta 

20from enum import Enum 

21from pathlib import Path 

22from typing import Any 

23 

24from session_buddy.utils.crackerjack import ( 

25 CrackerjackOutputParser, 

26) 

27 

28logger = logging.getLogger(__name__) 

29 

30 

class CrackerjackCommand(Enum):
    """Supported Crackerjack commands.

    NOTE(review): this enum is not referenced by the visible code in this
    module — commands are passed as plain strings and translated by
    CrackerjackIntegration._build_command_flags(); confirm external usage
    before removing.
    """

    # Core quality commands
    ANALYZE = "analyze"  # Comprehensive analysis command
    CHECK = "check"
    TEST = "test"
    LINT = "lint"
    FORMAT = "format"
    TYPECHECK = "typecheck"  # Type checking support

    # Security and complexity
    SECURITY = "security"
    COMPLEXITY = "complexity"
    COVERAGE = "coverage"

    # Build and maintenance
    BUILD = "build"
    CLEAN = "clean"

    # Documentation
    DOCS = "docs"

    # Release management
    RELEASE = "release"  # Release command support

56 

57 

class TestStatus(Enum):
    """Test execution status (pytest-style outcome labels)."""

    PASSED = "passed"
    FAILED = "failed"
    SKIPPED = "skipped"
    ERROR = "error"
    XFAIL = "xfail"  # expected failure that did fail
    XPASS = "xpass"  # expected failure that unexpectedly passed

67 

68 

class QualityMetric(Enum):
    """Quality metrics tracked in the quality_metrics_history table."""

    CODE_COVERAGE = "coverage"
    COMPLEXITY = "complexity"
    LINT_SCORE = "lint_score"
    SECURITY_SCORE = "security_score"
    TEST_PASS_RATE = "test_pass_rate"  # nosec B105
    BUILD_STATUS = "build_status"

78 

79 

@dataclass
class CrackerjackResult:
    """Result of Crackerjack command execution.

    Mirrors a row of the crackerjack_results table (see
    CrackerjackIntegration._init_database); dict/list fields are serialized
    to JSON when persisted.
    """

    command: str  # high-level command name that was executed
    exit_code: int  # process exit code; -1 = timeout, -2 = execution error
    stdout: str
    stderr: str
    execution_time: float  # wall-clock seconds
    timestamp: datetime
    working_directory: str  # also recorded as project_path for metric history
    parsed_data: dict[str, Any] | None  # structured output from the parser
    quality_metrics: dict[str, float]  # e.g. test_pass_rate, build_status
    test_results: list[dict[str, Any]]  # per-test dicts from parsed_data
    memory_insights: list[str]  # human-readable notes for memory enrichment

95 

96 

@dataclass
class TestResult:
    """Individual test result information.

    NOTE(review): the visible code stores per-test data as plain dicts in
    the test_results table rather than instances of this class — confirm
    external usage before relying on it.
    """

    test_id: str
    test_name: str
    status: TestStatus
    duration: float  # seconds
    file_path: str
    line_number: int | None
    error_message: str | None
    traceback: str | None
    tags: list[str]
    coverage_data: dict[str, Any] | None

111 

112 

@dataclass
class ProgressSnapshot:
    """Progress tracking snapshot.

    Mirrors a row of the progress_snapshots table; list/dict fields are
    serialized to JSON when persisted.
    """

    timestamp: datetime
    project_path: str
    command: str
    stage: str
    progress_percentage: float  # 0-100
    current_task: str
    completed_tasks: list[str]
    failed_tasks: list[str]
    quality_metrics: dict[str, float]
    estimated_completion: datetime | None
    memory_context: list[str]

128 

129 

130# PatternMappingsBuilder and CrackerjackOutputParser classes have been extracted 

131# to session_buddy.utils.crackerjack module for better modularity and reusability. 

132 

133 

class CrackerjackIntegration:
    """Main integration class for Crackerjack command execution and monitoring."""

    def __init__(self, db_path: str | None = None) -> None:
        """Initialize Crackerjack integration.

        Args:
            db_path: Optional SQLite database path. Defaults to
                ~/.claude/data/crackerjack_integration.db.

        If schema creation fails at the default location (e.g. read-only
        home directory or sandboxed environment), falls back to a path
        under the system temp directory and retries once.
        """
        self.db_path = db_path or str(
            Path.home() / ".claude" / "data" / "crackerjack_integration.db",
        )
        self.parser = CrackerjackOutputParser()
        # NOTE(review): _lock is never acquired in the visible code —
        # presumably reserved for future thread synchronization; confirm.
        self._lock = threading.Lock()
        try:
            self._init_database()
        except Exception:
            # Fall back to a temp-writable path if the default is not writable
            tmp_db = (
                Path(tempfile.gettempdir())
                / "session-mgmt-mcp"
                / "data"
                / "crackerjack_integration.db"
            )
            tmp_db.parent.mkdir(parents=True, exist_ok=True)
            self.db_path = str(tmp_db)
            self._init_database()

157 

158 def execute_command( 

159 self, 

160 cmd: list[str], 

161 **kwargs: Any, 

162 ) -> dict[str, Any]: 

163 """Execute command synchronously (for CommandRunner protocol compatibility). 

164 

165 This is a synchronous wrapper around execute_crackerjack_command for 

166 compatibility with crackerjack's CommandRunner protocol. 

167 """ 

168 import os 

169 import subprocess # nosec B404 

170 

171 try: 

172 env = kwargs.get("env", os.environ.copy()) 

173 

174 # Execute the command directly using subprocess 

175 result = subprocess.run( 

176 cmd, 

177 check=False, 

178 capture_output=True, 

179 text=True, 

180 timeout=kwargs.get("timeout", 300), 

181 cwd=kwargs.get("cwd", "."), 

182 env=env, 

183 **{ 

184 k: v 

185 for k, v in kwargs.items() 

186 if k not in {"timeout", "cwd", "env"} 

187 }, 

188 ) 

189 

190 return { 

191 "stdout": result.stdout, 

192 "stderr": result.stderr, 

193 "returncode": result.returncode, 

194 "success": result.returncode == 0, 

195 } 

196 

197 except subprocess.TimeoutExpired as e: 

198 return { 

199 "stdout": e.stdout.decode() if e.stdout else "", 

200 "stderr": e.stderr.decode() if e.stderr else "Command timed out", 

201 "returncode": -1, 

202 "success": False, 

203 } 

204 except Exception as e: 

205 return {"stdout": "", "stderr": str(e), "returncode": -2, "success": False} 

206 

207 def _init_database(self) -> None: 

208 """Initialize SQLite database for Crackerjack integration.""" 

209 Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) 

210 

211 with sqlite3.connect(self.db_path) as conn: 

212 conn.execute(""" 

213 CREATE TABLE IF NOT EXISTS crackerjack_results ( 

214 id TEXT PRIMARY KEY, 

215 command TEXT NOT NULL, 

216 exit_code INTEGER, 

217 stdout TEXT, 

218 stderr TEXT, 

219 execution_time REAL, 

220 timestamp TIMESTAMP, 

221 working_directory TEXT, 

222 parsed_data TEXT, -- JSON 

223 quality_metrics TEXT, -- JSON 

224 memory_insights TEXT -- JSON array 

225 ) 

226 """) 

227 

228 conn.execute(""" 

229 CREATE TABLE IF NOT EXISTS test_results ( 

230 id TEXT PRIMARY KEY, 

231 result_id TEXT NOT NULL, 

232 test_name TEXT NOT NULL, 

233 status TEXT NOT NULL, 

234 duration REAL, 

235 file_path TEXT, 

236 line_number INTEGER, 

237 error_message TEXT, 

238 timestamp TIMESTAMP, 

239 FOREIGN KEY (result_id) REFERENCES crackerjack_results(id) 

240 ) 

241 """) 

242 

243 conn.execute(""" 

244 CREATE TABLE IF NOT EXISTS progress_snapshots ( 

245 id TEXT PRIMARY KEY, 

246 project_path TEXT NOT NULL, 

247 command TEXT NOT NULL, 

248 stage TEXT, 

249 progress_percentage REAL, 

250 current_task TEXT, 

251 completed_tasks TEXT, -- JSON array 

252 failed_tasks TEXT, -- JSON array 

253 quality_metrics TEXT, -- JSON 

254 timestamp TIMESTAMP, 

255 memory_context TEXT -- JSON array 

256 ) 

257 """) 

258 

259 conn.execute(""" 

260 CREATE TABLE IF NOT EXISTS quality_metrics_history ( 

261 id TEXT PRIMARY KEY, 

262 project_path TEXT NOT NULL, 

263 metric_type TEXT NOT NULL, 

264 metric_value REAL NOT NULL, 

265 timestamp TIMESTAMP, 

266 result_id TEXT, 

267 FOREIGN KEY (result_id) REFERENCES crackerjack_results(id) 

268 ) 

269 """) 

270 

271 # Create indices 

272 conn.execute( 

273 "CREATE INDEX IF NOT EXISTS idx_results_timestamp ON crackerjack_results(timestamp)", 

274 ) 

275 conn.execute( 

276 "CREATE INDEX IF NOT EXISTS idx_results_command ON crackerjack_results(command)", 

277 ) 

278 conn.execute( 

279 "CREATE INDEX IF NOT EXISTS idx_test_results_status ON test_results(status)", 

280 ) 

281 conn.execute( 

282 "CREATE INDEX IF NOT EXISTS idx_progress_project ON progress_snapshots(project_path)", 

283 ) 

284 conn.execute( 

285 "CREATE INDEX IF NOT EXISTS idx_metrics_type ON quality_metrics_history(metric_type)", 

286 ) 

287 

288 def _build_command_flags(self, command: str, ai_agent_mode: bool) -> list[str]: 

289 """Build appropriate command flags for the given command. 

290 

291 Crackerjack CLI structure (v0.47+): 

292 - Uses 'run' subcommand followed by flags 

293 - Example: python -m crackerjack run --fast --quick 

294 - Example: python -m crackerjack run --comp --run-tests 

295 - Example: python -m crackerjack run --all patch (release workflow) 

296 

297 IMPORTANT: --all requires an argument (patch|minor|major|auto|interactive) 

298 For general quality checks, use 'run' with no flags or --fast/--comp 

299 """ 

300 command_mappings = { 

301 # Fast hooks only (formatters and basic checks) 

302 "lint": ["run", "--fast"], 

303 "format": ["run", "--fast"], 

304 # Comprehensive hooks (type checking, security, complexity) 

305 "check": ["run", "--comp"], 

306 "typecheck": ["run", "--comp"], 

307 "security": ["run", "--comp"], # Security is part of comprehensive hooks 

308 "complexity": [ 

309 "run", 

310 "--comp", 

311 ], # Complexity is part of comprehensive hooks 

312 "analyze": ["run", "--comp"], # Comprehensive analysis 

313 # Test execution 

314 "test": ["run", "--run-tests"], 

315 # Build and maintenance (default run with no special flags) 

316 "build": ["run"], 

317 "clean": ["run"], # Clean happens automatically in current version 

318 # All quality checks (use default run, NOT --all which is for release) 

319 "all": ["run"], # Just run with no special flags 

320 "run": ["run"], 

321 # Standalone commands (no 'run' prefix) 

322 "run-tests": ["run-tests"], 

323 } 

324 

325 flags = command_mappings.get(command.lower(), ["run"]) 

326 if ai_agent_mode: 

327 flags.append("--ai-fix") 

328 return flags 

329 

330 async def _execute_process( 

331 self, 

332 full_command: list[str], 

333 working_directory: str, 

334 timeout: int, 

335 ) -> tuple[int, str, str, float]: 

336 """Execute the subprocess and return exit code, stdout, stderr, and execution time.""" 

337 import os 

338 

339 start_time = time.time() 

340 

341 env = os.environ.copy() 

342 

343 process = await asyncio.create_subprocess_exec( 

344 *full_command, 

345 cwd=working_directory, 

346 stdout=asyncio.subprocess.PIPE, 

347 stderr=asyncio.subprocess.PIPE, 

348 env=env, 

349 ) 

350 

351 stdout, stderr = await asyncio.wait_for( 

352 process.communicate(), 

353 timeout=timeout, 

354 ) 

355 

356 exit_code = process.returncode or 0 

357 execution_time = time.time() - start_time 

358 stdout_text = stdout.decode("utf-8", errors="ignore") 

359 stderr_text = stderr.decode("utf-8", errors="ignore") 

360 

361 return exit_code, stdout_text, stderr_text, execution_time 

362 

363 def _create_error_result( 

364 self, 

365 command: str, 

366 exit_code: int, 

367 stderr: str, 

368 execution_time: float, 

369 working_directory: str, 

370 memory_insight: str, 

371 ) -> CrackerjackResult: 

372 """Create a standardized error result.""" 

373 return CrackerjackResult( 

374 command=command, 

375 exit_code=exit_code, 

376 stdout="", 

377 stderr=stderr, 

378 execution_time=execution_time, 

379 timestamp=datetime.now(), 

380 working_directory=working_directory, 

381 parsed_data={}, 

382 quality_metrics={}, 

383 test_results=[], 

384 memory_insights=[memory_insight], 

385 ) 

386 

387 async def execute_crackerjack_command( 

388 self, 

389 command: str, 

390 args: list[str] | None = None, 

391 working_directory: str = ".", 

392 timeout: int = 300, 

393 ai_agent_mode: bool = False, 

394 ) -> CrackerjackResult: 

395 """Execute Crackerjack command and capture results.""" 

396 args = args or [] 

397 command_flags = self._build_command_flags(command, ai_agent_mode) 

398 quick_commands = {"lint", "check", "test", "format", "typecheck"} 

399 if command.lower() in quick_commands and "--quick" not in args: 

400 if "--ai-fix" in command_flags: 

401 command_flags.insert(command_flags.index("--ai-fix"), "--quick") 

402 else: 

403 command_flags.append("--quick") 

404 full_command = ["python", "-m", "crackerjack", *command_flags, *args] 

405 

406 start_time = time.time() 

407 result_id = f"cj_{int(start_time * 1000)}" 

408 

409 try: 

410 ( 

411 exit_code, 

412 stdout_text, 

413 stderr_text, 

414 execution_time, 

415 ) = await self._execute_process(full_command, working_directory, timeout) 

416 

417 parsed_data, memory_insights = self.parser.parse_output( 

418 command, 

419 stdout_text, 

420 stderr_text, 

421 ) 

422 quality_metrics = self._calculate_quality_metrics( 

423 parsed_data, 

424 exit_code, 

425 stderr_text, 

426 ) 

427 

428 result = CrackerjackResult( 

429 command=command, 

430 exit_code=exit_code, 

431 stdout=stdout_text, 

432 stderr=stderr_text, 

433 execution_time=execution_time, 

434 timestamp=datetime.now(), 

435 working_directory=working_directory, 

436 parsed_data=parsed_data, 

437 quality_metrics=quality_metrics, 

438 test_results=parsed_data.get("test_results", []), 

439 memory_insights=memory_insights, 

440 ) 

441 

442 await self._store_result(result_id, result) 

443 await self._store_progress_snapshot(result_id, result, working_directory) 

444 return result 

445 

446 except TimeoutError: 

447 execution_time = time.time() - start_time 

448 error_result = self._create_error_result( 

449 command, 

450 -1, 

451 f"Command timed out after {timeout} seconds", 

452 execution_time, 

453 working_directory, 

454 f"Command '{command}' timed out - consider optimizing or increasing timeout", 

455 ) 

456 await self._store_result(result_id, error_result) 

457 return error_result 

458 

459 except Exception as e: 

460 execution_time = time.time() - start_time 

461 error_result = self._create_error_result( 

462 command, 

463 -2, 

464 f"Execution error: {e!s}", 

465 execution_time, 

466 working_directory, 

467 f"Command '{command}' failed with error: {e!s}", 

468 ) 

469 await self._store_result(result_id, error_result) 

470 return error_result 

471 

472 async def get_recent_results( 

473 self, 

474 hours: int = 24, 

475 command: str | None = None, 

476 ) -> list[dict[str, Any]]: 

477 """Get recent Crackerjack execution results.""" 

478 since = datetime.now() - timedelta(hours=hours) 

479 

480 with sqlite3.connect(self.db_path) as conn: 

481 conn.row_factory = sqlite3.Row 

482 

483 where_conditions = ["timestamp >= ?"] 

484 params = [since.isoformat()] 

485 

486 if command: 486 ↛ 487line 486 didn't jump to line 487 because the condition on line 486 was never true

487 where_conditions.append("command = ?") 

488 params.append(command) 

489 

490 # Build SQL safely - all user input is parameterized via params list 

491 query = ( 

492 "SELECT * FROM crackerjack_results WHERE " 

493 + " AND ".join(where_conditions) 

494 + " ORDER BY timestamp DESC" 

495 ) 

496 

497 cursor = conn.execute(query, params) 

498 results = [] 

499 

500 for row in cursor.fetchall(): 

501 result = dict(row) 

502 result["parsed_data"] = json.loads(result["parsed_data"] or "{}") 

503 result["quality_metrics"] = json.loads( 

504 result["quality_metrics"] or "{}", 

505 ) 

506 result["memory_insights"] = json.loads( 

507 result["memory_insights"] or "[]", 

508 ) 

509 results.append(result) 

510 

511 return results 

512 

513 async def get_quality_metrics_history( 

514 self, 

515 project_path: str, 

516 metric_type: str | None = None, 

517 days: int = 30, 

518 ) -> list[dict[str, Any]]: 

519 """Get quality metrics history for trend analysis.""" 

520 since = datetime.now() - timedelta(days=days) 

521 

522 with sqlite3.connect(self.db_path) as conn: 

523 conn.row_factory = sqlite3.Row 

524 

525 where_conditions = ["project_path = ?", "timestamp >= ?"] 

526 params = [project_path, since.isoformat()] 

527 

528 if metric_type: 528 ↛ 529line 528 didn't jump to line 529 because the condition on line 528 was never true

529 where_conditions.append("metric_type = ?") 

530 params.append(metric_type) 

531 

532 # Build SQL safely - all user input is parameterized via params list 

533 query = ( 

534 "SELECT * FROM quality_metrics_history WHERE " 

535 + " AND ".join(where_conditions) 

536 + " ORDER BY timestamp DESC" 

537 ) 

538 

539 cursor = conn.execute(query, params) 

540 return [dict(row) for row in cursor.fetchall()] 

541 

542 async def get_test_failure_patterns(self, days: int = 7) -> dict[str, Any]: 

543 """Analyze test failure patterns for insights.""" 

544 since = datetime.now() - timedelta(days=days) 

545 

546 with sqlite3.connect(self.db_path) as conn: 

547 conn.row_factory = sqlite3.Row 

548 

549 # Get failed tests 

550 failed_tests = conn.execute( 

551 """ 

552 SELECT test_name, file_path, error_message, COUNT(*) as failure_count 

553 FROM test_results 

554 WHERE status = 'failed' AND timestamp >= ? 

555 GROUP BY test_name, file_path, error_message 

556 ORDER BY failure_count DESC 

557 """, 

558 (since.isoformat(),), 

559 ).fetchall() 

560 

561 # Get flaky tests (alternating pass/fail) 

562 flaky_tests = conn.execute( 

563 """ 

564 SELECT test_name, file_path, 

565 COUNT(DISTINCT status) as status_count, 

566 COUNT(*) as total_runs 

567 FROM test_results 

568 WHERE timestamp >= ? 

569 GROUP BY test_name, file_path 

570 HAVING status_count > 1 AND total_runs >= 3 

571 ORDER BY status_count DESC, total_runs DESC 

572 """, 

573 (since.isoformat(),), 

574 ).fetchall() 

575 

576 # Get most failing files 

577 failing_files = conn.execute( 

578 """ 

579 SELECT file_path, COUNT(*) as failure_count 

580 FROM test_results 

581 WHERE status = 'failed' AND timestamp >= ? 

582 GROUP BY file_path 

583 ORDER BY failure_count DESC 

584 LIMIT 10 

585 """, 

586 (since.isoformat(),), 

587 ).fetchall() 

588 

589 return { 

590 "failed_tests": [dict(row) for row in failed_tests], 

591 "flaky_tests": [dict(row) for row in flaky_tests], 

592 "failing_files": [dict(row) for row in failing_files], 

593 "analysis_period_days": days, 

594 } 

595 

596 def _filter_metrics_by_type( 

597 self, 

598 metrics_history: list[dict[str, Any]], 

599 metric_type: str, 

600 ) -> list[dict[str, Any]]: 

601 """Filter metrics history by type and sort by timestamp.""" 

602 metric_values = [m for m in metrics_history if m["metric_type"] == metric_type] 

603 metric_values.sort(key=operator.itemgetter("timestamp"), reverse=True) 

604 return metric_values 

605 

606 def _calculate_trend_direction(self, change: float) -> str: 

607 """Determine trend direction from change value.""" 

608 if change > 0: 

609 return "improving" 

610 if change < 0: 

611 return "declining" 

612 return "stable" 

613 

614 def _calculate_trend_strength(self, change: float) -> str: 

615 """Determine trend strength from absolute change value.""" 

616 abs_change = abs(change) 

617 if abs_change > 5: 

618 return "strong" 

619 if abs_change > 1: 

620 return "moderate" 

621 return "weak" 

622 

623 def _create_trend_data(self, metric_values: list[dict[str, Any]]) -> dict[str, Any]: 

624 """Create trend data from metric values with sufficient data.""" 

625 mid_point = len(metric_values) // 2 

626 recent = metric_values[:mid_point] if mid_point > 0 else metric_values 

627 older = metric_values[mid_point:] if mid_point > 0 else [] 

628 

629 if not (recent and older): 

630 current_avg = sum(m["metric_value"] for m in metric_values) / len( 

631 metric_values, 

632 ) 

633 return { 

634 "direction": "insufficient_data", 

635 "change": 0, 

636 "change_percentage": 0, 

637 "recent_average": current_avg, 

638 "previous_average": current_avg, 

639 "data_points": len(metric_values), 

640 "trend_strength": "unknown", 

641 } 

642 

643 recent_avg = sum(m["metric_value"] for m in recent) / len(recent) 

644 older_avg = sum(m["metric_value"] for m in older) / len(older) 

645 change = recent_avg - older_avg 

646 

647 return { 

648 "direction": self._calculate_trend_direction(change), 

649 "change": abs(change), 

650 "change_percentage": (abs(change) / older_avg * 100) 

651 if older_avg > 0 

652 else 0, 

653 "recent_average": recent_avg, 

654 "previous_average": older_avg, 

655 "data_points": len(metric_values), 

656 "trend_strength": self._calculate_trend_strength(change), 

657 } 

658 

659 def _calculate_overall_assessment( 

660 self, 

661 trends: dict[str, Any], 

662 days: int, 

663 ) -> dict[str, Any]: 

664 """Calculate overall trend assessment from individual trend data.""" 

665 improving_metrics = sum( 

666 1 for t in trends.values() if t["direction"] == "improving" 

667 ) 

668 declining_metrics = sum( 

669 1 for t in trends.values() if t["direction"] == "declining" 

670 ) 

671 

672 if improving_metrics > declining_metrics: 

673 overall_direction = "improving" 

674 elif declining_metrics > improving_metrics: 

675 overall_direction = "declining" 

676 else: 

677 overall_direction = "stable" 

678 

679 return { 

680 "overall_direction": overall_direction, 

681 "improving_count": improving_metrics, 

682 "declining_count": declining_metrics, 

683 "stable_count": len(trends) - improving_metrics - declining_metrics, 

684 "analysis_period_days": days, 

685 } 

686 

687 async def get_quality_trends( 

688 self, 

689 project_path: str, 

690 days: int = 30, 

691 ) -> dict[str, Any]: 

692 """Analyze quality trends over time.""" 

693 metrics_history = await self.get_quality_metrics_history( 

694 project_path, 

695 None, 

696 days, 

697 ) 

698 

699 metric_types = ( 

700 "test_pass_rate", 

701 "code_coverage", 

702 "lint_score", 

703 "security_score", 

704 "complexity_score", 

705 ) 

706 trends = {} 

707 

708 for metric_type in metric_types: 

709 metric_values = self._filter_metrics_by_type(metrics_history, metric_type) 

710 if len(metric_values) >= 2: 

711 trends[metric_type] = self._create_trend_data(metric_values) 

712 

713 overall_assessment = self._calculate_overall_assessment(trends, days) 

714 

715 return { 

716 "trends": trends, 

717 "overall": overall_assessment, 

718 "recommendations": self._generate_trend_recommendations(trends), 

719 } 

720 

721 def _get_declining_recommendation( 

722 self, 

723 metric_type: str, 

724 change: float, 

725 ) -> str | None: 

726 """Get recommendation for declining metrics.""" 

727 recommendations_map = { 

728 "test_pass_rate": f"⚠️ Test pass rate declining by {change:.1f}% - investigate failing tests", 

729 "code_coverage": f"⚠️ Code coverage declining by {change:.1f}% - add more tests", 

730 "lint_score": "⚠️ Code quality declining - address lint issues", 

731 "security_score": "🔒 Security score declining - review security findings", 

732 "complexity_score": "🔧 Code complexity increasing - consider refactoring", 

733 } 

734 return recommendations_map.get(metric_type) 

735 

736 def _get_improving_recommendation( 

737 self, 

738 metric_type: str, 

739 recent_avg: float, 

740 ) -> str | None: 

741 """Get recommendation for improving metrics with high averages.""" 

742 if metric_type == "test_pass_rate" and recent_avg > 95: 

743 return "✅ Excellent test pass rate trend - maintain current practices" 

744 if metric_type == "code_coverage" and recent_avg > 85: 

745 return "✅ Great coverage improvement - continue testing efforts" 

746 return None 

747 

748 def _generate_trend_recommendations(self, trends: dict[str, Any]) -> list[str]: 

749 """Generate recommendations based on quality trends.""" 

750 recommendations = [] 

751 

752 for metric_type, trend_data in trends.items(): 

753 direction = trend_data["direction"] 

754 strength = trend_data["trend_strength"] 

755 change = trend_data["change"] 

756 recent_avg = trend_data["recent_average"] 

757 

758 if direction == "declining" and strength in {"strong", "moderate"}: 

759 recommendation = self._get_declining_recommendation(metric_type, change) 

760 if recommendation: 

761 recommendations.append(recommendation) 

762 elif direction == "improving" and strength == "strong": 

763 recommendation = self._get_improving_recommendation( 

764 metric_type, 

765 recent_avg, 

766 ) 

767 if recommendation: 

768 recommendations.append(recommendation) 

769 

770 if not recommendations: 

771 recommendations.append( 

772 "📈 Quality metrics are stable - continue current practices", 

773 ) 

774 

775 return recommendations 

776 

777 async def health_check(self) -> dict[str, Any]: 

778 """Check integration health and dependencies.""" 

779 health: dict[str, Any] = { 

780 "crackerjack_available": False, 

781 "database_accessible": False, 

782 "version_compatible": False, 

783 "recommendations": [], 

784 "status": "unhealthy", 

785 } 

786 

787 try: 

788 await self._check_crackerjack_availability(health) 

789 await self._check_database_health(health) 

790 health["status"] = self._determine_health_status(health) 

791 

792 except sqlite3.Error as e: 

793 health["database_accessible"] = False 

794 health["recommendations"].append(f"❌ Database error: {e}") 

795 except Exception as e: 

796 health["error"] = str(e) 

797 health["recommendations"].append(f"❌ Health check error: {e}") 

798 

799 return health 

800 

801 async def _check_crackerjack_availability(self, health: dict[str, Any]) -> None: 

802 """Check if crackerjack command is available.""" 

803 process = await asyncio.create_subprocess_exec( 

804 "crackerjack", 

805 "--help", 

806 stdout=asyncio.subprocess.DEVNULL, 

807 stderr=asyncio.subprocess.DEVNULL, 

808 ) 

809 await process.communicate() 

810 health["crackerjack_available"] = process.returncode == 0 

811 

812 if health["crackerjack_available"]: 

813 health["recommendations"].append( 

814 "✅ Crackerjack is available and responding", 

815 ) 

816 else: 

817 health["recommendations"].append( 

818 "❌ Crackerjack not available - install with 'uv add crackerjack'", 

819 ) 

820 

821 async def _check_database_health(self, health: dict[str, Any]) -> None: 

822 """Check database accessibility and data availability.""" 

823 with sqlite3.connect(self.db_path) as conn: 

824 conn.execute("SELECT 1").fetchone() 

825 health["database_accessible"] = True 

826 health["recommendations"].append("✅ Database connection successful") 

827 

828 cursor = conn.execute("SELECT COUNT(*) FROM crackerjack_results") 

829 result_count = cursor.fetchone()[0] 

830 

831 if result_count > 0: 

832 health["recommendations"].append( 

833 f"📊 {result_count} execution records available", 

834 ) 

835 else: 

836 health["recommendations"].append( 

837 "📝 No execution history - run some crackerjack commands", 

838 ) 

839 

840 def _determine_health_status(self, health: dict[str, Any]) -> str: 

841 """Determine overall health status from component health.""" 

842 if health["crackerjack_available"] and health["database_accessible"]: 

843 return "healthy" 

844 if health["database_accessible"]: 

845 return "partial" 

846 return "unhealthy" 

847 

848 def _calculate_quality_metrics( 

849 self, 

850 parsed_data: dict[str, Any], 

851 exit_code: int, 

852 stderr_content: str = "", 

853 ) -> dict[str, float]: 

854 """Calculate quality metrics from parsed data.""" 

855 metrics = {} 

856 

857 metrics.update(self._calculate_test_metrics(parsed_data)) 

858 metrics.update(self._calculate_coverage_metrics(parsed_data)) 

859 metrics.update(self._calculate_lint_metrics(parsed_data)) 

860 metrics.update(self._calculate_security_metrics(parsed_data)) 

861 metrics.update(self._calculate_complexity_metrics(parsed_data)) 

862 

863 if stderr_content: 863 ↛ 864line 863 didn't jump to line 864 because the condition on line 863 was never true

864 metrics.update(self._parse_stderr_metrics(stderr_content)) 

865 

866 metrics["build_status"] = float(100 if exit_code == 0 else 0) 

867 

868 return metrics 

869 

870 def _calculate_test_metrics(self, parsed_data: dict[str, Any]) -> dict[str, float]: 

871 """Calculate test pass rate metrics.""" 

872 metrics = {} 

873 test_results = parsed_data.get("test_results", []) 

874 if test_results: 

875 passed = sum(1 for t in test_results if t["status"] == "passed") 

876 total = len(test_results) 

877 metrics["test_pass_rate"] = float( 

878 (passed / total) * 100 if total > 0 else 0, 

879 ) 

880 return metrics 

881 

882 def _calculate_coverage_metrics( 

883 self, parsed_data: dict[str, Any] 

884 ) -> dict[str, float]: 

885 """Calculate code coverage metrics.""" 

886 metrics = {} 

887 coverage_summary = parsed_data.get("coverage_summary", {}) 

888 if "total_coverage" in coverage_summary: 888 ↛ 889line 888 didn't jump to line 889 because the condition on line 888 was never true

889 metrics["code_coverage"] = float(coverage_summary["total_coverage"]) 

890 return metrics 

891 

892 def _calculate_lint_metrics(self, parsed_data: dict[str, Any]) -> dict[str, float]: 

893 """Calculate lint score metrics (inverted so higher is better).""" 

894 metrics = {} 

895 lint_summary = parsed_data.get("lint_summary", {}) 

896 if "total_issues" in lint_summary: 

897 total_issues = lint_summary["total_issues"] 

898 metrics["lint_score"] = float( 

899 max(0, 100 - total_issues) if total_issues < 100 else 0, 

900 ) 

901 return metrics 

902 

903 def _calculate_security_metrics( 

904 self, parsed_data: dict[str, Any] 

905 ) -> dict[str, float]: 

906 """Calculate security score metrics (inverted so higher is better).""" 

907 metrics = {} 

908 security_summary = parsed_data.get("security_summary", {}) 

909 if "total_issues" in security_summary: 

910 total_issues = security_summary["total_issues"] 

911 metrics["security_score"] = float( 

912 max(0, 100 - (total_issues * 10)) if total_issues < 10 else 0, 

913 ) 

914 return metrics 

915 

916 def _calculate_complexity_metrics( 

917 self, parsed_data: dict[str, Any] 

918 ) -> dict[str, float]: 

919 """Calculate complexity score metrics (inverted so higher is better).""" 

920 metrics = {} 

921 complexity_summary = parsed_data.get("complexity_summary", {}) 

922 if complexity_summary: 

923 total_files = complexity_summary.get("total_files", 0) 

924 high_complexity = complexity_summary.get("high_complexity_files", 0) 

925 if total_files > 0: 925 ↛ 926line 925 didn't jump to line 926 because the condition on line 925 was never true

926 complexity_rate = (high_complexity / total_files) * 100 

927 metrics["complexity_score"] = float(max(0, 100 - complexity_rate)) 

928 return metrics 

929 

930 def _parse_stderr_metrics(self, stderr_content: str) -> dict[str, float]: 

931 """Parse quality metrics from structured logging in stderr.""" 

932 metrics = {} 

933 

934 # Look for common structured logging patterns in stderr 

935 lines = stderr_content.split("\n") 

936 

937 for line in lines: 

938 # Parse structured log entries that might contain quality metrics 

939 if '"quality"' in line or '"metric"' in line or '"score"' in line: 

940 # This is a simplified approach - would in practice need to 

941 # handle the actual structured format 

942 import re 

943 

944 # Look for patterns like: "quality": value or "metric": value 

945 quality_pattern = r'"quality"\s*:\s*(\d+\.?\d*)' 

946 metric_pattern = r'"metric"\s*:\s*(\d+\.?\d*)' 

947 score_pattern = r'"score"\s*:\s*(\d+\.?\d*)' 

948 

949 quality_match = re.search(quality_pattern, line) 

950 if quality_match: 

951 metrics["parsed_quality"] = float(quality_match.group(1)) 

952 

953 metric_match = re.search(metric_pattern, line) 

954 if metric_match: 

955 metrics["parsed_metric"] = float(metric_match.group(1)) 

956 

957 score_match = re.search(score_pattern, line) 

958 if score_match: 

959 metrics["parsed_score"] = float(score_match.group(1)) 

960 

961 return metrics 

962 

963 async def _store_result(self, result_id: str, result: CrackerjackResult) -> None: 

964 """Store Crackerjack result in database.""" 

965 try: 

966 with sqlite3.connect(self.db_path) as conn: 

967 conn.execute( 

968 """ 

969 INSERT INTO crackerjack_results 

970 (id, command, exit_code, stdout, stderr, execution_time, timestamp, 

971 working_directory, parsed_data, quality_metrics, memory_insights) 

972 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 

973 """, 

974 ( 

975 result_id, 

976 result.command, 

977 result.exit_code, 

978 result.stdout, 

979 result.stderr, 

980 result.execution_time, 

981 result.timestamp.isoformat(), 

982 result.working_directory, 

983 json.dumps(result.parsed_data), 

984 json.dumps(result.quality_metrics), 

985 json.dumps(result.memory_insights), 

986 ), 

987 ) 

988 

989 # Store individual test results 

990 for test_result in result.test_results: 990 ↛ 991line 990 didn't jump to line 991 because the loop on line 990 never started

991 test_id = ( 

992 f"test_{result_id}_{hash(test_result.get('test', 'unknown'))}" 

993 ) 

994 conn.execute( 

995 """ 

996 INSERT INTO test_results 

997 (id, result_id, test_name, status, duration, file_path, timestamp) 

998 VALUES (?, ?, ?, ?, ?, ?, ?) 

999 """, 

1000 ( 

1001 test_id, 

1002 result_id, 

1003 test_result.get("test", ""), 

1004 test_result.get("status", ""), 

1005 test_result.get("duration", 0), 

1006 test_result.get("file", ""), 

1007 result.timestamp, 

1008 ), 

1009 ) 

1010 

1011 # Store quality metrics 

1012 for metric_name, metric_value in result.quality_metrics.items(): 

1013 metric_id = f"metric_{result_id}_{metric_name}" 

1014 conn.execute( 

1015 """ 

1016 INSERT INTO quality_metrics_history 

1017 (id, project_path, metric_type, metric_value, timestamp, result_id) 

1018 VALUES (?, ?, ?, ?, ?, ?) 

1019 """, 

1020 ( 

1021 metric_id, 

1022 result.working_directory, 

1023 metric_name, 

1024 metric_value, 

1025 result.timestamp.isoformat(), 

1026 result_id, 

1027 ), 

1028 ) 

1029 except Exception: 

1030 # In sandboxed/readonly environments, skip persistence 

1031 return 

1032 

1033 async def _store_progress_snapshot( 

1034 self, 

1035 result_id: str, 

1036 result: CrackerjackResult, 

1037 project_path: str, 

1038 ) -> None: 

1039 """Store progress snapshot from result.""" 

1040 progress_info: dict[str, Any] = ( 

1041 result.parsed_data.get("progress_info", {}) if result.parsed_data else {} 

1042 ) 

1043 

1044 if progress_info: 1044 ↛ exitline 1044 didn't return from function '_store_progress_snapshot' because the condition on line 1044 was always true

1045 snapshot_id = f"progress_{result_id}" 

1046 try: 

1047 with sqlite3.connect(self.db_path) as conn: 

1048 conn.execute( 

1049 """ 

1050 INSERT INTO progress_snapshots 

1051 (id, project_path, command, stage, progress_percentage, current_task, 

1052 completed_tasks, failed_tasks, quality_metrics, timestamp, memory_context) 

1053 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 

1054 """, 

1055 ( 

1056 snapshot_id, 

1057 project_path, 

1058 result.command, 

1059 progress_info.get("stage", ""), 

1060 progress_info.get("percentage", 0), 

1061 progress_info.get("current_task", ""), 

1062 json.dumps(progress_info.get("completed_tasks", [])), 

1063 json.dumps(progress_info.get("failed_tasks", [])), 

1064 json.dumps(result.quality_metrics), 

1065 result.timestamp.isoformat(), 

1066 json.dumps(result.memory_insights), 

1067 ), 

1068 ) 

1069 except Exception: 

1070 # In sandboxed/readonly environments, skip persistence 

1071 return 

1072 

1073 

# Process-wide singleton holding the lazily created integration
_crackerjack_integration: CrackerjackIntegration | None = None


def get_crackerjack_integration() -> CrackerjackIntegration:
    """Return the shared Crackerjack integration, creating it on first use."""
    global _crackerjack_integration
    if _crackerjack_integration is None:
        _crackerjack_integration = CrackerjackIntegration()
    return _crackerjack_integration

1084 

1085 

# Public API functions for MCP tools
async def execute_crackerjack_command(
    command: str,
    args: list[str] | None = None,
    working_directory: str = ".",
    timeout: int = 300,
    ai_agent_mode: bool = False,
) -> dict[str, Any]:
    """Execute Crackerjack command and return structured results.

    Delegates to the shared integration instance and converts the dataclass
    result into a plain dict for MCP consumers.
    """
    outcome = await get_crackerjack_integration().execute_crackerjack_command(
        command, args, working_directory, timeout, ai_agent_mode
    )
    return asdict(outcome)

1104 

1105 

async def get_recent_crackerjack_results(
    hours: int = 24,
    command: str | None = None,
) -> list[dict[str, Any]]:
    """Return Crackerjack results from the last *hours*, optionally filtered by command."""
    return await get_crackerjack_integration().get_recent_results(hours, command)

1113 

1114 

async def get_quality_metrics_history(
    project_path: str,
    metric_type: str | None = None,
    days: int = 30,
) -> list[dict[str, Any]]:
    """Return quality-metrics history for *project_path* for trend analysis."""
    return await get_crackerjack_integration().get_quality_metrics_history(
        project_path, metric_type, days
    )

1127 

1128 

async def analyze_test_failure_patterns(days: int = 7) -> dict[str, Any]:
    """Analyze test-failure patterns over the last *days* for insights."""
    return await get_crackerjack_integration().get_test_failure_patterns(days)

1133 

1134 

async def get_quality_trends(
    project_path: str,
    days: int = 30,
) -> dict[str, Any]:
    """Summarize quality trends for *project_path* over the given window."""
    return await get_crackerjack_integration().get_quality_trends(project_path, days)

1142 

1143 

async def crackerjack_health_check() -> dict[str, Any]:
    """Check Crackerjack integration health and dependencies."""
    return await get_crackerjack_integration().health_check()