Coverage for session_buddy / utils / crackerjack / output_parser.py: 61.11%
242 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
"""Crackerjack output parser for structured data extraction.

This module provides parsing capabilities for Crackerjack tool output,
extracting test results, lint issues, security findings, coverage data,
complexity metrics, and progress information.
"""
from __future__ import annotations

import operator
from collections import Counter
from typing import Any

from session_buddy.utils.crackerjack.pattern_builder import PatternMappingsBuilder
from session_buddy.utils.regex_patterns import SAFE_PATTERNS
17class CrackerjackOutputParser:
18 """Parses Crackerjack output for structured data extraction."""
20 def __init__(self) -> None:
21 """Initialize output parser with builder pattern."""
22 self.patterns = self._create_patterns()
24 def _create_patterns(self) -> dict[str, str]:
25 """Create pattern mappings using builder pattern."""
26 return (
27 PatternMappingsBuilder()
28 .add_test_patterns()
29 .add_lint_patterns()
30 .add_security_patterns()
31 .add_quality_patterns()
32 .add_progress_patterns()
33 .add_coverage_patterns()
34 .add_misc_patterns()
35 .build()
36 )
38 def parse_output(
39 self,
40 command: str,
41 stdout: str,
42 stderr: str,
43 ) -> tuple[dict[str, Any], list[str]]:
44 """Parse Crackerjack output and extract insights."""
45 parsed_data = self._init_parsed_data(command)
46 memory_insights: list[str] = []
47 full_output = f"{stdout}\n{stderr}"
49 # Apply applicable parsers based on command
50 for parser_type in self._get_applicable_parsers(command):
51 self._apply_parser(parser_type, full_output, parsed_data, memory_insights)
53 # Always parse progress information
54 self._apply_parser("progress", full_output, parsed_data, memory_insights)
56 return parsed_data, memory_insights
58 def _init_parsed_data(self, command: str) -> dict[str, Any]:
59 """Initialize parsed data structure."""
60 return {
61 "command": command,
62 "test_results": [],
63 "lint_issues": [],
64 "security_issues": [],
65 "coverage_data": {},
66 "complexity_data": {},
67 "progress_info": {},
68 "quality_metrics": {},
69 }
71 def _get_applicable_parsers(self, command: str) -> list[str]:
72 """Get list of parsers to apply for a command."""
73 parser_map = {
74 "test": ["test", "coverage"],
75 "check": ["test", "lint", "security", "coverage", "complexity"],
76 "lint": ["lint"],
77 "format": ["lint"],
78 "security": ["security"],
79 "coverage": ["coverage"],
80 "complexity": ["complexity"],
81 }
82 return parser_map.get(command, [])
84 def _apply_parser(
85 self,
86 parser_type: str,
87 output: str,
88 parsed_data: dict[str, Any],
89 insights: list[str],
90 ) -> None:
91 """Apply a specific parser and extract insights."""
92 parser_methods = {
93 "test": (self._parse_test_output, self._extract_test_insights),
94 "lint": (self._parse_lint_output, self._extract_lint_insights),
95 "security": (self._parse_security_output, self._extract_security_insights),
96 "coverage": (self._parse_coverage_output, self._extract_coverage_insights),
97 "complexity": (
98 self._parse_complexity_output,
99 self._extract_complexity_insights,
100 ),
101 "progress": (self._parse_progress_output, self._extract_progress_insights),
102 }
104 if parser_type in parser_methods: 104 ↛ exitline 104 didn't return from function '_apply_parser' because the condition on line 104 was always true
105 parse_method, extract_method = parser_methods[parser_type]
106 parsed_data.update(parse_method(output))
107 insights.extend(extract_method(parsed_data))
109 def _parse_test_output(self, output: str) -> dict[str, Any]:
110 """Parse pytest output for test results."""
111 data: dict[str, Any] = {"test_results": [], "test_summary": {}}
113 lines = output.split("\n")
115 for line in lines:
116 # Test result lines
117 pytest_pattern = SAFE_PATTERNS[self.patterns["pytest_result"]]
118 match = pytest_pattern.search(line)
119 if match: 119 ↛ 120line 119 didn't jump to line 120 because the condition on line 119 was never true
120 file_path, test_name, status, coverage, duration = match.groups()
121 data["test_results"].append(
122 {
123 "file": file_path,
124 "test": test_name,
125 "status": status.lower(),
126 "coverage": coverage,
127 "duration": duration,
128 },
129 )
131 # Summary lines
132 summary_pattern = SAFE_PATTERNS[self.patterns["pytest_summary"]]
133 summary_match = summary_pattern.search(line)
134 if summary_match: 134 ↛ 135line 134 didn't jump to line 135 because the condition on line 134 was never true
135 summary_text = summary_match.group(1)
136 if "passed" in summary_text or "failed" in summary_text:
137 data["test_summary"]["summary"] = summary_text
139 return data
141 def _parse_lint_output(self, output: str) -> dict[str, Any]:
142 """Parse lint output for code quality issues."""
143 data: dict[str, Any] = {"lint_issues": [], "lint_summary": {}}
145 lines = output.split("\n")
146 total_errors = 0
148 for line in lines:
149 # Ruff errors
150 ruff_pattern = SAFE_PATTERNS[self.patterns["ruff_error"]]
151 ruff_match = ruff_pattern.search(line)
152 if ruff_match: 152 ↛ 153line 152 didn't jump to line 153 because the condition on line 152 was never true
153 file_path, line_num, col_num, error_type, message = ruff_match.groups()
154 data["lint_issues"].append(
155 {
156 "tool": "ruff",
157 "file": file_path,
158 "line": int(line_num),
159 "column": int(col_num),
160 "type": error_type,
161 "message": message,
162 },
163 )
164 total_errors += 1
166 # Pyright errors
167 pyright_pattern = SAFE_PATTERNS[self.patterns["pyright_error"]]
168 pyright_match = pyright_pattern.search(line)
169 if pyright_match: 169 ↛ 170line 169 didn't jump to line 170 because the condition on line 169 was never true
170 file_path, line_num, col_num, severity, message = pyright_match.groups()
171 data["lint_issues"].append(
172 {
173 "tool": "pyright",
174 "file": file_path,
175 "line": int(line_num),
176 "column": int(col_num),
177 "type": severity,
178 "message": message,
179 },
180 )
181 total_errors += 1
183 data["lint_summary"] = {"total_issues": total_errors}
184 return data
186 def _parse_security_output(self, output: str) -> dict[str, Any]:
187 """Parse bandit security scan output."""
188 data: dict[str, Any] = {"security_issues": [], "security_summary": {}}
190 lines = output.split("\n")
191 current_issue = None
193 for line in lines:
194 bandit_issue_pattern = SAFE_PATTERNS[self.patterns["bandit_issue"]]
195 issue_match = bandit_issue_pattern.search(line)
196 if issue_match: 196 ↛ 197line 196 didn't jump to line 197 because the condition on line 196 was never true
197 issue_id, description = issue_match.groups()
198 current_issue = {
199 "id": issue_id,
200 "description": description,
201 "severity": None,
202 "confidence": None,
203 }
204 data["security_issues"].append(current_issue)
206 bandit_severity_pattern = SAFE_PATTERNS[self.patterns["bandit_severity"]]
207 severity_match = bandit_severity_pattern.search(line)
208 if severity_match and current_issue: 208 ↛ 209line 208 didn't jump to line 209 because the condition on line 208 was never true
209 severity, confidence = severity_match.groups()
210 current_issue["severity"] = severity
211 current_issue["confidence"] = confidence
213 data["security_summary"] = {"total_issues": len(data["security_issues"])}
214 return data
216 def _parse_coverage_output(self, output: str) -> dict[str, Any]:
217 """Parse coverage report output."""
218 data: dict[str, Any] = {"coverage_data": {}, "coverage_summary": {}}
220 lines = output.split("\n")
222 for line in lines:
223 # Individual file coverage
224 coverage_line_pattern = SAFE_PATTERNS[self.patterns["coverage_line"]]
225 coverage_match = coverage_line_pattern.search(line)
226 if coverage_match: 226 ↛ 227line 226 didn't jump to line 227 because the condition on line 226 was never true
227 file_path, statements, missing, coverage = coverage_match.groups()
228 data["coverage_data"][file_path] = {
229 "statements": int(statements),
230 "missing": int(missing),
231 "coverage": int(coverage.rstrip("%")),
232 }
234 # Total coverage
235 pytest_coverage_pattern = SAFE_PATTERNS[self.patterns["pytest_coverage"]]
236 total_match = pytest_coverage_pattern.search(line)
237 if total_match: 237 ↛ 238line 237 didn't jump to line 238 because the condition on line 237 was never true
238 total_coverage = int(total_match.group(1))
239 data["coverage_summary"]["total_coverage"] = total_coverage
241 return data
243 def _parse_complexity_output(self, output: str) -> dict[str, Any]:
244 """Parse complexity analysis output."""
245 data: dict[str, Any] = {"complexity_data": {}, "complexity_summary": {}}
247 lines = output.split("\n")
248 total_files = 0
249 high_complexity = 0
251 for line in lines:
252 complexity_pattern = SAFE_PATTERNS[self.patterns["complexity_score"]]
253 complexity_match = complexity_pattern.search(line)
254 if complexity_match: 254 ↛ 255line 254 didn't jump to line 255 because the condition on line 254 was never true
255 file_path, lines_count, complexity_score = complexity_match.groups()
256 complexity_val = float(complexity_score)
257 data["complexity_data"][file_path] = {
258 "lines": int(lines_count),
259 "complexity": complexity_val,
260 }
261 total_files += 1
262 if complexity_val > 10: # Configurable threshold
263 high_complexity += 1
265 data["complexity_summary"] = {
266 "total_files": total_files,
267 "high_complexity_files": high_complexity,
268 }
269 return data
271 def _parse_progress_output(self, output: str) -> dict[str, Any]:
272 """Parse progress indicators from output."""
273 data: dict[str, Any] = {"progress_info": {}}
274 lines = output.split("\n")
276 progress_state = self._initialize_progress_state()
278 for line in lines:
279 self._process_progress_line(line, data, progress_state)
281 self._finalize_progress_data(data, progress_state)
282 return data
284 def _initialize_progress_state(self) -> dict[str, Any]:
285 """Initialize progress parsing state."""
286 return {
287 "completed_tasks": [],
288 "failed_tasks": [],
289 "current_percentage": 0.0,
290 }
292 def _process_progress_line(
293 self,
294 line: str,
295 data: dict[str, Any],
296 progress_state: dict[str, Any],
297 ) -> None:
298 """Process a single line for progress indicators."""
299 self._extract_current_task(line, data)
300 self._extract_percentage(line, progress_state)
301 self._extract_completed_tasks(line, progress_state)
302 self._extract_failed_tasks(line, progress_state)
304 def _extract_current_task(self, line: str, data: dict[str, Any]) -> None:
305 """Extract current task from line."""
306 progress_pattern = SAFE_PATTERNS[self.patterns["progress_indicator"]]
307 progress_match = progress_pattern.search(line)
308 if progress_match: 308 ↛ 309line 308 didn't jump to line 309 because the condition on line 308 was never true
309 data["progress_info"]["current_task"] = progress_match.group(1)
311 def _extract_percentage(self, line: str, progress_state: dict[str, Any]) -> None:
312 """Extract percentage completion from line."""
313 percentage_pattern = SAFE_PATTERNS[self.patterns["percentage"]]
314 percentage_match = percentage_pattern.search(line)
315 if percentage_match: 315 ↛ 316line 315 didn't jump to line 316 because the condition on line 315 was never true
316 progress_state["current_percentage"] = float(percentage_match.group(1))
318 def _extract_completed_tasks(
319 self,
320 line: str,
321 progress_state: dict[str, Any],
322 ) -> None:
323 """Extract completed tasks from line."""
324 completion_pattern = SAFE_PATTERNS[self.patterns["task_completion"]]
325 completion_match = completion_pattern.search(line)
326 if completion_match: 326 ↛ 327line 326 didn't jump to line 327 because the condition on line 326 was never true
327 task = self._get_task_from_match(completion_match)
328 if task:
329 progress_state["completed_tasks"].append(task.strip())
331 def _extract_failed_tasks(self, line: str, progress_state: dict[str, Any]) -> None:
332 """Extract failed tasks from line."""
333 failure_pattern = SAFE_PATTERNS[self.patterns["task_failure"]]
334 failure_match = failure_pattern.search(line)
335 if failure_match: 335 ↛ 336line 335 didn't jump to line 336 because the condition on line 335 was never true
336 task = self._get_task_from_match(failure_match)
337 if task:
338 progress_state["failed_tasks"].append(task.strip())
340 def _get_task_from_match(self, match: Any) -> str | None:
341 """Extract task name from pattern match groups."""
342 return match.group(1) or match.group(2) or match.group(3) # type: ignore[no-any-return]
344 def _finalize_progress_data(
345 self,
346 data: dict[str, Any],
347 progress_state: dict[str, Any],
348 ) -> None:
349 """Update final progress data with collected state."""
350 data["progress_info"].update(
351 {
352 "percentage": progress_state["current_percentage"],
353 "completed_tasks": progress_state["completed_tasks"],
354 "failed_tasks": progress_state["failed_tasks"],
355 },
356 )
358 def _extract_test_insights(self, parsed_data: dict[str, Any]) -> list[str]:
359 """Extract memory insights from test results."""
360 insights = []
361 test_results = parsed_data.get("test_results", [])
363 if test_results:
364 passed = sum(1 for t in test_results if t["status"] == "passed")
365 failed = sum(1 for t in test_results if t["status"] == "failed")
366 total = len(test_results)
368 if total > 0:
369 pass_rate = (passed / total) * 100
370 insights.append(
371 f"Test suite: {passed}/{total} tests passed ({pass_rate:.1f}% pass rate)",
372 )
374 if failed > 0:
375 failed_files = {
376 t["file"] for t in test_results if t["status"] == "failed"
377 }
378 insights.append(
379 f"Test failures found in {len(failed_files)} files: {', '.join(failed_files)}",
380 )
382 if pass_rate == 100:
383 insights.append("All tests passing - code quality is stable")
384 elif pass_rate < 80:
385 insights.append(
386 "Test pass rate below 80% - investigate failing tests",
387 )
389 return insights
391 def _extract_lint_insights(self, parsed_data: dict[str, Any]) -> list[str]:
392 """Extract memory insights from lint results."""
393 insights = []
394 lint_issues = parsed_data.get("lint_issues", [])
396 if lint_issues: 396 ↛ 397line 396 didn't jump to line 397 because the condition on line 396 was never true
397 total_issues = len(lint_issues)
398 by_type: dict[str, int] = {}
399 by_file: dict[str, int] = {}
401 for issue in lint_issues:
402 issue_type = issue.get("type", "unknown")
403 file_path = issue.get("file", "unknown")
405 by_type[issue_type] = by_type.get(issue_type, 0) + 1
406 by_file[file_path] = by_file.get(file_path, 0) + 1
408 insights.append(f"Code quality: {total_issues} lint issues found")
410 # Top issue types
411 top_types = sorted(
412 by_type.items(), key=operator.itemgetter(1), reverse=True
413 )[:3]
414 if top_types:
415 type_summary = ", ".join(f"{t}: {c}" for t, c in top_types)
416 insights.append(f"Most common issues: {type_summary}")
418 # Files needing attention
419 top_files = sorted(
420 by_file.items(), key=operator.itemgetter(1), reverse=True
421 )[:3]
422 if top_files and top_files[0][1] > 5:
423 insights.append(
424 f"Files needing attention: {top_files[0][0]} ({top_files[0][1]} issues)",
425 )
426 else:
427 insights.append("Code quality: No lint issues found - code is clean")
429 return insights
431 def _extract_security_insights(self, parsed_data: dict[str, Any]) -> list[str]:
432 """Extract memory insights from security scan."""
433 insights = []
434 security_issues = parsed_data.get("security_issues", [])
436 if security_issues: 436 ↛ 437line 436 didn't jump to line 437 because the condition on line 436 was never true
437 total_issues = len(security_issues)
438 high_severity = sum(
439 1 for i in security_issues if i.get("severity") == "HIGH"
440 )
442 insights.append(
443 f"Security scan: {total_issues} potential security issues found",
444 )
446 if high_severity > 0:
447 insights.append(
448 f"⚠️ {high_severity} high-severity security issues require immediate attention",
449 )
450 else:
451 insights.append("No high-severity security issues detected")
452 else:
453 insights.append(
454 "Security scan: No security issues detected - code appears secure",
455 )
457 return insights
459 def _extract_coverage_insights(self, parsed_data: dict[str, Any]) -> list[str]:
460 """Extract memory insights from coverage data."""
461 insights = []
462 coverage_summary = parsed_data.get("coverage_summary", {})
464 if "total_coverage" in coverage_summary: 464 ↛ 465line 464 didn't jump to line 465 because the condition on line 464 was never true
465 coverage = coverage_summary["total_coverage"]
466 insights.append(f"Test coverage: {coverage}% of code is covered by tests")
468 if coverage >= 90:
469 insights.append("Excellent test coverage - code is well tested")
470 elif coverage >= 80:
471 insights.append("Good test coverage - consider adding more tests")
472 elif coverage >= 60:
473 insights.append(
474 "Moderate test coverage - significant testing gaps exist",
475 )
476 else:
477 insights.append(
478 "Low test coverage - critical testing gaps need attention",
479 )
481 return insights
483 def _extract_complexity_insights(self, parsed_data: dict[str, Any]) -> list[str]:
484 """Extract memory insights from complexity analysis."""
485 insights = []
486 complexity_summary = parsed_data.get("complexity_summary", {})
488 if complexity_summary: 488 ↛ 505line 488 didn't jump to line 505 because the condition on line 488 was always true
489 total_files = complexity_summary.get("total_files", 0)
490 high_complexity = complexity_summary.get("high_complexity_files", 0)
492 if total_files > 0: 492 ↛ 493line 492 didn't jump to line 493 because the condition on line 492 was never true
493 complexity_rate = (high_complexity / total_files) * 100
494 insights.append(
495 f"Code complexity: {high_complexity}/{total_files} files have high complexity ({complexity_rate:.1f}%)",
496 )
498 if complexity_rate == 0:
499 insights.append("Code complexity is well managed")
500 elif complexity_rate > 20:
501 insights.append(
502 "Consider refactoring high-complexity files for maintainability",
503 )
505 return insights
507 def _extract_progress_insights(self, parsed_data: dict[str, Any]) -> list[str]:
508 """Extract memory insights from progress information."""
509 insights = []
510 progress_info = parsed_data.get("progress_info", {})
512 completed_tasks = progress_info.get("completed_tasks", [])
513 failed_tasks = progress_info.get("failed_tasks", [])
514 percentage = progress_info.get("percentage", 0)
516 if completed_tasks: 516 ↛ 517line 516 didn't jump to line 517 because the condition on line 516 was never true
517 insights.append(f"Progress: Completed {len(completed_tasks)} tasks")
519 if failed_tasks: 519 ↛ 520line 519 didn't jump to line 520 because the condition on line 519 was never true
520 insights.append(
521 f"⚠️ {len(failed_tasks)} tasks failed: {', '.join(failed_tasks[:3])}",
522 )
524 if percentage > 0: 524 ↛ 525line 524 didn't jump to line 525 because the condition on line 524 was never true
525 insights.append(f"Overall progress: {percentage}% complete")
527 return insights