Coverage for session_mgmt_mcp/crackerjack_integration.py: 17.72%

496 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-01 05:22 -0700

1"""Crackerjack Integration module for progress tracking and test monitoring. 

2 

3This module provides deep integration with Crackerjack for: 

4- Progress tracking output parsing for memory enrichment 

5- Test result monitoring for context enhancement 

6- Command execution with result capture 

7- Quality metrics integration 

8""" 

9 

10import asyncio 

11import json 

12import logging 

13import re 

14import sqlite3 

15import threading 

16import time 

17from dataclasses import asdict, dataclass 

18from datetime import datetime, timedelta 

19from enum import Enum 

20from pathlib import Path 

21from typing import Any 

22 

23logger = logging.getLogger(__name__) 

24 

25 

class CrackerjackCommand(Enum):
    """Supported Crackerjack commands.

    Each member's value is the CLI subcommand name passed to the
    ``crackerjack`` executable (see ``execute_crackerjack_command``).
    """

    # Core quality commands
    ANALYZE = "analyze"  # Comprehensive analysis command
    CHECK = "check"
    TEST = "test"
    LINT = "lint"
    FORMAT = "format"
    TYPECHECK = "typecheck"  # Type checking support

    # Security and complexity
    SECURITY = "security"
    COMPLEXITY = "complexity"
    COVERAGE = "coverage"

    # Build and maintenance
    BUILD = "build"
    CLEAN = "clean"

    # Documentation
    DOCS = "docs"

    # Release management
    RELEASE = "release"  # Release command support

51 

52 

class TestStatus(Enum):
    """Test execution status.

    Values mirror pytest's lowercase outcome names; parser output stores
    statuses as these lowercase strings (see ``_parse_test_output``).
    """

    PASSED = "passed"
    FAILED = "failed"
    SKIPPED = "skipped"
    ERROR = "error"
    XFAIL = "xfail"   # expected failure
    XPASS = "xpass"   # unexpectedly passing test marked xfail

62 

63 

class QualityMetric(Enum):
    """Quality metrics tracked.

    Values are the metric-type identifiers used when recording rows into
    the ``quality_metrics_history`` table.
    """

    CODE_COVERAGE = "coverage"
    COMPLEXITY = "complexity"
    LINT_SCORE = "lint_score"
    SECURITY_SCORE = "security_score"
    TEST_PASS_RATE = "test_pass_rate"
    BUILD_STATUS = "build_status"

73 

74 

@dataclass
class CrackerjackResult:
    """Result of Crackerjack command execution."""

    command: str  # Crackerjack subcommand that was executed (e.g. "test")
    exit_code: int  # process exit status; -1 means timeout, -2 execution error
    stdout: str  # captured standard output (utf-8, errors ignored)
    stderr: str  # captured standard error (utf-8, errors ignored)
    execution_time: float  # wall-clock seconds from launch to completion
    timestamp: datetime  # when the result object was created
    working_directory: str  # directory the subprocess ran in
    parsed_data: dict[str, Any] | None  # structured data from the output parser
    quality_metrics: dict[str, float]  # metric name -> numeric value
    test_results: list[dict[str, Any]]  # per-test dicts from parsed output
    memory_insights: list[str]  # human-readable insight strings for memory

90 

91 

@dataclass
class TestResult:
    """Individual test result information."""

    test_id: str  # unique identifier for this test execution
    test_name: str  # test function/case name
    status: TestStatus  # outcome (passed/failed/skipped/...)
    duration: float  # execution time in seconds
    file_path: str  # source file containing the test
    line_number: int | None  # line of the test definition, if known
    error_message: str | None  # failure message, None when not failed
    traceback: str | None  # full traceback text, None when not failed
    tags: list[str]  # arbitrary labels/markers attached to the test
    coverage_data: dict[str, Any] | None  # per-test coverage info, if collected

106 

107 

@dataclass
class ProgressSnapshot:
    """Progress tracking snapshot."""

    timestamp: datetime  # when the snapshot was taken
    project_path: str  # project the command ran against
    command: str  # Crackerjack subcommand in progress
    stage: str  # current named stage of execution
    progress_percentage: float  # 0-100 completion estimate
    current_task: str  # description of the task currently running
    completed_tasks: list[str]  # tasks finished so far
    failed_tasks: list[str]  # tasks that failed so far
    quality_metrics: dict[str, float]  # metric name -> value at snapshot time
    estimated_completion: datetime | None  # ETA, None when unknown
    memory_context: list[str]  # insight strings associated with this snapshot

123 

124 

class CrackerjackOutputParser:
    """Parses Crackerjack output for structured data extraction.

    Holds a table of pre-compiled regexes and applies the subset relevant
    to the executed command over the combined stdout/stderr text, producing
    a structured dict plus a list of human-readable "memory insight"
    strings for context enrichment.
    """

    def __init__(self) -> None:
        """Initialize output parser with pre-compiled regex patterns."""
        # Compiled once so repeated parse calls do not pay re.compile cost.
        self.patterns = {
            # Test results patterns
            # NOTE(review): (\w+\.py) does not match directory components,
            # so only the basename of a test file is captured — confirm this
            # is intended for paths like tests/test_mod.py.
            "pytest_result": re.compile(
                r"(\w+\.py)::\s*(\w+)\s*(PASSED|FAILED|SKIPPED|ERROR|XFAIL|XPASS)\s*(?:\[(\d+%)\])?\s*(?:\((.+)\))?",
            ),
            "pytest_summary": re.compile(r"=+ (.+) =+"),
            "pytest_coverage": re.compile(r"TOTAL\s+\d+\s+\d+\s+(\d+)%"),
            # Lint results patterns
            "ruff_error": re.compile(r"(\S+):(\d+):(\d+):\s*(\w+):\s*(.+)"),
            "pyright_error": re.compile(
                r"(\S+):(\d+):(\d+)\s*-\s*(error|warning|info):\s*(.+)",
            ),
            # Security patterns (bandit report format)
            "bandit_issue": re.compile(r">> Issue: \[([^\]]+)\]\s*(.+)"),
            "bandit_severity": re.compile(r"Severity: (\w+)\s*Confidence: (\w+)"),
            # Complexity patterns: file, line count, complexity score
            "complexity_score": re.compile(r"(\S+)\s+(\d+)\s+(\d+\.\d+)"),
            # Coverage patterns: file, statements, missing, percent
            "coverage_line": re.compile(r"(\S+)\s+(\d+)\s+(\d+)\s+(\d+)%"),
            # Progress patterns
            "progress_indicator": re.compile(r"(?:Progress:|Stage:|Running:)\s*(.+)"),
            # NOTE(review): matches ANY "NN%" token (e.g. coverage figures),
            # not only progress percentages — the last match on a line wins.
            "percentage": re.compile(r"(\d+(?:\.\d+)?)%"),
            "task_completion": re.compile(r"✅\s*(.+)|PASSED\s*(.+)|SUCCESS\s*(.+)"),
            "task_failure": re.compile(r"❌\s*(.+)|FAILED\s*(.+)|ERROR\s*(.+)"),
            # New crackerjack v0.31.4+ patterns
            "ai_agent_action": re.compile(r"🤖\s*AI Agent:\s*(.+)"),
            "quality_gate": re.compile(r"Quality Gate:\s*(\w+)\s*\((\d+)%\)"),
            "release_info": re.compile(r"Release:\s*(.+)\s*→\s*(.+)"),
            "typecheck_error": re.compile(
                r"(\S+):(\d+):(\d+)\s*-\s*(error|warning):\s*(.+)"
            ),
            # Enhanced progress patterns
            "stage_progress": re.compile(r"Stage\s+(\d+)/(\d+):\s*(.+)"),
            "eta_estimate": re.compile(r"ETA:\s*(\d+m\s*\d+s|\d+s)"),
            "crackerjack_stage": re.compile(r"🔧\s*(.+)\s*\.\.\.\s*(.+)"),
            "auto_fix": re.compile(r"🔧\s*Auto-fixing:\s*(.+)"),
        }

    def parse_output(
        self,
        command: str,
        stdout: str,
        stderr: str,
    ) -> tuple[dict[str, Any], list[str]]:
        """Parse Crackerjack output and extract insights.

        Args:
            command: Crackerjack subcommand the output came from; selects
                which section parsers run (several run for "check").
            stdout: captured standard output.
            stderr: captured standard error.

        Returns:
            Tuple of (structured data dict, list of insight strings).
        """
        parsed_data = {
            "command": command,
            "test_results": [],
            "lint_issues": [],
            "security_issues": [],
            "coverage_data": {},
            "complexity_data": {},
            "progress_info": {},
            "quality_metrics": {},
        }

        memory_insights = []

        # Combine stdout and stderr for comprehensive parsing
        full_output = f"{stdout}\n{stderr}"

        # Parse based on command type; "check" runs every section parser.
        if command in ["test", "check"]:
            parsed_data.update(self._parse_test_output(full_output))
            memory_insights.extend(self._extract_test_insights(parsed_data))

        if command in ["lint", "format", "check"]:
            parsed_data.update(self._parse_lint_output(full_output))
            memory_insights.extend(self._extract_lint_insights(parsed_data))

        if command in ["security", "check"]:
            parsed_data.update(self._parse_security_output(full_output))
            memory_insights.extend(self._extract_security_insights(parsed_data))

        if command in ["coverage", "test", "check"]:
            parsed_data.update(self._parse_coverage_output(full_output))
            memory_insights.extend(self._extract_coverage_insights(parsed_data))

        if command in ["complexity", "check"]:
            parsed_data.update(self._parse_complexity_output(full_output))
            memory_insights.extend(self._extract_complexity_insights(parsed_data))

        # Always parse progress information
        parsed_data.update(self._parse_progress_output(full_output))
        memory_insights.extend(self._extract_progress_insights(parsed_data))

        return parsed_data, memory_insights

    def _parse_test_output(self, output: str) -> dict[str, Any]:
        """Parse pytest output for test results.

        Returns a dict with "test_results" (one dict per matched test line)
        and "test_summary" (text of the pytest summary banner).
        """
        data = {"test_results": [], "test_summary": {}}

        lines = output.split("\n")

        for line in lines:
            # Test result lines
            match = self.patterns["pytest_result"].search(line)
            if match:
                # NOTE(review): the bracketed "[NN%]" group is presumably
                # pytest's session progress indicator, stored here under the
                # key "coverage" — confirm consumers expect that naming.
                file_path, test_name, status, coverage, duration = match.groups()
                data["test_results"].append(
                    {
                        "file": file_path,
                        "test": test_name,
                        "status": status.lower(),
                        "coverage": coverage,
                        "duration": duration,
                    },
                )

            # Summary lines (e.g. "==== 3 passed, 1 failed in 0.5s ====")
            summary_match = self.patterns["pytest_summary"].search(line)
            if summary_match:
                summary_text = summary_match.group(1)
                if "passed" in summary_text or "failed" in summary_text:
                    data["test_summary"]["summary"] = summary_text

        return data

    def _parse_lint_output(self, output: str) -> dict[str, Any]:
        """Parse lint output for code quality issues.

        Recognizes both ruff-style and pyright-style diagnostic lines; a
        line matching both patterns is counted twice (once per tool).
        """
        data = {"lint_issues": [], "lint_summary": {}}

        lines = output.split("\n")
        total_errors = 0

        for line in lines:
            # Ruff errors: file:line:col: CODE: message
            ruff_match = self.patterns["ruff_error"].search(line)
            if ruff_match:
                file_path, line_num, col_num, error_type, message = ruff_match.groups()
                data["lint_issues"].append(
                    {
                        "tool": "ruff",
                        "file": file_path,
                        "line": int(line_num),
                        "column": int(col_num),
                        "type": error_type,
                        "message": message,
                    },
                )
                total_errors += 1

            # Pyright errors: file:line:col - severity: message
            pyright_match = self.patterns["pyright_error"].search(line)
            if pyright_match:
                file_path, line_num, col_num, severity, message = pyright_match.groups()
                data["lint_issues"].append(
                    {
                        "tool": "pyright",
                        "file": file_path,
                        "line": int(line_num),
                        "column": int(col_num),
                        "type": severity,
                        "message": message,
                    },
                )
                total_errors += 1

        data["lint_summary"] = {"total_issues": total_errors}
        return data

    def _parse_security_output(self, output: str) -> dict[str, Any]:
        """Parse bandit security scan output.

        Severity/confidence lines in bandit reports follow the issue header,
        so the most recently seen issue dict is updated in place.
        """
        data = {"security_issues": [], "security_summary": {}}

        lines = output.split("\n")
        current_issue = None

        for line in lines:
            issue_match = self.patterns["bandit_issue"].search(line)
            if issue_match:
                issue_id, description = issue_match.groups()
                current_issue = {
                    "id": issue_id,
                    "description": description,
                    "severity": None,
                    "confidence": None,
                }
                data["security_issues"].append(current_issue)

            # Attach severity/confidence to the issue seen most recently.
            severity_match = self.patterns["bandit_severity"].search(line)
            if severity_match and current_issue:
                severity, confidence = severity_match.groups()
                current_issue["severity"] = severity
                current_issue["confidence"] = confidence

        data["security_summary"] = {"total_issues": len(data["security_issues"])}
        return data

    def _parse_coverage_output(self, output: str) -> dict[str, Any]:
        """Parse coverage report output.

        NOTE(review): the per-file pattern also matches the "TOTAL" row, so
        coverage_data may contain a "TOTAL" pseudo-file entry in addition to
        coverage_summary["total_coverage"].
        """
        data = {"coverage_data": {}, "coverage_summary": {}}

        lines = output.split("\n")

        for line in lines:
            # Individual file coverage: file, statements, missing, percent
            coverage_match = self.patterns["coverage_line"].search(line)
            if coverage_match:
                file_path, statements, missing, coverage = coverage_match.groups()
                data["coverage_data"][file_path] = {
                    "statements": int(statements),
                    "missing": int(missing),
                    # rstrip("%") is defensive; the regex group excludes "%".
                    "coverage": int(coverage.rstrip("%")),
                }

            # Total coverage from the "TOTAL ... NN%" summary line
            total_match = self.patterns["pytest_coverage"].search(line)
            if total_match:
                total_coverage = int(total_match.group(1))
                data["coverage_summary"]["total_coverage"] = total_coverage

        return data

    def _parse_complexity_output(self, output: str) -> dict[str, Any]:
        """Parse complexity analysis output.

        Expects "<file> <lines> <score>" rows; counts files whose score
        exceeds the (currently hard-coded) threshold of 10.
        """
        data = {"complexity_data": {}, "complexity_summary": {}}

        lines = output.split("\n")
        total_files = 0
        high_complexity = 0

        for line in lines:
            complexity_match = self.patterns["complexity_score"].search(line)
            if complexity_match:
                file_path, lines_count, complexity_score = complexity_match.groups()
                complexity_val = float(complexity_score)
                data["complexity_data"][file_path] = {
                    "lines": int(lines_count),
                    "complexity": complexity_val,
                }
                total_files += 1
                if complexity_val > 10:  # Configurable threshold
                    high_complexity += 1

        data["complexity_summary"] = {
            "total_files": total_files,
            "high_complexity_files": high_complexity,
        }
        return data

    def _parse_progress_output(self, output: str) -> dict[str, Any]:
        """Parse progress indicators from output.

        Scans for task markers (✅/❌, PASSED/FAILED/...) and percentage
        tokens; the last percentage seen in the output wins.
        """
        data = {"progress_info": {}}

        lines = output.split("\n")
        completed_tasks = []
        failed_tasks = []
        current_percentage = 0

        for line in lines:
            # Progress indicators ("Progress:", "Stage:", "Running:")
            progress_match = self.patterns["progress_indicator"].search(line)
            if progress_match:
                data["progress_info"]["current_task"] = progress_match.group(1)

            # Percentage completion (last match across all lines wins)
            percentage_match = self.patterns["percentage"].search(line)
            if percentage_match:
                current_percentage = float(percentage_match.group(1))

            # Task completions (only one alternation group is non-None)
            completion_match = self.patterns["task_completion"].search(line)
            if completion_match:
                task = (
                    completion_match.group(1)
                    or completion_match.group(2)
                    or completion_match.group(3)
                )
                if task:
                    completed_tasks.append(task.strip())

            # Task failures
            failure_match = self.patterns["task_failure"].search(line)
            if failure_match:
                task = (
                    failure_match.group(1)
                    or failure_match.group(2)
                    or failure_match.group(3)
                )
                if task:
                    failed_tasks.append(task.strip())

        data["progress_info"].update(
            {
                "percentage": current_percentage,
                "completed_tasks": completed_tasks,
                "failed_tasks": failed_tasks,
            },
        )

        return data

    def _extract_test_insights(self, parsed_data: dict[str, Any]) -> list[str]:
        """Extract memory insights from test results."""
        insights = []
        test_results = parsed_data.get("test_results", [])

        if test_results:
            passed = sum(1 for t in test_results if t["status"] == "passed")
            failed = sum(1 for t in test_results if t["status"] == "failed")
            total = len(test_results)

            # total is always > 0 here (test_results is non-empty), but the
            # guard keeps the division safe.
            if total > 0:
                pass_rate = (passed / total) * 100
                insights.append(
                    f"Test suite: {passed}/{total} tests passed ({pass_rate:.1f}% pass rate)",
                )

                if failed > 0:
                    failed_files = {
                        t["file"] for t in test_results if t["status"] == "failed"
                    }
                    insights.append(
                        f"Test failures found in {len(failed_files)} files: {', '.join(failed_files)}",
                    )

                if pass_rate == 100:
                    insights.append("All tests passing - code quality is stable")
                elif pass_rate < 80:
                    insights.append(
                        "Test pass rate below 80% - investigate failing tests",
                    )

        return insights

    def _extract_lint_insights(self, parsed_data: dict[str, Any]) -> list[str]:
        """Extract memory insights from lint results."""
        insights = []
        lint_issues = parsed_data.get("lint_issues", [])

        if lint_issues:
            total_issues = len(lint_issues)
            by_type = {}
            by_file = {}

            # Tally issues per diagnostic type and per file.
            for issue in lint_issues:
                issue_type = issue.get("type", "unknown")
                file_path = issue.get("file", "unknown")

                by_type[issue_type] = by_type.get(issue_type, 0) + 1
                by_file[file_path] = by_file.get(file_path, 0) + 1

            insights.append(f"Code quality: {total_issues} lint issues found")

            # Top issue types
            top_types = sorted(by_type.items(), key=lambda x: x[1], reverse=True)[:3]
            if top_types:
                type_summary = ", ".join(f"{t}: {c}" for t, c in top_types)
                insights.append(f"Most common issues: {type_summary}")

            # Files needing attention (only flag files with more than 5 issues)
            top_files = sorted(by_file.items(), key=lambda x: x[1], reverse=True)[:3]
            if top_files and top_files[0][1] > 5:
                insights.append(
                    f"Files needing attention: {top_files[0][0]} ({top_files[0][1]} issues)",
                )
        else:
            insights.append("Code quality: No lint issues found - code is clean")

        return insights

    def _extract_security_insights(self, parsed_data: dict[str, Any]) -> list[str]:
        """Extract memory insights from security scan."""
        insights = []
        security_issues = parsed_data.get("security_issues", [])

        if security_issues:
            total_issues = len(security_issues)
            high_severity = sum(
                1 for i in security_issues if i.get("severity") == "HIGH"
            )

            insights.append(
                f"Security scan: {total_issues} potential security issues found",
            )

            if high_severity > 0:
                insights.append(
                    f"⚠️ {high_severity} high-severity security issues require immediate attention",
                )
            else:
                insights.append("No high-severity security issues detected")
        else:
            insights.append(
                "Security scan: No security issues detected - code appears secure",
            )

        return insights

    def _extract_coverage_insights(self, parsed_data: dict[str, Any]) -> list[str]:
        """Extract memory insights from coverage data."""
        insights = []
        coverage_summary = parsed_data.get("coverage_summary", {})

        if "total_coverage" in coverage_summary:
            coverage = coverage_summary["total_coverage"]
            insights.append(f"Test coverage: {coverage}% of code is covered by tests")

            # Banded assessment: >=90 excellent, >=80 good, >=60 moderate.
            if coverage >= 90:
                insights.append("Excellent test coverage - code is well tested")
            elif coverage >= 80:
                insights.append("Good test coverage - consider adding more tests")
            elif coverage >= 60:
                insights.append(
                    "Moderate test coverage - significant testing gaps exist",
                )
            else:
                insights.append(
                    "Low test coverage - critical testing gaps need attention",
                )

        return insights

    def _extract_complexity_insights(self, parsed_data: dict[str, Any]) -> list[str]:
        """Extract memory insights from complexity analysis."""
        insights = []
        complexity_summary = parsed_data.get("complexity_summary", {})

        if complexity_summary:
            total_files = complexity_summary.get("total_files", 0)
            high_complexity = complexity_summary.get("high_complexity_files", 0)

            if total_files > 0:
                complexity_rate = (high_complexity / total_files) * 100
                insights.append(
                    f"Code complexity: {high_complexity}/{total_files} files have high complexity ({complexity_rate:.1f}%)",
                )

                if complexity_rate == 0:
                    insights.append("Code complexity is well managed")
                elif complexity_rate > 20:
                    insights.append(
                        "Consider refactoring high-complexity files for maintainability",
                    )

        return insights

    def _extract_progress_insights(self, parsed_data: dict[str, Any]) -> list[str]:
        """Extract memory insights from progress information."""
        insights = []
        progress_info = parsed_data.get("progress_info", {})

        completed_tasks = progress_info.get("completed_tasks", [])
        failed_tasks = progress_info.get("failed_tasks", [])
        percentage = progress_info.get("percentage", 0)

        if completed_tasks:
            insights.append(f"Progress: Completed {len(completed_tasks)} tasks")

        if failed_tasks:
            # Only the first three failed task names are listed.
            insights.append(
                f"⚠️ {len(failed_tasks)} tasks failed: {', '.join(failed_tasks[:3])}",
            )

        if percentage > 0:
            insights.append(f"Overall progress: {percentage}% complete")

        return insights

589 

590 

591class CrackerjackIntegration: 

592 """Main integration class for Crackerjack command execution and monitoring.""" 

593 

594 def __init__(self, db_path: str | None = None) -> None: 

595 """Initialize Crackerjack integration.""" 

596 self.db_path = db_path or str( 

597 Path.home() / ".claude" / "data" / "crackerjack_integration.db", 

598 ) 

599 self.parser = CrackerjackOutputParser() 

600 self._lock = threading.Lock() 

601 self._init_database() 

602 

603 def _init_database(self) -> None: 

604 """Initialize SQLite database for Crackerjack integration.""" 

605 Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) 

606 

607 with sqlite3.connect(self.db_path) as conn: 

608 conn.execute(""" 

609 CREATE TABLE IF NOT EXISTS crackerjack_results ( 

610 id TEXT PRIMARY KEY, 

611 command TEXT NOT NULL, 

612 exit_code INTEGER, 

613 stdout TEXT, 

614 stderr TEXT, 

615 execution_time REAL, 

616 timestamp TIMESTAMP, 

617 working_directory TEXT, 

618 parsed_data TEXT, -- JSON 

619 quality_metrics TEXT, -- JSON 

620 memory_insights TEXT -- JSON array 

621 ) 

622 """) 

623 

624 conn.execute(""" 

625 CREATE TABLE IF NOT EXISTS test_results ( 

626 id TEXT PRIMARY KEY, 

627 result_id TEXT NOT NULL, 

628 test_name TEXT NOT NULL, 

629 status TEXT NOT NULL, 

630 duration REAL, 

631 file_path TEXT, 

632 line_number INTEGER, 

633 error_message TEXT, 

634 timestamp TIMESTAMP, 

635 FOREIGN KEY (result_id) REFERENCES crackerjack_results(id) 

636 ) 

637 """) 

638 

639 conn.execute(""" 

640 CREATE TABLE IF NOT EXISTS progress_snapshots ( 

641 id TEXT PRIMARY KEY, 

642 project_path TEXT NOT NULL, 

643 command TEXT NOT NULL, 

644 stage TEXT, 

645 progress_percentage REAL, 

646 current_task TEXT, 

647 completed_tasks TEXT, -- JSON array 

648 failed_tasks TEXT, -- JSON array 

649 quality_metrics TEXT, -- JSON 

650 timestamp TIMESTAMP, 

651 memory_context TEXT -- JSON array 

652 ) 

653 """) 

654 

655 conn.execute(""" 

656 CREATE TABLE IF NOT EXISTS quality_metrics_history ( 

657 id TEXT PRIMARY KEY, 

658 project_path TEXT NOT NULL, 

659 metric_type TEXT NOT NULL, 

660 metric_value REAL NOT NULL, 

661 timestamp TIMESTAMP, 

662 result_id TEXT, 

663 FOREIGN KEY (result_id) REFERENCES crackerjack_results(id) 

664 ) 

665 """) 

666 

667 # Create indices 

668 conn.execute( 

669 "CREATE INDEX IF NOT EXISTS idx_results_timestamp ON crackerjack_results(timestamp)", 

670 ) 

671 conn.execute( 

672 "CREATE INDEX IF NOT EXISTS idx_results_command ON crackerjack_results(command)", 

673 ) 

674 conn.execute( 

675 "CREATE INDEX IF NOT EXISTS idx_test_results_status ON test_results(status)", 

676 ) 

677 conn.execute( 

678 "CREATE INDEX IF NOT EXISTS idx_progress_project ON progress_snapshots(project_path)", 

679 ) 

680 conn.execute( 

681 "CREATE INDEX IF NOT EXISTS idx_metrics_type ON quality_metrics_history(metric_type)", 

682 ) 

683 

684 async def execute_crackerjack_command( 

685 self, 

686 command: str, 

687 args: list[str] | None = None, 

688 working_directory: str = ".", 

689 timeout: int = 300, 

690 ai_agent_mode: bool = False, 

691 ) -> CrackerjackResult: 

692 """Execute Crackerjack command and capture results.""" 

693 args = args or [] 

694 

695 # Add AI agent mode support 

696 if ai_agent_mode: 

697 args.append("--ai-agent") 

698 

699 full_command = ["crackerjack", command, *args] 

700 

701 start_time = time.time() 

702 result_id = f"cj_{int(start_time * 1000)}" 

703 

704 try: 

705 # Execute command 

706 process = await asyncio.create_subprocess_exec( 

707 *full_command, 

708 cwd=working_directory, 

709 stdout=asyncio.subprocess.PIPE, 

710 stderr=asyncio.subprocess.PIPE, 

711 ) 

712 

713 stdout, stderr = await asyncio.wait_for( 

714 process.communicate(), 

715 timeout=timeout, 

716 ) 

717 

718 exit_code = process.returncode 

719 execution_time = time.time() - start_time 

720 

721 # Decode output 

722 stdout_text = stdout.decode("utf-8", errors="ignore") 

723 stderr_text = stderr.decode("utf-8", errors="ignore") 

724 

725 # Parse output for insights 

726 parsed_data, memory_insights = self.parser.parse_output( 

727 command, 

728 stdout_text, 

729 stderr_text, 

730 ) 

731 

732 # Calculate quality metrics 

733 quality_metrics = self._calculate_quality_metrics(parsed_data, exit_code) 

734 

735 # Create result object 

736 result = CrackerjackResult( 

737 command=command, 

738 exit_code=exit_code, 

739 stdout=stdout_text, 

740 stderr=stderr_text, 

741 execution_time=execution_time, 

742 timestamp=datetime.now(), 

743 working_directory=working_directory, 

744 parsed_data=parsed_data, 

745 quality_metrics=quality_metrics, 

746 test_results=parsed_data.get("test_results", []), 

747 memory_insights=memory_insights, 

748 ) 

749 

750 # Store in database 

751 await self._store_result(result_id, result) 

752 

753 # Store progress snapshot 

754 await self._store_progress_snapshot(result_id, result, working_directory) 

755 

756 return result 

757 

758 except TimeoutError: 

759 execution_time = time.time() - start_time 

760 error_result = CrackerjackResult( 

761 command=command, 

762 exit_code=-1, 

763 stdout="", 

764 stderr=f"Command timed out after {timeout} seconds", 

765 execution_time=execution_time, 

766 timestamp=datetime.now(), 

767 working_directory=working_directory, 

768 parsed_data={}, 

769 quality_metrics={}, 

770 test_results=[], 

771 memory_insights=[ 

772 f"Command '{command}' timed out - consider optimizing or increasing timeout", 

773 ], 

774 ) 

775 

776 await self._store_result(result_id, error_result) 

777 return error_result 

778 

779 except Exception as e: 

780 execution_time = time.time() - start_time 

781 error_result = CrackerjackResult( 

782 command=command, 

783 exit_code=-2, 

784 stdout="", 

785 stderr=f"Execution error: {e!s}", 

786 execution_time=execution_time, 

787 timestamp=datetime.now(), 

788 working_directory=working_directory, 

789 parsed_data={}, 

790 quality_metrics={}, 

791 test_results=[], 

792 memory_insights=[f"Command '{command}' failed with error: {e!s}"], 

793 ) 

794 

795 await self._store_result(result_id, error_result) 

796 return error_result 

797 

798 async def get_recent_results( 

799 self, 

800 hours: int = 24, 

801 command: str | None = None, 

802 ) -> list[dict[str, Any]]: 

803 """Get recent Crackerjack execution results.""" 

804 since = datetime.now() - timedelta(hours=hours) 

805 

806 with sqlite3.connect(self.db_path) as conn: 

807 conn.row_factory = sqlite3.Row 

808 

809 where_conditions = ["timestamp >= ?"] 

810 params = [since] 

811 

812 if command: 

813 where_conditions.append("command = ?") 

814 params.append(command) 

815 

816 query = f""" 

817 SELECT * FROM crackerjack_results 

818 WHERE {" AND ".join(where_conditions)} 

819 ORDER BY timestamp DESC 

820 """ 

821 

822 cursor = conn.execute(query, params) 

823 results = [] 

824 

825 for row in cursor.fetchall(): 

826 result = dict(row) 

827 result["parsed_data"] = json.loads(result["parsed_data"] or "{}") 

828 result["quality_metrics"] = json.loads( 

829 result["quality_metrics"] or "{}", 

830 ) 

831 result["memory_insights"] = json.loads( 

832 result["memory_insights"] or "[]", 

833 ) 

834 results.append(result) 

835 

836 return results 

837 

838 async def get_quality_metrics_history( 

839 self, 

840 project_path: str, 

841 metric_type: str | None = None, 

842 days: int = 30, 

843 ) -> list[dict[str, Any]]: 

844 """Get quality metrics history for trend analysis.""" 

845 since = datetime.now() - timedelta(days=days) 

846 

847 with sqlite3.connect(self.db_path) as conn: 

848 conn.row_factory = sqlite3.Row 

849 

850 where_conditions = ["project_path = ?", "timestamp >= ?"] 

851 params = [project_path, since] 

852 

853 if metric_type: 

854 where_conditions.append("metric_type = ?") 

855 params.append(metric_type) 

856 

857 query = f""" 

858 SELECT * FROM quality_metrics_history 

859 WHERE {" AND ".join(where_conditions)} 

860 ORDER BY timestamp DESC 

861 """ 

862 

863 cursor = conn.execute(query, params) 

864 return [dict(row) for row in cursor.fetchall()] 

865 

    async def get_test_failure_patterns(self, days: int = 7) -> dict[str, Any]:
        """Analyze test failure patterns for insights.

        Args:
            days: Look-back window from now.

        Returns:
            Dict with "failed_tests" (grouped by name/file/message),
            "flaky_tests" (tests observed with more than one distinct
            status over at least 3 runs), "failing_files" (top 10 files by
            failure count) and "analysis_period_days".
        """
        since = datetime.now() - timedelta(days=days)

        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row

            # Get failed tests, most frequent failures first.
            failed_tests = conn.execute(
                """
                SELECT test_name, file_path, error_message, COUNT(*) as failure_count
                FROM test_results
                WHERE status = 'failed' AND timestamp >= ?
                GROUP BY test_name, file_path, error_message
                ORDER BY failure_count DESC
                """,
                (since,),
            ).fetchall()

            # Get flaky tests (alternating pass/fail): more than one
            # distinct status across at least 3 recorded runs.
            flaky_tests = conn.execute(
                """
                SELECT test_name, file_path,
                       COUNT(DISTINCT status) as status_count,
                       COUNT(*) as total_runs
                FROM test_results
                WHERE timestamp >= ?
                GROUP BY test_name, file_path
                HAVING status_count > 1 AND total_runs >= 3
                ORDER BY status_count DESC, total_runs DESC
                """,
                (since,),
            ).fetchall()

            # Get most failing files (top 10 by failure count).
            failing_files = conn.execute(
                """
                SELECT file_path, COUNT(*) as failure_count
                FROM test_results
                WHERE status = 'failed' AND timestamp >= ?
                GROUP BY file_path
                ORDER BY failure_count DESC
                LIMIT 10
                """,
                (since,),
            ).fetchall()

            return {
                "failed_tests": [dict(row) for row in failed_tests],
                "flaky_tests": [dict(row) for row in flaky_tests],
                "failing_files": [dict(row) for row in failing_files],
                "analysis_period_days": days,
            }

919 

920 async def get_quality_trends( 

921 self, 

922 project_path: str, 

923 days: int = 30, 

924 ) -> dict[str, Any]: 

925 """Analyze quality trends over time.""" 

926 metrics_history = await self.get_quality_metrics_history( 

927 project_path, None, days 

928 ) 

929 

930 # Calculate trends for each metric type 

931 trends = {} 

932 for metric_type in [ 

933 "test_pass_rate", 

934 "code_coverage", 

935 "lint_score", 

936 "security_score", 

937 "complexity_score", 

938 ]: 

939 metric_values = [ 

940 m for m in metrics_history if m["metric_type"] == metric_type 

941 ] 

942 if len(metric_values) >= 2: 

943 # Sort by timestamp (most recent first) 

944 metric_values.sort(key=lambda x: x["timestamp"], reverse=True) 

945 

946 # Split into recent and older halves 

947 mid_point = len(metric_values) // 2 

948 recent = metric_values[:mid_point] if mid_point > 0 else metric_values 

949 older = metric_values[mid_point:] if mid_point > 0 else [] 

950 

951 if recent and older: 

952 recent_avg = sum(m["metric_value"] for m in recent) / len(recent) 

953 older_avg = sum(m["metric_value"] for m in older) / len(older) 

954 change = recent_avg - older_avg 

955 

956 trends[metric_type] = { 

957 "direction": "improving" 

958 if change > 0 

959 else "declining" 

960 if change < 0 

961 else "stable", 

962 "change": abs(change), 

963 "change_percentage": (abs(change) / older_avg * 100) 

964 if older_avg > 0 

965 else 0, 

966 "recent_average": recent_avg, 

967 "previous_average": older_avg, 

968 "data_points": len(metric_values), 

969 "trend_strength": "strong" 

970 if abs(change) > 5 

971 else "moderate" 

972 if abs(change) > 1 

973 else "weak", 

974 } 

975 else: 

976 # Not enough data for trend analysis 

977 current_avg = sum(m["metric_value"] for m in metric_values) / len( 

978 metric_values 

979 ) 

980 trends[metric_type] = { 

981 "direction": "insufficient_data", 

982 "change": 0, 

983 "change_percentage": 0, 

984 "recent_average": current_avg, 

985 "previous_average": current_avg, 

986 "data_points": len(metric_values), 

987 "trend_strength": "unknown", 

988 } 

989 

990 # Overall trend assessment 

991 improving_metrics = sum( 

992 1 for t in trends.values() if t["direction"] == "improving" 

993 ) 

994 declining_metrics = sum( 

995 1 for t in trends.values() if t["direction"] == "declining" 

996 ) 

997 

998 overall_assessment = { 

999 "overall_direction": "improving" 

1000 if improving_metrics > declining_metrics 

1001 else "declining" 

1002 if declining_metrics > improving_metrics 

1003 else "stable", 

1004 "improving_count": improving_metrics, 

1005 "declining_count": declining_metrics, 

1006 "stable_count": len(trends) - improving_metrics - declining_metrics, 

1007 "analysis_period_days": days, 

1008 } 

1009 

1010 return { 

1011 "trends": trends, 

1012 "overall": overall_assessment, 

1013 "recommendations": self._generate_trend_recommendations(trends), 

1014 } 

1015 

1016 def _generate_trend_recommendations(self, trends: dict[str, Any]) -> list[str]: 

1017 """Generate recommendations based on quality trends.""" 

1018 recommendations = [] 

1019 

1020 for metric_type, trend_data in trends.items(): 

1021 direction = trend_data["direction"] 

1022 strength = trend_data["trend_strength"] 

1023 change = trend_data["change"] 

1024 

1025 if direction == "declining" and strength in ["strong", "moderate"]: 

1026 if metric_type == "test_pass_rate": 

1027 recommendations.append( 

1028 f"⚠️ Test pass rate declining by {change:.1f}% - investigate failing tests" 

1029 ) 

1030 elif metric_type == "code_coverage": 

1031 recommendations.append( 

1032 f"⚠️ Code coverage declining by {change:.1f}% - add more tests" 

1033 ) 

1034 elif metric_type == "lint_score": 

1035 recommendations.append( 

1036 "⚠️ Code quality declining - address lint issues" 

1037 ) 

1038 elif metric_type == "security_score": 

1039 recommendations.append( 

1040 "🔒 Security score declining - review security findings" 

1041 ) 

1042 elif metric_type == "complexity_score": 

1043 recommendations.append( 

1044 "🔧 Code complexity increasing - consider refactoring" 

1045 ) 

1046 

1047 elif direction == "improving" and strength == "strong": 

1048 if ( 

1049 metric_type == "test_pass_rate" 

1050 and trend_data["recent_average"] > 95 

1051 ): 

1052 recommendations.append( 

1053 "✅ Excellent test pass rate trend - maintain current practices" 

1054 ) 

1055 elif ( 

1056 metric_type == "code_coverage" and trend_data["recent_average"] > 85 

1057 ): 

1058 recommendations.append( 

1059 "✅ Great coverage improvement - continue testing efforts" 

1060 ) 

1061 

1062 # Add general recommendations 

1063 if not recommendations: 

1064 recommendations.append( 

1065 "📈 Quality metrics are stable - continue current practices" 

1066 ) 

1067 

1068 return recommendations 

1069 

async def health_check(self) -> dict[str, Any]:
    """Check integration health and dependencies.

    Returns a dict with availability flags for the crackerjack CLI and the
    local SQLite database, human-readable recommendations, and an overall
    ``status`` of ``healthy`` / ``partial`` / ``unhealthy``.
    """
    health: dict[str, Any] = {
        "crackerjack_available": False,
        "database_accessible": False,
        "version_compatible": False,
        "recommendations": [],
        "status": "unhealthy",
    }

    # Check crackerjack CLI availability. A missing binary makes
    # create_subprocess_exec raise FileNotFoundError; catch it locally so a
    # missing CLI does not abort the database check below (previously the
    # error fell through to the outer handler and skipped everything).
    try:
        process = await asyncio.create_subprocess_exec(
            "crackerjack",
            "--help",
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL,
        )
        await process.communicate()
        health["crackerjack_available"] = process.returncode == 0
    except (FileNotFoundError, OSError):
        health["crackerjack_available"] = False

    if health["crackerjack_available"]:
        health["recommendations"].append(
            "✅ Crackerjack is available and responding"
        )
    else:
        health["recommendations"].append(
            "❌ Crackerjack not available - install with 'uv add crackerjack'"
        )

    try:
        # Check database accessibility and whether any history exists.
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("SELECT 1").fetchone()
            health["database_accessible"] = True
            health["recommendations"].append("✅ Database connection successful")

            cursor = conn.execute("SELECT COUNT(*) FROM crackerjack_results")
            result_count = cursor.fetchone()[0]

            if result_count > 0:
                health["recommendations"].append(
                    f"📊 {result_count} execution records available"
                )
            else:
                health["recommendations"].append(
                    "📝 No execution history - run some crackerjack commands"
                )

        # Overall status: both checks passing is healthy; DB alone is partial.
        if health["crackerjack_available"] and health["database_accessible"]:
            health["status"] = "healthy"
        elif health["database_accessible"]:
            health["status"] = "partial"
        else:
            health["status"] = "unhealthy"

    except sqlite3.Error as e:
        health["database_accessible"] = False
        health["recommendations"].append(f"❌ Database error: {e}")
    except Exception as e:
        health["error"] = str(e)
        health["recommendations"].append(f"❌ Health check error: {e}")

    return health

1135 

1136 def _calculate_quality_metrics( 

1137 self, 

1138 parsed_data: dict[str, Any], 

1139 exit_code: int, 

1140 ) -> dict[str, float]: 

1141 """Calculate quality metrics from parsed data.""" 

1142 metrics = {} 

1143 

1144 # Test metrics 

1145 test_results = parsed_data.get("test_results", []) 

1146 if test_results: 

1147 passed = sum(1 for t in test_results if t["status"] == "passed") 

1148 total = len(test_results) 

1149 metrics["test_pass_rate"] = (passed / total) * 100 if total > 0 else 0 

1150 

1151 # Coverage metrics 

1152 coverage_summary = parsed_data.get("coverage_summary", {}) 

1153 if "total_coverage" in coverage_summary: 

1154 metrics["code_coverage"] = float(coverage_summary["total_coverage"]) 

1155 

1156 # Lint metrics 

1157 lint_summary = parsed_data.get("lint_summary", {}) 

1158 if "total_issues" in lint_summary: 

1159 # Invert to make higher scores better 

1160 total_issues = lint_summary["total_issues"] 

1161 metrics["lint_score"] = ( 

1162 max(0, 100 - total_issues) if total_issues < 100 else 0 

1163 ) 

1164 

1165 # Security metrics 

1166 security_summary = parsed_data.get("security_summary", {}) 

1167 if "total_issues" in security_summary: 

1168 total_issues = security_summary["total_issues"] 

1169 metrics["security_score"] = ( 

1170 max(0, 100 - (total_issues * 10)) if total_issues < 10 else 0 

1171 ) 

1172 

1173 # Complexity metrics 

1174 complexity_summary = parsed_data.get("complexity_summary", {}) 

1175 if complexity_summary: 

1176 total_files = complexity_summary.get("total_files", 0) 

1177 high_complexity = complexity_summary.get("high_complexity_files", 0) 

1178 if total_files > 0: 

1179 complexity_rate = (high_complexity / total_files) * 100 

1180 metrics["complexity_score"] = max(0, 100 - complexity_rate) 

1181 

1182 # Overall build status 

1183 metrics["build_status"] = 100 if exit_code == 0 else 0 

1184 

1185 return metrics 

1186 

1187 async def _store_result(self, result_id: str, result: CrackerjackResult) -> None: 

1188 """Store Crackerjack result in database.""" 

1189 with sqlite3.connect(self.db_path) as conn: 

1190 conn.execute( 

1191 """ 

1192 INSERT INTO crackerjack_results 

1193 (id, command, exit_code, stdout, stderr, execution_time, timestamp, 

1194 working_directory, parsed_data, quality_metrics, memory_insights) 

1195 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 

1196 """, 

1197 ( 

1198 result_id, 

1199 result.command, 

1200 result.exit_code, 

1201 result.stdout, 

1202 result.stderr, 

1203 result.execution_time, 

1204 result.timestamp, 

1205 result.working_directory, 

1206 json.dumps(result.parsed_data), 

1207 json.dumps(result.quality_metrics), 

1208 json.dumps(result.memory_insights), 

1209 ), 

1210 ) 

1211 

1212 # Store individual test results 

1213 for test_result in result.test_results: 

1214 test_id = f"test_{result_id}_{hash(test_result.get('test', 'unknown'))}" 

1215 conn.execute( 

1216 """ 

1217 INSERT INTO test_results 

1218 (id, result_id, test_name, status, duration, file_path, timestamp) 

1219 VALUES (?, ?, ?, ?, ?, ?, ?) 

1220 """, 

1221 ( 

1222 test_id, 

1223 result_id, 

1224 test_result.get("test", ""), 

1225 test_result.get("status", ""), 

1226 test_result.get("duration", 0), 

1227 test_result.get("file", ""), 

1228 result.timestamp, 

1229 ), 

1230 ) 

1231 

1232 # Store quality metrics 

1233 for metric_name, metric_value in result.quality_metrics.items(): 

1234 metric_id = f"metric_{result_id}_{metric_name}" 

1235 conn.execute( 

1236 """ 

1237 INSERT INTO quality_metrics_history 

1238 (id, project_path, metric_type, metric_value, timestamp, result_id) 

1239 VALUES (?, ?, ?, ?, ?, ?) 

1240 """, 

1241 ( 

1242 metric_id, 

1243 result.working_directory, 

1244 metric_name, 

1245 metric_value, 

1246 result.timestamp, 

1247 result_id, 

1248 ), 

1249 ) 

1250 

async def _store_progress_snapshot(
    self,
    result_id: str,
    result: CrackerjackResult,
    project_path: str,
) -> None:
    """Persist the progress information parsed from a result, if any.

    No-op when the result's parsed data carries no ``progress_info``.
    """
    progress_info = result.parsed_data.get("progress_info", {})
    if not progress_info:
        return

    # Build the full row up front; task lists and metrics are JSON-encoded.
    row = (
        f"progress_{result_id}",
        project_path,
        result.command,
        progress_info.get("stage", ""),
        progress_info.get("percentage", 0),
        progress_info.get("current_task", ""),
        json.dumps(progress_info.get("completed_tasks", [])),
        json.dumps(progress_info.get("failed_tasks", [])),
        json.dumps(result.quality_metrics),
        result.timestamp,
        json.dumps(result.memory_insights),
    )

    with sqlite3.connect(self.db_path) as conn:
        conn.execute(
            """
            INSERT INTO progress_snapshots
            (id, project_path, command, stage, progress_percentage, current_task,
             completed_tasks, failed_tasks, quality_metrics, timestamp, memory_context)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
            row,
        )

1285 

1286 

# Global integration instance (lazily created, process-wide singleton)
_crackerjack_integration = None


def get_crackerjack_integration() -> CrackerjackIntegration:
    """Return the shared CrackerjackIntegration, instantiating it on first use."""
    global _crackerjack_integration
    if _crackerjack_integration is not None:
        return _crackerjack_integration
    _crackerjack_integration = CrackerjackIntegration()
    return _crackerjack_integration

1297 

1298 

# Public API functions for MCP tools
async def execute_crackerjack_command(
    command: str,
    args: list[str] | None = None,
    working_directory: str = ".",
    timeout: int = 300,
    ai_agent_mode: bool = False,
) -> dict[str, Any]:
    """Execute a Crackerjack command via the shared integration.

    Returns the CrackerjackResult converted to a plain dict for MCP transport.
    """
    result = await get_crackerjack_integration().execute_crackerjack_command(
        command, args, working_directory, timeout, ai_agent_mode
    )
    return asdict(result)

1317 

1318 

async def get_recent_crackerjack_results(
    hours: int = 24,
    command: str | None = None,
) -> list[dict[str, Any]]:
    """Return Crackerjack executions recorded within the last *hours* hours."""
    return await get_crackerjack_integration().get_recent_results(hours, command)

1326 

1327 

async def get_quality_metrics_history(
    project_path: str,
    metric_type: str | None = None,
    days: int = 30,
) -> list[dict[str, Any]]:
    """Return quality-metric history rows for trend analysis."""
    return await get_crackerjack_integration().get_quality_metrics_history(
        project_path, metric_type, days
    )

1340 

1341 

async def analyze_test_failure_patterns(days: int = 7) -> dict[str, Any]:
    """Summarize recurring test-failure patterns over the last *days* days."""
    return await get_crackerjack_integration().get_test_failure_patterns(days)

1346 

1347 

async def get_quality_trends(
    project_path: str,
    days: int = 30,
) -> dict[str, Any]:
    """Analyze quality-metric trends for a project over time."""
    return await get_crackerjack_integration().get_quality_trends(project_path, days)

1355 

1356 

async def crackerjack_health_check() -> dict[str, Any]:
    """Report the health of the Crackerjack integration and its dependencies."""
    return await get_crackerjack_integration().health_check()