Coverage for session_mgmt_mcp/crackerjack_integration.py: 17.72%
496 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 05:22 -0700
1"""Crackerjack Integration module for progress tracking and test monitoring.
3This module provides deep integration with Crackerjack for:
4- Progress tracking output parsing for memory enrichment
5- Test result monitoring for context enhancement
6- Command execution with result capture
7- Quality metrics integration
8"""
10import asyncio
11import json
12import logging
13import re
14import sqlite3
15import threading
16import time
17from dataclasses import asdict, dataclass
18from datetime import datetime, timedelta
19from enum import Enum
20from pathlib import Path
21from typing import Any
23logger = logging.getLogger(__name__)
class CrackerjackCommand(Enum):
    """Supported Crackerjack commands.

    Each value is the subcommand string passed to the ``crackerjack`` CLI
    (see ``CrackerjackIntegration.execute_crackerjack_command``).
    """

    # Core quality commands
    ANALYZE = "analyze"  # Comprehensive analysis command
    CHECK = "check"
    TEST = "test"
    LINT = "lint"
    FORMAT = "format"
    TYPECHECK = "typecheck"  # Type checking support

    # Security and complexity
    SECURITY = "security"
    COMPLEXITY = "complexity"
    COVERAGE = "coverage"

    # Build and maintenance
    BUILD = "build"
    CLEAN = "clean"

    # Documentation
    DOCS = "docs"

    # Release management
    RELEASE = "release"  # Release command support
class TestStatus(Enum):
    """Test execution status.

    Values mirror the (lower-cased) status words matched by the
    ``pytest_result`` pattern in ``CrackerjackOutputParser``.
    """

    PASSED = "passed"
    FAILED = "failed"
    SKIPPED = "skipped"
    ERROR = "error"  # error during collection/setup, not an assertion failure
    XFAIL = "xfail"  # expected failure (pytest xfail marker)
    XPASS = "xpass"  # test marked xfail that unexpectedly passed
class QualityMetric(Enum):
    """Quality metrics tracked.

    NOTE(review): ``get_quality_trends`` queries for metric_type names
    "code_coverage" and "complexity_score", which do not match the values
    here ("coverage", "complexity") — verify which spelling is actually
    written to ``quality_metrics_history``.
    """

    CODE_COVERAGE = "coverage"
    COMPLEXITY = "complexity"
    LINT_SCORE = "lint_score"
    SECURITY_SCORE = "security_score"
    TEST_PASS_RATE = "test_pass_rate"
    BUILD_STATUS = "build_status"
@dataclass
class CrackerjackResult:
    """Result of Crackerjack command execution."""

    command: str  # crackerjack subcommand that was run (e.g. "test", "lint")
    exit_code: int  # process exit status; -1 means timeout, -2 execution error
    stdout: str  # captured standard output (utf-8, undecodable bytes dropped)
    stderr: str  # captured standard error
    execution_time: float  # wall-clock seconds from launch to completion
    timestamp: datetime  # when the result object was created
    working_directory: str  # cwd the command was executed in
    parsed_data: dict[str, Any] | None  # structured data from CrackerjackOutputParser
    quality_metrics: dict[str, float]  # e.g. {"test_pass_rate": 98.0}
    test_results: list[dict[str, Any]]  # per-test dicts from parsed output
    memory_insights: list[str]  # human-readable summary strings
@dataclass
class TestResult:
    """Individual test result information.

    NOTE(review): the visible parsing code builds plain dicts for test
    results; where instances of this dataclass are created is not shown
    in this chunk.
    """

    test_id: str  # unique identifier for this test run record
    test_name: str
    status: TestStatus
    duration: float  # seconds
    file_path: str  # test file the result came from
    line_number: int | None
    error_message: str | None  # populated for failed/errored tests
    traceback: str | None
    tags: list[str]
    coverage_data: dict[str, Any] | None  # optional per-test coverage info
@dataclass
class ProgressSnapshot:
    """Progress tracking snapshot.

    Mirrors one row of the ``progress_snapshots`` table created in
    ``CrackerjackIntegration._init_database``.
    """

    timestamp: datetime
    project_path: str
    command: str  # crackerjack subcommand the snapshot belongs to
    stage: str
    progress_percentage: float  # 0-100, parsed from output percentage markers
    current_task: str
    completed_tasks: list[str]  # tasks matched by the task_completion pattern
    failed_tasks: list[str]  # tasks matched by the task_failure pattern
    quality_metrics: dict[str, float]
    estimated_completion: datetime | None
    memory_context: list[str]
class CrackerjackOutputParser:
    """Parses Crackerjack output for structured data extraction.

    All regex patterns are compiled once at construction. ``parse_output``
    dispatches to per-topic ``_parse_*`` helpers based on which subcommand
    produced the output, then to ``_extract_*_insights`` helpers that turn
    the parsed data into short human-readable strings for memory enrichment.
    """

    def __init__(self) -> None:
        """Initialize output parser with pre-compiled regex patterns."""
        # NOTE(review): several patterns here (ai_agent_action, quality_gate,
        # release_info, typecheck_error, stage_progress, eta_estimate,
        # crackerjack_stage, auto_fix) are compiled but never referenced by
        # the _parse_* helpers in this class — confirm whether they are used
        # elsewhere or are dead weight.
        self.patterns = {
            # Test results patterns
            # NOTE(review): matches only bare "<module>.py::<name>" ids; test
            # ids with directory components ("tests/test_x.py::...") or
            # parametrized names ("test_x[case]") will NOT match — confirm
            # against real crackerjack/pytest output.
            "pytest_result": re.compile(
                r"(\w+\.py)::\s*(\w+)\s*(PASSED|FAILED|SKIPPED|ERROR|XFAIL|XPASS)\s*(?:\[(\d+%)\])?\s*(?:\((.+)\))?",
            ),
            "pytest_summary": re.compile(r"=+ (.+) =+"),
            "pytest_coverage": re.compile(r"TOTAL\s+\d+\s+\d+\s+(\d+)%"),
            # Lint results patterns: ruff "file:line:col: CODE: msg" and
            # pyright "file:line:col - severity: msg"
            "ruff_error": re.compile(r"(\S+):(\d+):(\d+):\s*(\w+):\s*(.+)"),
            "pyright_error": re.compile(
                r"(\S+):(\d+):(\d+)\s*-\s*(error|warning|info):\s*(.+)",
            ),
            # Security patterns (bandit report format)
            "bandit_issue": re.compile(r">> Issue: \[([^\]]+)\]\s*(.+)"),
            "bandit_severity": re.compile(r"Severity: (\w+)\s*Confidence: (\w+)"),
            # Complexity patterns: "<file> <lines> <decimal score>"
            "complexity_score": re.compile(r"(\S+)\s+(\d+)\s+(\d+\.\d+)"),
            # Coverage patterns: per-file "name stmts miss cover%" table row
            "coverage_line": re.compile(r"(\S+)\s+(\d+)\s+(\d+)\s+(\d+)%"),
            # Progress patterns
            "progress_indicator": re.compile(r"(?:Progress:|Stage:|Running:)\s*(.+)"),
            "percentage": re.compile(r"(\d+(?:\.\d+)?)%"),
            "task_completion": re.compile(r"✅\s*(.+)|PASSED\s*(.+)|SUCCESS\s*(.+)"),
            "task_failure": re.compile(r"❌\s*(.+)|FAILED\s*(.+)|ERROR\s*(.+)"),
            # New crackerjack v0.31.4+ patterns
            "ai_agent_action": re.compile(r"🤖\s*AI Agent:\s*(.+)"),
            "quality_gate": re.compile(r"Quality Gate:\s*(\w+)\s*\((\d+)%\)"),
            "release_info": re.compile(r"Release:\s*(.+)\s*→\s*(.+)"),
            "typecheck_error": re.compile(
                r"(\S+):(\d+):(\d+)\s*-\s*(error|warning):\s*(.+)"
            ),
            # Enhanced progress patterns
            "stage_progress": re.compile(r"Stage\s+(\d+)/(\d+):\s*(.+)"),
            "eta_estimate": re.compile(r"ETA:\s*(\d+m\s*\d+s|\d+s)"),
            "crackerjack_stage": re.compile(r"🔧\s*(.+)\s*\.\.\.\s*(.+)"),
            "auto_fix": re.compile(r"🔧\s*Auto-fixing:\s*(.+)"),
        }

    def parse_output(
        self,
        command: str,
        stdout: str,
        stderr: str,
    ) -> tuple[dict[str, Any], list[str]]:
        """Parse Crackerjack output and extract insights.

        Args:
            command: Subcommand that produced the output (e.g. "test",
                "lint", "check"); selects which topic parsers run. "check"
                triggers all of them.
            stdout: Captured standard output.
            stderr: Captured standard error.

        Returns:
            Tuple ``(parsed_data, memory_insights)`` — a dict of per-topic
            structured results and a list of human-readable summary strings.
        """
        parsed_data = {
            "command": command,
            "test_results": [],
            "lint_issues": [],
            "security_issues": [],
            "coverage_data": {},
            "complexity_data": {},
            "progress_info": {},
            "quality_metrics": {},
        }

        memory_insights = []

        # Combine stdout and stderr for comprehensive parsing
        full_output = f"{stdout}\n{stderr}"

        # Parse based on command type
        if command in ["test", "check"]:
            parsed_data.update(self._parse_test_output(full_output))
            memory_insights.extend(self._extract_test_insights(parsed_data))

        if command in ["lint", "format", "check"]:
            parsed_data.update(self._parse_lint_output(full_output))
            memory_insights.extend(self._extract_lint_insights(parsed_data))

        if command in ["security", "check"]:
            parsed_data.update(self._parse_security_output(full_output))
            memory_insights.extend(self._extract_security_insights(parsed_data))

        if command in ["coverage", "test", "check"]:
            parsed_data.update(self._parse_coverage_output(full_output))
            memory_insights.extend(self._extract_coverage_insights(parsed_data))

        if command in ["complexity", "check"]:
            parsed_data.update(self._parse_complexity_output(full_output))
            memory_insights.extend(self._extract_complexity_insights(parsed_data))

        # Always parse progress information
        parsed_data.update(self._parse_progress_output(full_output))
        memory_insights.extend(self._extract_progress_insights(parsed_data))

        return parsed_data, memory_insights

    def _parse_test_output(self, output: str) -> dict[str, Any]:
        """Parse pytest output for test results.

        Returns a dict with "test_results" (one dict per matched test
        line) and "test_summary" (the final ``=== ... ===`` banner).
        """
        data = {"test_results": [], "test_summary": {}}

        lines = output.split("\n")

        for line in lines:
            # Test result lines
            match = self.patterns["pytest_result"].search(line)
            if match:
                file_path, test_name, status, coverage, duration = match.groups()
                data["test_results"].append(
                    {
                        "file": file_path,
                        "test": test_name,
                        "status": status.lower(),
                        "coverage": coverage,
                        "duration": duration,
                    },
                )

            # Summary lines
            summary_match = self.patterns["pytest_summary"].search(line)
            if summary_match:
                summary_text = summary_match.group(1)
                # Keep only the pass/fail banner; later matches overwrite
                # earlier ones, so the last banner wins.
                if "passed" in summary_text or "failed" in summary_text:
                    data["test_summary"]["summary"] = summary_text

        return data

    def _parse_lint_output(self, output: str) -> dict[str, Any]:
        """Parse lint output for code quality issues.

        Recognizes ruff and pyright diagnostic line formats; each match
        becomes one entry in "lint_issues" tagged with its tool.
        """
        data = {"lint_issues": [], "lint_summary": {}}

        lines = output.split("\n")
        total_errors = 0

        for line in lines:
            # Ruff errors
            ruff_match = self.patterns["ruff_error"].search(line)
            if ruff_match:
                file_path, line_num, col_num, error_type, message = ruff_match.groups()
                data["lint_issues"].append(
                    {
                        "tool": "ruff",
                        "file": file_path,
                        "line": int(line_num),
                        "column": int(col_num),
                        "type": error_type,
                        "message": message,
                    },
                )
                total_errors += 1

            # Pyright errors
            pyright_match = self.patterns["pyright_error"].search(line)
            if pyright_match:
                file_path, line_num, col_num, severity, message = pyright_match.groups()
                data["lint_issues"].append(
                    {
                        "tool": "pyright",
                        "file": file_path,
                        "line": int(line_num),
                        "column": int(col_num),
                        "type": severity,
                        "message": message,
                    },
                )
                total_errors += 1

        data["lint_summary"] = {"total_issues": total_errors}
        return data

    def _parse_security_output(self, output: str) -> dict[str, Any]:
        """Parse bandit security scan output.

        Bandit prints severity/confidence on the line(s) after the issue
        header, so each severity match is attached to the most recently
        seen issue (``current_issue``).
        """
        data = {"security_issues": [], "security_summary": {}}

        lines = output.split("\n")
        current_issue = None

        for line in lines:
            issue_match = self.patterns["bandit_issue"].search(line)
            if issue_match:
                issue_id, description = issue_match.groups()
                current_issue = {
                    "id": issue_id,
                    "description": description,
                    "severity": None,
                    "confidence": None,
                }
                data["security_issues"].append(current_issue)

            severity_match = self.patterns["bandit_severity"].search(line)
            if severity_match and current_issue:
                severity, confidence = severity_match.groups()
                # Mutates the dict already appended to security_issues.
                current_issue["severity"] = severity
                current_issue["confidence"] = confidence

        data["security_summary"] = {"total_issues": len(data["security_issues"])}
        return data

    def _parse_coverage_output(self, output: str) -> dict[str, Any]:
        """Parse coverage report output.

        Produces per-file entries in "coverage_data" and the TOTAL
        percentage in "coverage_summary".
        """
        data = {"coverage_data": {}, "coverage_summary": {}}

        lines = output.split("\n")

        for line in lines:
            # Individual file coverage
            coverage_match = self.patterns["coverage_line"].search(line)
            if coverage_match:
                file_path, statements, missing, coverage = coverage_match.groups()
                data["coverage_data"][file_path] = {
                    "statements": int(statements),
                    "missing": int(missing),
                    # Regex group excludes "%", rstrip is defensive.
                    "coverage": int(coverage.rstrip("%")),
                }

            # Total coverage
            total_match = self.patterns["pytest_coverage"].search(line)
            if total_match:
                total_coverage = int(total_match.group(1))
                data["coverage_summary"]["total_coverage"] = total_coverage

        return data

    def _parse_complexity_output(self, output: str) -> dict[str, Any]:
        """Parse complexity analysis output.

        Counts files whose complexity exceeds the (hard-coded) threshold
        of 10 as "high complexity".
        """
        data = {"complexity_data": {}, "complexity_summary": {}}

        lines = output.split("\n")
        total_files = 0
        high_complexity = 0

        for line in lines:
            complexity_match = self.patterns["complexity_score"].search(line)
            if complexity_match:
                file_path, lines_count, complexity_score = complexity_match.groups()
                complexity_val = float(complexity_score)
                data["complexity_data"][file_path] = {
                    "lines": int(lines_count),
                    "complexity": complexity_val,
                }
                total_files += 1
                if complexity_val > 10:  # Configurable threshold
                    high_complexity += 1

        data["complexity_summary"] = {
            "total_files": total_files,
            "high_complexity_files": high_complexity,
        }
        return data

    def _parse_progress_output(self, output: str) -> dict[str, Any]:
        """Parse progress indicators from output.

        Collects the last seen task/percentage plus all completed and
        failed task markers (✅/PASSED/SUCCESS and ❌/FAILED/ERROR lines).
        """
        data = {"progress_info": {}}

        lines = output.split("\n")
        completed_tasks = []
        failed_tasks = []
        current_percentage = 0

        for line in lines:
            # Progress indicators (last match wins)
            progress_match = self.patterns["progress_indicator"].search(line)
            if progress_match:
                data["progress_info"]["current_task"] = progress_match.group(1)

            # Percentage completion (last match wins)
            percentage_match = self.patterns["percentage"].search(line)
            if percentage_match:
                current_percentage = float(percentage_match.group(1))

            # Task completions — exactly one alternation group is non-None.
            completion_match = self.patterns["task_completion"].search(line)
            if completion_match:
                task = (
                    completion_match.group(1)
                    or completion_match.group(2)
                    or completion_match.group(3)
                )
                if task:
                    completed_tasks.append(task.strip())

            # Task failures
            failure_match = self.patterns["task_failure"].search(line)
            if failure_match:
                task = (
                    failure_match.group(1)
                    or failure_match.group(2)
                    or failure_match.group(3)
                )
                if task:
                    failed_tasks.append(task.strip())

        data["progress_info"].update(
            {
                "percentage": current_percentage,
                "completed_tasks": completed_tasks,
                "failed_tasks": failed_tasks,
            },
        )

        return data

    def _extract_test_insights(self, parsed_data: dict[str, Any]) -> list[str]:
        """Extract memory insights from test results."""
        insights = []
        test_results = parsed_data.get("test_results", [])

        if test_results:
            passed = sum(1 for t in test_results if t["status"] == "passed")
            failed = sum(1 for t in test_results if t["status"] == "failed")
            total = len(test_results)

            # total is always > 0 here (test_results is non-empty), the
            # guard is defensive.
            if total > 0:
                pass_rate = (passed / total) * 100
                insights.append(
                    f"Test suite: {passed}/{total} tests passed ({pass_rate:.1f}% pass rate)",
                )

                if failed > 0:
                    failed_files = {
                        t["file"] for t in test_results if t["status"] == "failed"
                    }
                    insights.append(
                        f"Test failures found in {len(failed_files)} files: {', '.join(failed_files)}",
                    )

                if pass_rate == 100:
                    insights.append("All tests passing - code quality is stable")
                elif pass_rate < 80:
                    insights.append(
                        "Test pass rate below 80% - investigate failing tests",
                    )

        return insights

    def _extract_lint_insights(self, parsed_data: dict[str, Any]) -> list[str]:
        """Extract memory insights from lint results."""
        insights = []
        lint_issues = parsed_data.get("lint_issues", [])

        if lint_issues:
            total_issues = len(lint_issues)
            by_type = {}
            by_file = {}

            # Tally issues per type and per file for the summaries below.
            for issue in lint_issues:
                issue_type = issue.get("type", "unknown")
                file_path = issue.get("file", "unknown")

                by_type[issue_type] = by_type.get(issue_type, 0) + 1
                by_file[file_path] = by_file.get(file_path, 0) + 1

            insights.append(f"Code quality: {total_issues} lint issues found")

            # Top issue types
            top_types = sorted(by_type.items(), key=lambda x: x[1], reverse=True)[:3]
            if top_types:
                type_summary = ", ".join(f"{t}: {c}" for t, c in top_types)
                insights.append(f"Most common issues: {type_summary}")

            # Files needing attention — only called out when the worst file
            # has more than 5 issues.
            top_files = sorted(by_file.items(), key=lambda x: x[1], reverse=True)[:3]
            if top_files and top_files[0][1] > 5:
                insights.append(
                    f"Files needing attention: {top_files[0][0]} ({top_files[0][1]} issues)",
                )
        else:
            insights.append("Code quality: No lint issues found - code is clean")

        return insights

    def _extract_security_insights(self, parsed_data: dict[str, Any]) -> list[str]:
        """Extract memory insights from security scan."""
        insights = []
        security_issues = parsed_data.get("security_issues", [])

        if security_issues:
            total_issues = len(security_issues)
            high_severity = sum(
                1 for i in security_issues if i.get("severity") == "HIGH"
            )

            insights.append(
                f"Security scan: {total_issues} potential security issues found",
            )

            if high_severity > 0:
                insights.append(
                    f"⚠️ {high_severity} high-severity security issues require immediate attention",
                )
            else:
                insights.append("No high-severity security issues detected")
        else:
            insights.append(
                "Security scan: No security issues detected - code appears secure",
            )

        return insights

    def _extract_coverage_insights(self, parsed_data: dict[str, Any]) -> list[str]:
        """Extract memory insights from coverage data."""
        insights = []
        coverage_summary = parsed_data.get("coverage_summary", {})

        if "total_coverage" in coverage_summary:
            coverage = coverage_summary["total_coverage"]
            insights.append(f"Test coverage: {coverage}% of code is covered by tests")

            # Bucketed commentary: >=90 excellent, >=80 good, >=60 moderate,
            # otherwise low.
            if coverage >= 90:
                insights.append("Excellent test coverage - code is well tested")
            elif coverage >= 80:
                insights.append("Good test coverage - consider adding more tests")
            elif coverage >= 60:
                insights.append(
                    "Moderate test coverage - significant testing gaps exist",
                )
            else:
                insights.append(
                    "Low test coverage - critical testing gaps need attention",
                )

        return insights

    def _extract_complexity_insights(self, parsed_data: dict[str, Any]) -> list[str]:
        """Extract memory insights from complexity analysis."""
        insights = []
        complexity_summary = parsed_data.get("complexity_summary", {})

        if complexity_summary:
            total_files = complexity_summary.get("total_files", 0)
            high_complexity = complexity_summary.get("high_complexity_files", 0)

            if total_files > 0:
                complexity_rate = (high_complexity / total_files) * 100
                insights.append(
                    f"Code complexity: {high_complexity}/{total_files} files have high complexity ({complexity_rate:.1f}%)",
                )

                if complexity_rate == 0:
                    insights.append("Code complexity is well managed")
                elif complexity_rate > 20:
                    insights.append(
                        "Consider refactoring high-complexity files for maintainability",
                    )

        return insights

    def _extract_progress_insights(self, parsed_data: dict[str, Any]) -> list[str]:
        """Extract memory insights from progress information."""
        insights = []
        progress_info = parsed_data.get("progress_info", {})

        completed_tasks = progress_info.get("completed_tasks", [])
        failed_tasks = progress_info.get("failed_tasks", [])
        percentage = progress_info.get("percentage", 0)

        if completed_tasks:
            insights.append(f"Progress: Completed {len(completed_tasks)} tasks")

        if failed_tasks:
            # Only the first three failed tasks are named to keep it short.
            insights.append(
                f"⚠️ {len(failed_tasks)} tasks failed: {', '.join(failed_tasks[:3])}",
            )

        if percentage > 0:
            insights.append(f"Overall progress: {percentage}% complete")

        return insights
591class CrackerjackIntegration:
592 """Main integration class for Crackerjack command execution and monitoring."""
594 def __init__(self, db_path: str | None = None) -> None:
595 """Initialize Crackerjack integration."""
596 self.db_path = db_path or str(
597 Path.home() / ".claude" / "data" / "crackerjack_integration.db",
598 )
599 self.parser = CrackerjackOutputParser()
600 self._lock = threading.Lock()
601 self._init_database()
    def _init_database(self) -> None:
        """Initialize SQLite database for Crackerjack integration.

        Creates the parent directory, the four tables (results, per-test
        results, progress snapshots, metrics history) and their indices.
        All statements are idempotent (IF NOT EXISTS).
        """
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

        with sqlite3.connect(self.db_path) as conn:
            # One row per executed crackerjack command; JSON payloads are
            # stored as TEXT and decoded on read (see get_recent_results).
            conn.execute("""
                CREATE TABLE IF NOT EXISTS crackerjack_results (
                    id TEXT PRIMARY KEY,
                    command TEXT NOT NULL,
                    exit_code INTEGER,
                    stdout TEXT,
                    stderr TEXT,
                    execution_time REAL,
                    timestamp TIMESTAMP,
                    working_directory TEXT,
                    parsed_data TEXT,  -- JSON
                    quality_metrics TEXT,  -- JSON
                    memory_insights TEXT  -- JSON array
                )
            """)

            # Individual test outcomes, linked to the run they came from.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS test_results (
                    id TEXT PRIMARY KEY,
                    result_id TEXT NOT NULL,
                    test_name TEXT NOT NULL,
                    status TEXT NOT NULL,
                    duration REAL,
                    file_path TEXT,
                    line_number INTEGER,
                    error_message TEXT,
                    timestamp TIMESTAMP,
                    FOREIGN KEY (result_id) REFERENCES crackerjack_results(id)
                )
            """)

            # Point-in-time progress captures (see ProgressSnapshot).
            conn.execute("""
                CREATE TABLE IF NOT EXISTS progress_snapshots (
                    id TEXT PRIMARY KEY,
                    project_path TEXT NOT NULL,
                    command TEXT NOT NULL,
                    stage TEXT,
                    progress_percentage REAL,
                    current_task TEXT,
                    completed_tasks TEXT,  -- JSON array
                    failed_tasks TEXT,  -- JSON array
                    quality_metrics TEXT,  -- JSON
                    timestamp TIMESTAMP,
                    memory_context TEXT  -- JSON array
                )
            """)

            # Long-term metric series used by get_quality_metrics_history
            # and get_quality_trends.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS quality_metrics_history (
                    id TEXT PRIMARY KEY,
                    project_path TEXT NOT NULL,
                    metric_type TEXT NOT NULL,
                    metric_value REAL NOT NULL,
                    timestamp TIMESTAMP,
                    result_id TEXT,
                    FOREIGN KEY (result_id) REFERENCES crackerjack_results(id)
                )
            """)

            # Create indices for the common query filters used elsewhere
            # in this class (time ranges, command, status, project, metric).
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_results_timestamp ON crackerjack_results(timestamp)",
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_results_command ON crackerjack_results(command)",
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_test_results_status ON test_results(status)",
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_progress_project ON progress_snapshots(project_path)",
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_metrics_type ON quality_metrics_history(metric_type)",
            )
684 async def execute_crackerjack_command(
685 self,
686 command: str,
687 args: list[str] | None = None,
688 working_directory: str = ".",
689 timeout: int = 300,
690 ai_agent_mode: bool = False,
691 ) -> CrackerjackResult:
692 """Execute Crackerjack command and capture results."""
693 args = args or []
695 # Add AI agent mode support
696 if ai_agent_mode:
697 args.append("--ai-agent")
699 full_command = ["crackerjack", command, *args]
701 start_time = time.time()
702 result_id = f"cj_{int(start_time * 1000)}"
704 try:
705 # Execute command
706 process = await asyncio.create_subprocess_exec(
707 *full_command,
708 cwd=working_directory,
709 stdout=asyncio.subprocess.PIPE,
710 stderr=asyncio.subprocess.PIPE,
711 )
713 stdout, stderr = await asyncio.wait_for(
714 process.communicate(),
715 timeout=timeout,
716 )
718 exit_code = process.returncode
719 execution_time = time.time() - start_time
721 # Decode output
722 stdout_text = stdout.decode("utf-8", errors="ignore")
723 stderr_text = stderr.decode("utf-8", errors="ignore")
725 # Parse output for insights
726 parsed_data, memory_insights = self.parser.parse_output(
727 command,
728 stdout_text,
729 stderr_text,
730 )
732 # Calculate quality metrics
733 quality_metrics = self._calculate_quality_metrics(parsed_data, exit_code)
735 # Create result object
736 result = CrackerjackResult(
737 command=command,
738 exit_code=exit_code,
739 stdout=stdout_text,
740 stderr=stderr_text,
741 execution_time=execution_time,
742 timestamp=datetime.now(),
743 working_directory=working_directory,
744 parsed_data=parsed_data,
745 quality_metrics=quality_metrics,
746 test_results=parsed_data.get("test_results", []),
747 memory_insights=memory_insights,
748 )
750 # Store in database
751 await self._store_result(result_id, result)
753 # Store progress snapshot
754 await self._store_progress_snapshot(result_id, result, working_directory)
756 return result
758 except TimeoutError:
759 execution_time = time.time() - start_time
760 error_result = CrackerjackResult(
761 command=command,
762 exit_code=-1,
763 stdout="",
764 stderr=f"Command timed out after {timeout} seconds",
765 execution_time=execution_time,
766 timestamp=datetime.now(),
767 working_directory=working_directory,
768 parsed_data={},
769 quality_metrics={},
770 test_results=[],
771 memory_insights=[
772 f"Command '{command}' timed out - consider optimizing or increasing timeout",
773 ],
774 )
776 await self._store_result(result_id, error_result)
777 return error_result
779 except Exception as e:
780 execution_time = time.time() - start_time
781 error_result = CrackerjackResult(
782 command=command,
783 exit_code=-2,
784 stdout="",
785 stderr=f"Execution error: {e!s}",
786 execution_time=execution_time,
787 timestamp=datetime.now(),
788 working_directory=working_directory,
789 parsed_data={},
790 quality_metrics={},
791 test_results=[],
792 memory_insights=[f"Command '{command}' failed with error: {e!s}"],
793 )
795 await self._store_result(result_id, error_result)
796 return error_result
798 async def get_recent_results(
799 self,
800 hours: int = 24,
801 command: str | None = None,
802 ) -> list[dict[str, Any]]:
803 """Get recent Crackerjack execution results."""
804 since = datetime.now() - timedelta(hours=hours)
806 with sqlite3.connect(self.db_path) as conn:
807 conn.row_factory = sqlite3.Row
809 where_conditions = ["timestamp >= ?"]
810 params = [since]
812 if command:
813 where_conditions.append("command = ?")
814 params.append(command)
816 query = f"""
817 SELECT * FROM crackerjack_results
818 WHERE {" AND ".join(where_conditions)}
819 ORDER BY timestamp DESC
820 """
822 cursor = conn.execute(query, params)
823 results = []
825 for row in cursor.fetchall():
826 result = dict(row)
827 result["parsed_data"] = json.loads(result["parsed_data"] or "{}")
828 result["quality_metrics"] = json.loads(
829 result["quality_metrics"] or "{}",
830 )
831 result["memory_insights"] = json.loads(
832 result["memory_insights"] or "[]",
833 )
834 results.append(result)
836 return results
838 async def get_quality_metrics_history(
839 self,
840 project_path: str,
841 metric_type: str | None = None,
842 days: int = 30,
843 ) -> list[dict[str, Any]]:
844 """Get quality metrics history for trend analysis."""
845 since = datetime.now() - timedelta(days=days)
847 with sqlite3.connect(self.db_path) as conn:
848 conn.row_factory = sqlite3.Row
850 where_conditions = ["project_path = ?", "timestamp >= ?"]
851 params = [project_path, since]
853 if metric_type:
854 where_conditions.append("metric_type = ?")
855 params.append(metric_type)
857 query = f"""
858 SELECT * FROM quality_metrics_history
859 WHERE {" AND ".join(where_conditions)}
860 ORDER BY timestamp DESC
861 """
863 cursor = conn.execute(query, params)
864 return [dict(row) for row in cursor.fetchall()]
    async def get_test_failure_patterns(self, days: int = 7) -> dict[str, Any]:
        """Analyze test failure patterns for insights.

        Args:
            days: Look-back window in days.

        Returns:
            Dict with "failed_tests" (grouped by name/file/error with a
            failure count), "flaky_tests" (tests that produced more than
            one distinct status across at least 3 runs), "failing_files"
            (top 10 files by failure count) and "analysis_period_days".
        """
        since = datetime.now() - timedelta(days=days)

        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row

            # Get failed tests, most frequent first.
            failed_tests = conn.execute(
                """
                SELECT test_name, file_path, error_message, COUNT(*) as failure_count
                FROM test_results
                WHERE status = 'failed' AND timestamp >= ?
                GROUP BY test_name, file_path, error_message
                ORDER BY failure_count DESC
                """,
                (since,),
            ).fetchall()

            # Get flaky tests (alternating pass/fail): more than one distinct
            # status over at least three recorded runs.
            flaky_tests = conn.execute(
                """
                SELECT test_name, file_path,
                       COUNT(DISTINCT status) as status_count,
                       COUNT(*) as total_runs
                FROM test_results
                WHERE timestamp >= ?
                GROUP BY test_name, file_path
                HAVING status_count > 1 AND total_runs >= 3
                ORDER BY status_count DESC, total_runs DESC
                """,
                (since,),
            ).fetchall()

            # Get most failing files (top 10).
            failing_files = conn.execute(
                """
                SELECT file_path, COUNT(*) as failure_count
                FROM test_results
                WHERE status = 'failed' AND timestamp >= ?
                GROUP BY file_path
                ORDER BY failure_count DESC
                LIMIT 10
                """,
                (since,),
            ).fetchall()

            return {
                "failed_tests": [dict(row) for row in failed_tests],
                "flaky_tests": [dict(row) for row in flaky_tests],
                "failing_files": [dict(row) for row in failing_files],
                "analysis_period_days": days,
            }
920 async def get_quality_trends(
921 self,
922 project_path: str,
923 days: int = 30,
924 ) -> dict[str, Any]:
925 """Analyze quality trends over time."""
926 metrics_history = await self.get_quality_metrics_history(
927 project_path, None, days
928 )
930 # Calculate trends for each metric type
931 trends = {}
932 for metric_type in [
933 "test_pass_rate",
934 "code_coverage",
935 "lint_score",
936 "security_score",
937 "complexity_score",
938 ]:
939 metric_values = [
940 m for m in metrics_history if m["metric_type"] == metric_type
941 ]
942 if len(metric_values) >= 2:
943 # Sort by timestamp (most recent first)
944 metric_values.sort(key=lambda x: x["timestamp"], reverse=True)
946 # Split into recent and older halves
947 mid_point = len(metric_values) // 2
948 recent = metric_values[:mid_point] if mid_point > 0 else metric_values
949 older = metric_values[mid_point:] if mid_point > 0 else []
951 if recent and older:
952 recent_avg = sum(m["metric_value"] for m in recent) / len(recent)
953 older_avg = sum(m["metric_value"] for m in older) / len(older)
954 change = recent_avg - older_avg
956 trends[metric_type] = {
957 "direction": "improving"
958 if change > 0
959 else "declining"
960 if change < 0
961 else "stable",
962 "change": abs(change),
963 "change_percentage": (abs(change) / older_avg * 100)
964 if older_avg > 0
965 else 0,
966 "recent_average": recent_avg,
967 "previous_average": older_avg,
968 "data_points": len(metric_values),
969 "trend_strength": "strong"
970 if abs(change) > 5
971 else "moderate"
972 if abs(change) > 1
973 else "weak",
974 }
975 else:
976 # Not enough data for trend analysis
977 current_avg = sum(m["metric_value"] for m in metric_values) / len(
978 metric_values
979 )
980 trends[metric_type] = {
981 "direction": "insufficient_data",
982 "change": 0,
983 "change_percentage": 0,
984 "recent_average": current_avg,
985 "previous_average": current_avg,
986 "data_points": len(metric_values),
987 "trend_strength": "unknown",
988 }
990 # Overall trend assessment
991 improving_metrics = sum(
992 1 for t in trends.values() if t["direction"] == "improving"
993 )
994 declining_metrics = sum(
995 1 for t in trends.values() if t["direction"] == "declining"
996 )
998 overall_assessment = {
999 "overall_direction": "improving"
1000 if improving_metrics > declining_metrics
1001 else "declining"
1002 if declining_metrics > improving_metrics
1003 else "stable",
1004 "improving_count": improving_metrics,
1005 "declining_count": declining_metrics,
1006 "stable_count": len(trends) - improving_metrics - declining_metrics,
1007 "analysis_period_days": days,
1008 }
1010 return {
1011 "trends": trends,
1012 "overall": overall_assessment,
1013 "recommendations": self._generate_trend_recommendations(trends),
1014 }
1016 def _generate_trend_recommendations(self, trends: dict[str, Any]) -> list[str]:
1017 """Generate recommendations based on quality trends."""
1018 recommendations = []
1020 for metric_type, trend_data in trends.items():
1021 direction = trend_data["direction"]
1022 strength = trend_data["trend_strength"]
1023 change = trend_data["change"]
1025 if direction == "declining" and strength in ["strong", "moderate"]:
1026 if metric_type == "test_pass_rate":
1027 recommendations.append(
1028 f"⚠️ Test pass rate declining by {change:.1f}% - investigate failing tests"
1029 )
1030 elif metric_type == "code_coverage":
1031 recommendations.append(
1032 f"⚠️ Code coverage declining by {change:.1f}% - add more tests"
1033 )
1034 elif metric_type == "lint_score":
1035 recommendations.append(
1036 "⚠️ Code quality declining - address lint issues"
1037 )
1038 elif metric_type == "security_score":
1039 recommendations.append(
1040 "🔒 Security score declining - review security findings"
1041 )
1042 elif metric_type == "complexity_score":
1043 recommendations.append(
1044 "🔧 Code complexity increasing - consider refactoring"
1045 )
1047 elif direction == "improving" and strength == "strong":
1048 if (
1049 metric_type == "test_pass_rate"
1050 and trend_data["recent_average"] > 95
1051 ):
1052 recommendations.append(
1053 "✅ Excellent test pass rate trend - maintain current practices"
1054 )
1055 elif (
1056 metric_type == "code_coverage" and trend_data["recent_average"] > 85
1057 ):
1058 recommendations.append(
1059 "✅ Great coverage improvement - continue testing efforts"
1060 )
1062 # Add general recommendations
1063 if not recommendations:
1064 recommendations.append(
1065 "📈 Quality metrics are stable - continue current practices"
1066 )
1068 return recommendations
    async def health_check(self) -> dict[str, Any]:
        """Check integration health and dependencies.

        Probes the ``crackerjack`` CLI (via ``--help``) and the SQLite
        database, collecting human-readable recommendations along the way.

        Returns:
            Dict with boolean flags ("crackerjack_available",
            "database_accessible", "version_compatible"), a
            "recommendations" list, an overall "status" of
            "healthy"/"partial"/"unhealthy", and an "error" key when an
            unexpected exception occurred.
        """
        health = {
            "crackerjack_available": False,
            "database_accessible": False,
            # NOTE(review): never updated anywhere below — always False;
            # confirm whether a real version check was intended.
            "version_compatible": False,
            "recommendations": [],
            "status": "unhealthy",
        }

        try:
            # Check crackerjack availability by running its --help screen
            # and discarding the output.
            process = await asyncio.create_subprocess_exec(
                "crackerjack",
                "--help",
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL,
            )
            await process.communicate()
            health["crackerjack_available"] = process.returncode == 0

            if health["crackerjack_available"]:
                health["recommendations"].append(
                    "✅ Crackerjack is available and responding"
                )
            else:
                health["recommendations"].append(
                    "❌ Crackerjack not available - install with 'uv add crackerjack'"
                )

            # Check database accessibility
            with sqlite3.connect(self.db_path) as conn:
                conn.execute("SELECT 1").fetchone()
                health["database_accessible"] = True
                health["recommendations"].append("✅ Database connection successful")

                # Check if we have any data
                cursor = conn.execute("SELECT COUNT(*) FROM crackerjack_results")
                result_count = cursor.fetchone()[0]

                if result_count > 0:
                    health["recommendations"].append(
                        f"📊 {result_count} execution records available"
                    )
                else:
                    health["recommendations"].append(
                        "📝 No execution history - run some crackerjack commands"
                    )

            # Overall status: healthy needs both probes, partial needs at
            # least the database.
            if health["crackerjack_available"] and health["database_accessible"]:
                health["status"] = "healthy"
            elif health["database_accessible"]:
                health["status"] = "partial"
            else:
                health["status"] = "unhealthy"

        except sqlite3.Error as e:
            health["database_accessible"] = False
            health["recommendations"].append(f"❌ Database error: {e}")
        except Exception as e:
            # Catch-all boundary: report rather than propagate, since this
            # is a diagnostic entry point.
            health["error"] = str(e)
            health["recommendations"].append(f"❌ Health check error: {e}")

        return health
1136 def _calculate_quality_metrics(
1137 self,
1138 parsed_data: dict[str, Any],
1139 exit_code: int,
1140 ) -> dict[str, float]:
1141 """Calculate quality metrics from parsed data."""
1142 metrics = {}
1144 # Test metrics
1145 test_results = parsed_data.get("test_results", [])
1146 if test_results:
1147 passed = sum(1 for t in test_results if t["status"] == "passed")
1148 total = len(test_results)
1149 metrics["test_pass_rate"] = (passed / total) * 100 if total > 0 else 0
1151 # Coverage metrics
1152 coverage_summary = parsed_data.get("coverage_summary", {})
1153 if "total_coverage" in coverage_summary:
1154 metrics["code_coverage"] = float(coverage_summary["total_coverage"])
1156 # Lint metrics
1157 lint_summary = parsed_data.get("lint_summary", {})
1158 if "total_issues" in lint_summary:
1159 # Invert to make higher scores better
1160 total_issues = lint_summary["total_issues"]
1161 metrics["lint_score"] = (
1162 max(0, 100 - total_issues) if total_issues < 100 else 0
1163 )
1165 # Security metrics
1166 security_summary = parsed_data.get("security_summary", {})
1167 if "total_issues" in security_summary:
1168 total_issues = security_summary["total_issues"]
1169 metrics["security_score"] = (
1170 max(0, 100 - (total_issues * 10)) if total_issues < 10 else 0
1171 )
1173 # Complexity metrics
1174 complexity_summary = parsed_data.get("complexity_summary", {})
1175 if complexity_summary:
1176 total_files = complexity_summary.get("total_files", 0)
1177 high_complexity = complexity_summary.get("high_complexity_files", 0)
1178 if total_files > 0:
1179 complexity_rate = (high_complexity / total_files) * 100
1180 metrics["complexity_score"] = max(0, 100 - complexity_rate)
1182 # Overall build status
1183 metrics["build_status"] = 100 if exit_code == 0 else 0
1185 return metrics
1187 async def _store_result(self, result_id: str, result: CrackerjackResult) -> None:
1188 """Store Crackerjack result in database."""
1189 with sqlite3.connect(self.db_path) as conn:
1190 conn.execute(
1191 """
1192 INSERT INTO crackerjack_results
1193 (id, command, exit_code, stdout, stderr, execution_time, timestamp,
1194 working_directory, parsed_data, quality_metrics, memory_insights)
1195 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
1196 """,
1197 (
1198 result_id,
1199 result.command,
1200 result.exit_code,
1201 result.stdout,
1202 result.stderr,
1203 result.execution_time,
1204 result.timestamp,
1205 result.working_directory,
1206 json.dumps(result.parsed_data),
1207 json.dumps(result.quality_metrics),
1208 json.dumps(result.memory_insights),
1209 ),
1210 )
1212 # Store individual test results
1213 for test_result in result.test_results:
1214 test_id = f"test_{result_id}_{hash(test_result.get('test', 'unknown'))}"
1215 conn.execute(
1216 """
1217 INSERT INTO test_results
1218 (id, result_id, test_name, status, duration, file_path, timestamp)
1219 VALUES (?, ?, ?, ?, ?, ?, ?)
1220 """,
1221 (
1222 test_id,
1223 result_id,
1224 test_result.get("test", ""),
1225 test_result.get("status", ""),
1226 test_result.get("duration", 0),
1227 test_result.get("file", ""),
1228 result.timestamp,
1229 ),
1230 )
1232 # Store quality metrics
1233 for metric_name, metric_value in result.quality_metrics.items():
1234 metric_id = f"metric_{result_id}_{metric_name}"
1235 conn.execute(
1236 """
1237 INSERT INTO quality_metrics_history
1238 (id, project_path, metric_type, metric_value, timestamp, result_id)
1239 VALUES (?, ?, ?, ?, ?, ?)
1240 """,
1241 (
1242 metric_id,
1243 result.working_directory,
1244 metric_name,
1245 metric_value,
1246 result.timestamp,
1247 result_id,
1248 ),
1249 )
1251 async def _store_progress_snapshot(
1252 self,
1253 result_id: str,
1254 result: CrackerjackResult,
1255 project_path: str,
1256 ) -> None:
1257 """Store progress snapshot from result."""
1258 progress_info = result.parsed_data.get("progress_info", {})
1260 if progress_info:
1261 snapshot_id = f"progress_{result_id}"
1263 with sqlite3.connect(self.db_path) as conn:
1264 conn.execute(
1265 """
1266 INSERT INTO progress_snapshots
1267 (id, project_path, command, stage, progress_percentage, current_task,
1268 completed_tasks, failed_tasks, quality_metrics, timestamp, memory_context)
1269 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
1270 """,
1271 (
1272 snapshot_id,
1273 project_path,
1274 result.command,
1275 progress_info.get("stage", ""),
1276 progress_info.get("percentage", 0),
1277 progress_info.get("current_task", ""),
1278 json.dumps(progress_info.get("completed_tasks", [])),
1279 json.dumps(progress_info.get("failed_tasks", [])),
1280 json.dumps(result.quality_metrics),
1281 result.timestamp,
1282 json.dumps(result.memory_insights),
1283 ),
1284 )
# Global integration instance, created lazily on first use.
_crackerjack_integration: "CrackerjackIntegration | None" = None
def get_crackerjack_integration() -> CrackerjackIntegration:
    """Return the process-wide CrackerjackIntegration, creating it on first use."""
    global _crackerjack_integration
    instance = _crackerjack_integration
    if instance is None:
        instance = CrackerjackIntegration()
        _crackerjack_integration = instance
    return instance
1299# Public API functions for MCP tools
async def execute_crackerjack_command(
    command: str,
    args: list[str] | None = None,
    working_directory: str = ".",
    timeout: int = 300,
    ai_agent_mode: bool = False,
) -> dict[str, Any]:
    """Execute a Crackerjack command and return structured results.

    Module-level wrapper over the shared integration instance; the
    resulting CrackerjackResult dataclass is flattened to a plain dict.
    """
    outcome = await get_crackerjack_integration().execute_crackerjack_command(
        command,
        args,
        working_directory,
        timeout,
        ai_agent_mode,
    )
    return asdict(outcome)
async def get_recent_crackerjack_results(
    hours: int = 24,
    command: str | None = None,
) -> list[dict[str, Any]]:
    """Return recent Crackerjack execution results.

    Delegates to the shared integration instance; ``command`` optionally
    restricts results to one command.
    """
    return await get_crackerjack_integration().get_recent_results(hours, command)
async def get_quality_metrics_history(
    project_path: str,
    metric_type: str | None = None,
    days: int = 30,
) -> list[dict[str, Any]]:
    """Return quality-metrics history for trend analysis.

    Delegates to the shared integration instance; ``metric_type`` of None
    means all metric types for the project.
    """
    return await get_crackerjack_integration().get_quality_metrics_history(
        project_path,
        metric_type,
        days,
    )
async def analyze_test_failure_patterns(days: int = 7) -> dict[str, Any]:
    """Analyze test failure patterns over the last ``days`` days.

    Delegates to the shared integration instance.
    """
    return await get_crackerjack_integration().get_test_failure_patterns(days)
async def get_quality_trends(
    project_path: str,
    days: int = 30,
) -> dict[str, Any]:
    """Analyze quality trends for ``project_path`` over ``days`` days.

    Delegates to the shared integration instance.
    """
    return await get_crackerjack_integration().get_quality_trends(project_path, days)
async def crackerjack_health_check() -> dict[str, Any]:
    """Report Crackerjack integration health and dependency status.

    Delegates to the shared integration instance.
    """
    return await get_crackerjack_integration().health_check()