Coverage for session_buddy / crackerjack_integration.py: 58.72%
417 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
1"""Crackerjack Integration module for progress tracking and test monitoring.
3This module provides deep integration with Crackerjack for:
4- Progress tracking output parsing for memory enrichment
5- Test result monitoring for context enhancement
6- Command execution with result capture
7- Quality metrics integration
8"""
10import asyncio
11import json
12import logging
13import operator
14import sqlite3
15import tempfile
16import threading
17import time
18from dataclasses import asdict, dataclass
19from datetime import datetime, timedelta
20from enum import Enum
21from pathlib import Path
22from typing import Any
24from session_buddy.utils.crackerjack import (
25 CrackerjackOutputParser,
26)
28logger = logging.getLogger(__name__)
class CrackerjackCommand(Enum):
    """Supported Crackerjack commands.

    Values are the logical command names accepted by
    ``CrackerjackIntegration.execute_crackerjack_command`` and mapped onto
    actual CLI flags by ``_build_command_flags``.
    """

    # Core quality commands
    ANALYZE = "analyze"  # Comprehensive analysis command
    CHECK = "check"
    TEST = "test"
    LINT = "lint"
    FORMAT = "format"
    TYPECHECK = "typecheck"  # Type checking support

    # Security and complexity
    SECURITY = "security"
    COMPLEXITY = "complexity"
    COVERAGE = "coverage"

    # Build and maintenance
    BUILD = "build"
    CLEAN = "clean"

    # Documentation
    DOCS = "docs"

    # Release management
    RELEASE = "release"  # Release command support
class TestStatus(Enum):
    """Test execution status (mirrors pytest outcome categories)."""

    PASSED = "passed"
    FAILED = "failed"
    SKIPPED = "skipped"
    ERROR = "error"
    XFAIL = "xfail"  # expected failure
    XPASS = "xpass"  # unexpectedly passed
class QualityMetric(Enum):
    """Quality metrics tracked in the quality_metrics_history table."""

    CODE_COVERAGE = "coverage"
    COMPLEXITY = "complexity"
    LINT_SCORE = "lint_score"
    SECURITY_SCORE = "security_score"
    TEST_PASS_RATE = "test_pass_rate"  # nosec B105
    BUILD_STATUS = "build_status"
@dataclass
class CrackerjackResult:
    """Result of Crackerjack command execution."""

    command: str  # logical command name, e.g. "lint"
    exit_code: int  # process exit code; -1 = timeout, -2 = execution error
    stdout: str
    stderr: str
    execution_time: float  # wall-clock seconds
    timestamp: datetime  # when the result was created
    working_directory: str
    parsed_data: dict[str, Any] | None  # structured data from the output parser
    quality_metrics: dict[str, float]  # metric name -> value
    test_results: list[dict[str, Any]]  # per-test dicts from parsed output
    memory_insights: list[str]  # human-readable notes for memory enrichment
@dataclass
class TestResult:
    """Individual test result information."""

    test_id: str  # unique identifier for this test run
    test_name: str
    status: TestStatus
    duration: float  # seconds
    file_path: str
    line_number: int | None
    error_message: str | None  # set when status is failed/error
    traceback: str | None
    tags: list[str]
    coverage_data: dict[str, Any] | None  # per-test coverage, if captured
@dataclass
class ProgressSnapshot:
    """Progress tracking snapshot."""

    timestamp: datetime
    project_path: str
    command: str  # command the snapshot belongs to
    stage: str  # pipeline stage name reported by crackerjack
    progress_percentage: float  # 0-100
    current_task: str
    completed_tasks: list[str]
    failed_tasks: list[str]
    quality_metrics: dict[str, float]
    estimated_completion: datetime | None
    memory_context: list[str]  # notes carried along for memory tools
130# PatternMappingsBuilder and CrackerjackOutputParser classes have been extracted
131# to session_buddy.utils.crackerjack module for better modularity and reusability.
134class CrackerjackIntegration:
135 """Main integration class for Crackerjack command execution and monitoring."""
    def __init__(self, db_path: str | None = None) -> None:
        """Initialize Crackerjack integration.

        Args:
            db_path: Optional SQLite database path. Defaults to
                ``~/.claude/data/crackerjack_integration.db``; if that
                location is not writable, falls back to a temp directory.
        """
        self.db_path = db_path or str(
            Path.home() / ".claude" / "data" / "crackerjack_integration.db",
        )
        # Parses raw crackerjack output into structured data + insights.
        self.parser = CrackerjackOutputParser()
        # NOTE(review): lock is created but not used in this file --
        # confirm whether callers elsewhere rely on it.
        self._lock = threading.Lock()
        try:
            self._init_database()
        except Exception:
            # Fall back to a temp-writable path if the default is not writable
            tmp_db = (
                Path(tempfile.gettempdir())
                / "session-mgmt-mcp"
                / "data"
                / "crackerjack_integration.db"
            )
            tmp_db.parent.mkdir(parents=True, exist_ok=True)
            self.db_path = str(tmp_db)
            self._init_database()
158 def execute_command(
159 self,
160 cmd: list[str],
161 **kwargs: Any,
162 ) -> dict[str, Any]:
163 """Execute command synchronously (for CommandRunner protocol compatibility).
165 This is a synchronous wrapper around execute_crackerjack_command for
166 compatibility with crackerjack's CommandRunner protocol.
167 """
168 import os
169 import subprocess # nosec B404
171 try:
172 env = kwargs.get("env", os.environ.copy())
174 # Execute the command directly using subprocess
175 result = subprocess.run(
176 cmd,
177 check=False,
178 capture_output=True,
179 text=True,
180 timeout=kwargs.get("timeout", 300),
181 cwd=kwargs.get("cwd", "."),
182 env=env,
183 **{
184 k: v
185 for k, v in kwargs.items()
186 if k not in {"timeout", "cwd", "env"}
187 },
188 )
190 return {
191 "stdout": result.stdout,
192 "stderr": result.stderr,
193 "returncode": result.returncode,
194 "success": result.returncode == 0,
195 }
197 except subprocess.TimeoutExpired as e:
198 return {
199 "stdout": e.stdout.decode() if e.stdout else "",
200 "stderr": e.stderr.decode() if e.stderr else "Command timed out",
201 "returncode": -1,
202 "success": False,
203 }
204 except Exception as e:
205 return {"stdout": "", "stderr": str(e), "returncode": -2, "success": False}
    def _init_database(self) -> None:
        """Initialize SQLite database for Crackerjack integration.

        Creates four tables (one row per execution, per-test results,
        progress snapshots, quality-metric history) plus their indices,
        all with IF NOT EXISTS so repeat calls are harmless. Raises if
        the path is unwritable; the constructor catches that and retries
        with a temp-directory database.
        """
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

        with sqlite3.connect(self.db_path) as conn:
            # One row per crackerjack command execution.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS crackerjack_results (
                    id TEXT PRIMARY KEY,
                    command TEXT NOT NULL,
                    exit_code INTEGER,
                    stdout TEXT,
                    stderr TEXT,
                    execution_time REAL,
                    timestamp TIMESTAMP,
                    working_directory TEXT,
                    parsed_data TEXT,  -- JSON
                    quality_metrics TEXT,  -- JSON
                    memory_insights TEXT  -- JSON array
                )
            """)

            # One row per individual test observed in an execution.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS test_results (
                    id TEXT PRIMARY KEY,
                    result_id TEXT NOT NULL,
                    test_name TEXT NOT NULL,
                    status TEXT NOT NULL,
                    duration REAL,
                    file_path TEXT,
                    line_number INTEGER,
                    error_message TEXT,
                    timestamp TIMESTAMP,
                    FOREIGN KEY (result_id) REFERENCES crackerjack_results(id)
                )
            """)

            # Progress information parsed out of command output.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS progress_snapshots (
                    id TEXT PRIMARY KEY,
                    project_path TEXT NOT NULL,
                    command TEXT NOT NULL,
                    stage TEXT,
                    progress_percentage REAL,
                    current_task TEXT,
                    completed_tasks TEXT,  -- JSON array
                    failed_tasks TEXT,  -- JSON array
                    quality_metrics TEXT,  -- JSON
                    timestamp TIMESTAMP,
                    memory_context TEXT  -- JSON array
                )
            """)

            # Time series of individual metric values for trend analysis.
            conn.execute("""
                CREATE TABLE IF NOT EXISTS quality_metrics_history (
                    id TEXT PRIMARY KEY,
                    project_path TEXT NOT NULL,
                    metric_type TEXT NOT NULL,
                    metric_value REAL NOT NULL,
                    timestamp TIMESTAMP,
                    result_id TEXT,
                    FOREIGN KEY (result_id) REFERENCES crackerjack_results(id)
                )
            """)

            # Create indices
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_results_timestamp ON crackerjack_results(timestamp)",
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_results_command ON crackerjack_results(command)",
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_test_results_status ON test_results(status)",
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_progress_project ON progress_snapshots(project_path)",
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_metrics_type ON quality_metrics_history(metric_type)",
            )
288 def _build_command_flags(self, command: str, ai_agent_mode: bool) -> list[str]:
289 """Build appropriate command flags for the given command.
291 Crackerjack CLI structure (v0.47+):
292 - Uses 'run' subcommand followed by flags
293 - Example: python -m crackerjack run --fast --quick
294 - Example: python -m crackerjack run --comp --run-tests
295 - Example: python -m crackerjack run --all patch (release workflow)
297 IMPORTANT: --all requires an argument (patch|minor|major|auto|interactive)
298 For general quality checks, use 'run' with no flags or --fast/--comp
299 """
300 command_mappings = {
301 # Fast hooks only (formatters and basic checks)
302 "lint": ["run", "--fast"],
303 "format": ["run", "--fast"],
304 # Comprehensive hooks (type checking, security, complexity)
305 "check": ["run", "--comp"],
306 "typecheck": ["run", "--comp"],
307 "security": ["run", "--comp"], # Security is part of comprehensive hooks
308 "complexity": [
309 "run",
310 "--comp",
311 ], # Complexity is part of comprehensive hooks
312 "analyze": ["run", "--comp"], # Comprehensive analysis
313 # Test execution
314 "test": ["run", "--run-tests"],
315 # Build and maintenance (default run with no special flags)
316 "build": ["run"],
317 "clean": ["run"], # Clean happens automatically in current version
318 # All quality checks (use default run, NOT --all which is for release)
319 "all": ["run"], # Just run with no special flags
320 "run": ["run"],
321 # Standalone commands (no 'run' prefix)
322 "run-tests": ["run-tests"],
323 }
325 flags = command_mappings.get(command.lower(), ["run"])
326 if ai_agent_mode:
327 flags.append("--ai-fix")
328 return flags
330 async def _execute_process(
331 self,
332 full_command: list[str],
333 working_directory: str,
334 timeout: int,
335 ) -> tuple[int, str, str, float]:
336 """Execute the subprocess and return exit code, stdout, stderr, and execution time."""
337 import os
339 start_time = time.time()
341 env = os.environ.copy()
343 process = await asyncio.create_subprocess_exec(
344 *full_command,
345 cwd=working_directory,
346 stdout=asyncio.subprocess.PIPE,
347 stderr=asyncio.subprocess.PIPE,
348 env=env,
349 )
351 stdout, stderr = await asyncio.wait_for(
352 process.communicate(),
353 timeout=timeout,
354 )
356 exit_code = process.returncode or 0
357 execution_time = time.time() - start_time
358 stdout_text = stdout.decode("utf-8", errors="ignore")
359 stderr_text = stderr.decode("utf-8", errors="ignore")
361 return exit_code, stdout_text, stderr_text, execution_time
363 def _create_error_result(
364 self,
365 command: str,
366 exit_code: int,
367 stderr: str,
368 execution_time: float,
369 working_directory: str,
370 memory_insight: str,
371 ) -> CrackerjackResult:
372 """Create a standardized error result."""
373 return CrackerjackResult(
374 command=command,
375 exit_code=exit_code,
376 stdout="",
377 stderr=stderr,
378 execution_time=execution_time,
379 timestamp=datetime.now(),
380 working_directory=working_directory,
381 parsed_data={},
382 quality_metrics={},
383 test_results=[],
384 memory_insights=[memory_insight],
385 )
387 async def execute_crackerjack_command(
388 self,
389 command: str,
390 args: list[str] | None = None,
391 working_directory: str = ".",
392 timeout: int = 300,
393 ai_agent_mode: bool = False,
394 ) -> CrackerjackResult:
395 """Execute Crackerjack command and capture results."""
396 args = args or []
397 command_flags = self._build_command_flags(command, ai_agent_mode)
398 quick_commands = {"lint", "check", "test", "format", "typecheck"}
399 if command.lower() in quick_commands and "--quick" not in args:
400 if "--ai-fix" in command_flags:
401 command_flags.insert(command_flags.index("--ai-fix"), "--quick")
402 else:
403 command_flags.append("--quick")
404 full_command = ["python", "-m", "crackerjack", *command_flags, *args]
406 start_time = time.time()
407 result_id = f"cj_{int(start_time * 1000)}"
409 try:
410 (
411 exit_code,
412 stdout_text,
413 stderr_text,
414 execution_time,
415 ) = await self._execute_process(full_command, working_directory, timeout)
417 parsed_data, memory_insights = self.parser.parse_output(
418 command,
419 stdout_text,
420 stderr_text,
421 )
422 quality_metrics = self._calculate_quality_metrics(
423 parsed_data,
424 exit_code,
425 stderr_text,
426 )
428 result = CrackerjackResult(
429 command=command,
430 exit_code=exit_code,
431 stdout=stdout_text,
432 stderr=stderr_text,
433 execution_time=execution_time,
434 timestamp=datetime.now(),
435 working_directory=working_directory,
436 parsed_data=parsed_data,
437 quality_metrics=quality_metrics,
438 test_results=parsed_data.get("test_results", []),
439 memory_insights=memory_insights,
440 )
442 await self._store_result(result_id, result)
443 await self._store_progress_snapshot(result_id, result, working_directory)
444 return result
446 except TimeoutError:
447 execution_time = time.time() - start_time
448 error_result = self._create_error_result(
449 command,
450 -1,
451 f"Command timed out after {timeout} seconds",
452 execution_time,
453 working_directory,
454 f"Command '{command}' timed out - consider optimizing or increasing timeout",
455 )
456 await self._store_result(result_id, error_result)
457 return error_result
459 except Exception as e:
460 execution_time = time.time() - start_time
461 error_result = self._create_error_result(
462 command,
463 -2,
464 f"Execution error: {e!s}",
465 execution_time,
466 working_directory,
467 f"Command '{command}' failed with error: {e!s}",
468 )
469 await self._store_result(result_id, error_result)
470 return error_result
472 async def get_recent_results(
473 self,
474 hours: int = 24,
475 command: str | None = None,
476 ) -> list[dict[str, Any]]:
477 """Get recent Crackerjack execution results."""
478 since = datetime.now() - timedelta(hours=hours)
480 with sqlite3.connect(self.db_path) as conn:
481 conn.row_factory = sqlite3.Row
483 where_conditions = ["timestamp >= ?"]
484 params = [since.isoformat()]
486 if command: 486 ↛ 487line 486 didn't jump to line 487 because the condition on line 486 was never true
487 where_conditions.append("command = ?")
488 params.append(command)
490 # Build SQL safely - all user input is parameterized via params list
491 query = (
492 "SELECT * FROM crackerjack_results WHERE "
493 + " AND ".join(where_conditions)
494 + " ORDER BY timestamp DESC"
495 )
497 cursor = conn.execute(query, params)
498 results = []
500 for row in cursor.fetchall():
501 result = dict(row)
502 result["parsed_data"] = json.loads(result["parsed_data"] or "{}")
503 result["quality_metrics"] = json.loads(
504 result["quality_metrics"] or "{}",
505 )
506 result["memory_insights"] = json.loads(
507 result["memory_insights"] or "[]",
508 )
509 results.append(result)
511 return results
513 async def get_quality_metrics_history(
514 self,
515 project_path: str,
516 metric_type: str | None = None,
517 days: int = 30,
518 ) -> list[dict[str, Any]]:
519 """Get quality metrics history for trend analysis."""
520 since = datetime.now() - timedelta(days=days)
522 with sqlite3.connect(self.db_path) as conn:
523 conn.row_factory = sqlite3.Row
525 where_conditions = ["project_path = ?", "timestamp >= ?"]
526 params = [project_path, since.isoformat()]
528 if metric_type: 528 ↛ 529line 528 didn't jump to line 529 because the condition on line 528 was never true
529 where_conditions.append("metric_type = ?")
530 params.append(metric_type)
532 # Build SQL safely - all user input is parameterized via params list
533 query = (
534 "SELECT * FROM quality_metrics_history WHERE "
535 + " AND ".join(where_conditions)
536 + " ORDER BY timestamp DESC"
537 )
539 cursor = conn.execute(query, params)
540 return [dict(row) for row in cursor.fetchall()]
    async def get_test_failure_patterns(self, days: int = 7) -> dict[str, Any]:
        """Analyze test failure patterns for insights.

        Runs three aggregate queries over test_results within the window:
        repeated failures, flaky tests (more than one distinct status over
        at least three runs), and the files with the most failures.

        Args:
            days: Look-back window in days.

        Returns:
            Dict with ``failed_tests``, ``flaky_tests``, ``failing_files``
            (each a list of row dicts) and ``analysis_period_days``.
        """
        since = datetime.now() - timedelta(days=days)

        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row

            # Get failed tests
            failed_tests = conn.execute(
                """
                SELECT test_name, file_path, error_message, COUNT(*) as failure_count
                FROM test_results
                WHERE status = 'failed' AND timestamp >= ?
                GROUP BY test_name, file_path, error_message
                ORDER BY failure_count DESC
            """,
                (since.isoformat(),),
            ).fetchall()

            # Get flaky tests (alternating pass/fail)
            flaky_tests = conn.execute(
                """
                SELECT test_name, file_path,
                       COUNT(DISTINCT status) as status_count,
                       COUNT(*) as total_runs
                FROM test_results
                WHERE timestamp >= ?
                GROUP BY test_name, file_path
                HAVING status_count > 1 AND total_runs >= 3
                ORDER BY status_count DESC, total_runs DESC
            """,
                (since.isoformat(),),
            ).fetchall()

            # Get most failing files
            failing_files = conn.execute(
                """
                SELECT file_path, COUNT(*) as failure_count
                FROM test_results
                WHERE status = 'failed' AND timestamp >= ?
                GROUP BY file_path
                ORDER BY failure_count DESC
                LIMIT 10
            """,
                (since.isoformat(),),
            ).fetchall()

            return {
                "failed_tests": [dict(row) for row in failed_tests],
                "flaky_tests": [dict(row) for row in flaky_tests],
                "failing_files": [dict(row) for row in failing_files],
                "analysis_period_days": days,
            }
596 def _filter_metrics_by_type(
597 self,
598 metrics_history: list[dict[str, Any]],
599 metric_type: str,
600 ) -> list[dict[str, Any]]:
601 """Filter metrics history by type and sort by timestamp."""
602 metric_values = [m for m in metrics_history if m["metric_type"] == metric_type]
603 metric_values.sort(key=operator.itemgetter("timestamp"), reverse=True)
604 return metric_values
606 def _calculate_trend_direction(self, change: float) -> str:
607 """Determine trend direction from change value."""
608 if change > 0:
609 return "improving"
610 if change < 0:
611 return "declining"
612 return "stable"
614 def _calculate_trend_strength(self, change: float) -> str:
615 """Determine trend strength from absolute change value."""
616 abs_change = abs(change)
617 if abs_change > 5:
618 return "strong"
619 if abs_change > 1:
620 return "moderate"
621 return "weak"
623 def _create_trend_data(self, metric_values: list[dict[str, Any]]) -> dict[str, Any]:
624 """Create trend data from metric values with sufficient data."""
625 mid_point = len(metric_values) // 2
626 recent = metric_values[:mid_point] if mid_point > 0 else metric_values
627 older = metric_values[mid_point:] if mid_point > 0 else []
629 if not (recent and older):
630 current_avg = sum(m["metric_value"] for m in metric_values) / len(
631 metric_values,
632 )
633 return {
634 "direction": "insufficient_data",
635 "change": 0,
636 "change_percentage": 0,
637 "recent_average": current_avg,
638 "previous_average": current_avg,
639 "data_points": len(metric_values),
640 "trend_strength": "unknown",
641 }
643 recent_avg = sum(m["metric_value"] for m in recent) / len(recent)
644 older_avg = sum(m["metric_value"] for m in older) / len(older)
645 change = recent_avg - older_avg
647 return {
648 "direction": self._calculate_trend_direction(change),
649 "change": abs(change),
650 "change_percentage": (abs(change) / older_avg * 100)
651 if older_avg > 0
652 else 0,
653 "recent_average": recent_avg,
654 "previous_average": older_avg,
655 "data_points": len(metric_values),
656 "trend_strength": self._calculate_trend_strength(change),
657 }
659 def _calculate_overall_assessment(
660 self,
661 trends: dict[str, Any],
662 days: int,
663 ) -> dict[str, Any]:
664 """Calculate overall trend assessment from individual trend data."""
665 improving_metrics = sum(
666 1 for t in trends.values() if t["direction"] == "improving"
667 )
668 declining_metrics = sum(
669 1 for t in trends.values() if t["direction"] == "declining"
670 )
672 if improving_metrics > declining_metrics:
673 overall_direction = "improving"
674 elif declining_metrics > improving_metrics:
675 overall_direction = "declining"
676 else:
677 overall_direction = "stable"
679 return {
680 "overall_direction": overall_direction,
681 "improving_count": improving_metrics,
682 "declining_count": declining_metrics,
683 "stable_count": len(trends) - improving_metrics - declining_metrics,
684 "analysis_period_days": days,
685 }
687 async def get_quality_trends(
688 self,
689 project_path: str,
690 days: int = 30,
691 ) -> dict[str, Any]:
692 """Analyze quality trends over time."""
693 metrics_history = await self.get_quality_metrics_history(
694 project_path,
695 None,
696 days,
697 )
699 metric_types = (
700 "test_pass_rate",
701 "code_coverage",
702 "lint_score",
703 "security_score",
704 "complexity_score",
705 )
706 trends = {}
708 for metric_type in metric_types:
709 metric_values = self._filter_metrics_by_type(metrics_history, metric_type)
710 if len(metric_values) >= 2:
711 trends[metric_type] = self._create_trend_data(metric_values)
713 overall_assessment = self._calculate_overall_assessment(trends, days)
715 return {
716 "trends": trends,
717 "overall": overall_assessment,
718 "recommendations": self._generate_trend_recommendations(trends),
719 }
721 def _get_declining_recommendation(
722 self,
723 metric_type: str,
724 change: float,
725 ) -> str | None:
726 """Get recommendation for declining metrics."""
727 recommendations_map = {
728 "test_pass_rate": f"⚠️ Test pass rate declining by {change:.1f}% - investigate failing tests",
729 "code_coverage": f"⚠️ Code coverage declining by {change:.1f}% - add more tests",
730 "lint_score": "⚠️ Code quality declining - address lint issues",
731 "security_score": "🔒 Security score declining - review security findings",
732 "complexity_score": "🔧 Code complexity increasing - consider refactoring",
733 }
734 return recommendations_map.get(metric_type)
736 def _get_improving_recommendation(
737 self,
738 metric_type: str,
739 recent_avg: float,
740 ) -> str | None:
741 """Get recommendation for improving metrics with high averages."""
742 if metric_type == "test_pass_rate" and recent_avg > 95:
743 return "✅ Excellent test pass rate trend - maintain current practices"
744 if metric_type == "code_coverage" and recent_avg > 85:
745 return "✅ Great coverage improvement - continue testing efforts"
746 return None
748 def _generate_trend_recommendations(self, trends: dict[str, Any]) -> list[str]:
749 """Generate recommendations based on quality trends."""
750 recommendations = []
752 for metric_type, trend_data in trends.items():
753 direction = trend_data["direction"]
754 strength = trend_data["trend_strength"]
755 change = trend_data["change"]
756 recent_avg = trend_data["recent_average"]
758 if direction == "declining" and strength in {"strong", "moderate"}:
759 recommendation = self._get_declining_recommendation(metric_type, change)
760 if recommendation:
761 recommendations.append(recommendation)
762 elif direction == "improving" and strength == "strong":
763 recommendation = self._get_improving_recommendation(
764 metric_type,
765 recent_avg,
766 )
767 if recommendation:
768 recommendations.append(recommendation)
770 if not recommendations:
771 recommendations.append(
772 "📈 Quality metrics are stable - continue current practices",
773 )
775 return recommendations
777 async def health_check(self) -> dict[str, Any]:
778 """Check integration health and dependencies."""
779 health: dict[str, Any] = {
780 "crackerjack_available": False,
781 "database_accessible": False,
782 "version_compatible": False,
783 "recommendations": [],
784 "status": "unhealthy",
785 }
787 try:
788 await self._check_crackerjack_availability(health)
789 await self._check_database_health(health)
790 health["status"] = self._determine_health_status(health)
792 except sqlite3.Error as e:
793 health["database_accessible"] = False
794 health["recommendations"].append(f"❌ Database error: {e}")
795 except Exception as e:
796 health["error"] = str(e)
797 health["recommendations"].append(f"❌ Health check error: {e}")
799 return health
801 async def _check_crackerjack_availability(self, health: dict[str, Any]) -> None:
802 """Check if crackerjack command is available."""
803 process = await asyncio.create_subprocess_exec(
804 "crackerjack",
805 "--help",
806 stdout=asyncio.subprocess.DEVNULL,
807 stderr=asyncio.subprocess.DEVNULL,
808 )
809 await process.communicate()
810 health["crackerjack_available"] = process.returncode == 0
812 if health["crackerjack_available"]:
813 health["recommendations"].append(
814 "✅ Crackerjack is available and responding",
815 )
816 else:
817 health["recommendations"].append(
818 "❌ Crackerjack not available - install with 'uv add crackerjack'",
819 )
821 async def _check_database_health(self, health: dict[str, Any]) -> None:
822 """Check database accessibility and data availability."""
823 with sqlite3.connect(self.db_path) as conn:
824 conn.execute("SELECT 1").fetchone()
825 health["database_accessible"] = True
826 health["recommendations"].append("✅ Database connection successful")
828 cursor = conn.execute("SELECT COUNT(*) FROM crackerjack_results")
829 result_count = cursor.fetchone()[0]
831 if result_count > 0:
832 health["recommendations"].append(
833 f"📊 {result_count} execution records available",
834 )
835 else:
836 health["recommendations"].append(
837 "📝 No execution history - run some crackerjack commands",
838 )
840 def _determine_health_status(self, health: dict[str, Any]) -> str:
841 """Determine overall health status from component health."""
842 if health["crackerjack_available"] and health["database_accessible"]:
843 return "healthy"
844 if health["database_accessible"]:
845 return "partial"
846 return "unhealthy"
848 def _calculate_quality_metrics(
849 self,
850 parsed_data: dict[str, Any],
851 exit_code: int,
852 stderr_content: str = "",
853 ) -> dict[str, float]:
854 """Calculate quality metrics from parsed data."""
855 metrics = {}
857 metrics.update(self._calculate_test_metrics(parsed_data))
858 metrics.update(self._calculate_coverage_metrics(parsed_data))
859 metrics.update(self._calculate_lint_metrics(parsed_data))
860 metrics.update(self._calculate_security_metrics(parsed_data))
861 metrics.update(self._calculate_complexity_metrics(parsed_data))
863 if stderr_content: 863 ↛ 864line 863 didn't jump to line 864 because the condition on line 863 was never true
864 metrics.update(self._parse_stderr_metrics(stderr_content))
866 metrics["build_status"] = float(100 if exit_code == 0 else 0)
868 return metrics
870 def _calculate_test_metrics(self, parsed_data: dict[str, Any]) -> dict[str, float]:
871 """Calculate test pass rate metrics."""
872 metrics = {}
873 test_results = parsed_data.get("test_results", [])
874 if test_results:
875 passed = sum(1 for t in test_results if t["status"] == "passed")
876 total = len(test_results)
877 metrics["test_pass_rate"] = float(
878 (passed / total) * 100 if total > 0 else 0,
879 )
880 return metrics
882 def _calculate_coverage_metrics(
883 self, parsed_data: dict[str, Any]
884 ) -> dict[str, float]:
885 """Calculate code coverage metrics."""
886 metrics = {}
887 coverage_summary = parsed_data.get("coverage_summary", {})
888 if "total_coverage" in coverage_summary: 888 ↛ 889line 888 didn't jump to line 889 because the condition on line 888 was never true
889 metrics["code_coverage"] = float(coverage_summary["total_coverage"])
890 return metrics
892 def _calculate_lint_metrics(self, parsed_data: dict[str, Any]) -> dict[str, float]:
893 """Calculate lint score metrics (inverted so higher is better)."""
894 metrics = {}
895 lint_summary = parsed_data.get("lint_summary", {})
896 if "total_issues" in lint_summary:
897 total_issues = lint_summary["total_issues"]
898 metrics["lint_score"] = float(
899 max(0, 100 - total_issues) if total_issues < 100 else 0,
900 )
901 return metrics
903 def _calculate_security_metrics(
904 self, parsed_data: dict[str, Any]
905 ) -> dict[str, float]:
906 """Calculate security score metrics (inverted so higher is better)."""
907 metrics = {}
908 security_summary = parsed_data.get("security_summary", {})
909 if "total_issues" in security_summary:
910 total_issues = security_summary["total_issues"]
911 metrics["security_score"] = float(
912 max(0, 100 - (total_issues * 10)) if total_issues < 10 else 0,
913 )
914 return metrics
916 def _calculate_complexity_metrics(
917 self, parsed_data: dict[str, Any]
918 ) -> dict[str, float]:
919 """Calculate complexity score metrics (inverted so higher is better)."""
920 metrics = {}
921 complexity_summary = parsed_data.get("complexity_summary", {})
922 if complexity_summary:
923 total_files = complexity_summary.get("total_files", 0)
924 high_complexity = complexity_summary.get("high_complexity_files", 0)
925 if total_files > 0: 925 ↛ 926line 925 didn't jump to line 926 because the condition on line 925 was never true
926 complexity_rate = (high_complexity / total_files) * 100
927 metrics["complexity_score"] = float(max(0, 100 - complexity_rate))
928 return metrics
930 def _parse_stderr_metrics(self, stderr_content: str) -> dict[str, float]:
931 """Parse quality metrics from structured logging in stderr."""
932 metrics = {}
934 # Look for common structured logging patterns in stderr
935 lines = stderr_content.split("\n")
937 for line in lines:
938 # Parse structured log entries that might contain quality metrics
939 if '"quality"' in line or '"metric"' in line or '"score"' in line:
940 # This is a simplified approach - would in practice need to
941 # handle the actual structured format
942 import re
944 # Look for patterns like: "quality": value or "metric": value
945 quality_pattern = r'"quality"\s*:\s*(\d+\.?\d*)'
946 metric_pattern = r'"metric"\s*:\s*(\d+\.?\d*)'
947 score_pattern = r'"score"\s*:\s*(\d+\.?\d*)'
949 quality_match = re.search(quality_pattern, line)
950 if quality_match:
951 metrics["parsed_quality"] = float(quality_match.group(1))
953 metric_match = re.search(metric_pattern, line)
954 if metric_match:
955 metrics["parsed_metric"] = float(metric_match.group(1))
957 score_match = re.search(score_pattern, line)
958 if score_match:
959 metrics["parsed_score"] = float(score_match.group(1))
961 return metrics
963 async def _store_result(self, result_id: str, result: CrackerjackResult) -> None:
964 """Store Crackerjack result in database."""
965 try:
966 with sqlite3.connect(self.db_path) as conn:
967 conn.execute(
968 """
969 INSERT INTO crackerjack_results
970 (id, command, exit_code, stdout, stderr, execution_time, timestamp,
971 working_directory, parsed_data, quality_metrics, memory_insights)
972 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
973 """,
974 (
975 result_id,
976 result.command,
977 result.exit_code,
978 result.stdout,
979 result.stderr,
980 result.execution_time,
981 result.timestamp.isoformat(),
982 result.working_directory,
983 json.dumps(result.parsed_data),
984 json.dumps(result.quality_metrics),
985 json.dumps(result.memory_insights),
986 ),
987 )
989 # Store individual test results
990 for test_result in result.test_results: 990 ↛ 991line 990 didn't jump to line 991 because the loop on line 990 never started
991 test_id = (
992 f"test_{result_id}_{hash(test_result.get('test', 'unknown'))}"
993 )
994 conn.execute(
995 """
996 INSERT INTO test_results
997 (id, result_id, test_name, status, duration, file_path, timestamp)
998 VALUES (?, ?, ?, ?, ?, ?, ?)
999 """,
1000 (
1001 test_id,
1002 result_id,
1003 test_result.get("test", ""),
1004 test_result.get("status", ""),
1005 test_result.get("duration", 0),
1006 test_result.get("file", ""),
1007 result.timestamp,
1008 ),
1009 )
1011 # Store quality metrics
1012 for metric_name, metric_value in result.quality_metrics.items():
1013 metric_id = f"metric_{result_id}_{metric_name}"
1014 conn.execute(
1015 """
1016 INSERT INTO quality_metrics_history
1017 (id, project_path, metric_type, metric_value, timestamp, result_id)
1018 VALUES (?, ?, ?, ?, ?, ?)
1019 """,
1020 (
1021 metric_id,
1022 result.working_directory,
1023 metric_name,
1024 metric_value,
1025 result.timestamp.isoformat(),
1026 result_id,
1027 ),
1028 )
1029 except Exception:
1030 # In sandboxed/readonly environments, skip persistence
1031 return
1033 async def _store_progress_snapshot(
1034 self,
1035 result_id: str,
1036 result: CrackerjackResult,
1037 project_path: str,
1038 ) -> None:
1039 """Store progress snapshot from result."""
1040 progress_info: dict[str, Any] = (
1041 result.parsed_data.get("progress_info", {}) if result.parsed_data else {}
1042 )
1044 if progress_info: 1044 ↛ exitline 1044 didn't return from function '_store_progress_snapshot' because the condition on line 1044 was always true
1045 snapshot_id = f"progress_{result_id}"
1046 try:
1047 with sqlite3.connect(self.db_path) as conn:
1048 conn.execute(
1049 """
1050 INSERT INTO progress_snapshots
1051 (id, project_path, command, stage, progress_percentage, current_task,
1052 completed_tasks, failed_tasks, quality_metrics, timestamp, memory_context)
1053 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
1054 """,
1055 (
1056 snapshot_id,
1057 project_path,
1058 result.command,
1059 progress_info.get("stage", ""),
1060 progress_info.get("percentage", 0),
1061 progress_info.get("current_task", ""),
1062 json.dumps(progress_info.get("completed_tasks", [])),
1063 json.dumps(progress_info.get("failed_tasks", [])),
1064 json.dumps(result.quality_metrics),
1065 result.timestamp.isoformat(),
1066 json.dumps(result.memory_insights),
1067 ),
1068 )
1069 except Exception:
1070 # In sandboxed/readonly environments, skip persistence
1071 return
# Global integration instance (lazily created singleton)
_crackerjack_integration: CrackerjackIntegration | None = None


def get_crackerjack_integration() -> CrackerjackIntegration:
    """Get global Crackerjack integration instance.

    Lazily instantiates the singleton on first call. The check-and-set is
    not guarded by a lock, so concurrent first calls could each create an
    instance (only the last assignment wins).
    """
    global _crackerjack_integration
    if _crackerjack_integration is None:
        _crackerjack_integration = CrackerjackIntegration()
    return _crackerjack_integration
1086# Public API functions for MCP tools
async def execute_crackerjack_command(
    command: str,
    args: list[str] | None = None,
    working_directory: str = ".",
    timeout: int = 300,
    ai_agent_mode: bool = False,
) -> dict[str, Any]:
    """Execute Crackerjack command and return structured results.

    Thin module-level wrapper over the singleton integration; the
    CrackerjackResult is converted to a plain dict for MCP tools.
    """
    result = await get_crackerjack_integration().execute_crackerjack_command(
        command,
        args,
        working_directory,
        timeout,
        ai_agent_mode,
    )
    return asdict(result)
async def get_recent_crackerjack_results(
    hours: int = 24,
    command: str | None = None,
) -> list[dict[str, Any]]:
    """Get recent Crackerjack execution results (singleton wrapper)."""
    return await get_crackerjack_integration().get_recent_results(hours, command)
async def get_quality_metrics_history(
    project_path: str,
    metric_type: str | None = None,
    days: int = 30,
) -> list[dict[str, Any]]:
    """Get quality metrics history for trend analysis (singleton wrapper)."""
    return await get_crackerjack_integration().get_quality_metrics_history(
        project_path,
        metric_type,
        days,
    )
async def analyze_test_failure_patterns(days: int = 7) -> dict[str, Any]:
    """Analyze test failure patterns for insights (singleton wrapper)."""
    return await get_crackerjack_integration().get_test_failure_patterns(days)
async def get_quality_trends(
    project_path: str,
    days: int = 30,
) -> dict[str, Any]:
    """Analyze quality trends over time (singleton wrapper)."""
    return await get_crackerjack_integration().get_quality_trends(project_path, days)
async def crackerjack_health_check() -> dict[str, Any]:
    """Check Crackerjack integration health and dependencies (singleton wrapper)."""
    return await get_crackerjack_integration().health_check()