Coverage for session_buddy / utils / crackerjack / output_parser.py: 61.11%

242 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1"""Crackerjack output parser for structured data extraction. 

2 

3This module provides parsing capabilities for Crackerjack tool output, 

4extracting test results, lint issues, security findings, coverage data, 

5complexity metrics, and progress information. 

6""" 

7 

8from __future__ import annotations 

9 

10import operator 

11from typing import Any 

12 

13from session_buddy.utils.crackerjack.pattern_builder import PatternMappingsBuilder 

14from session_buddy.utils.regex_patterns import SAFE_PATTERNS 

15 

16 

class CrackerjackOutputParser:
    """Parses Crackerjack output for structured data extraction.

    Given the stdout/stderr of a Crackerjack command, extracts test
    results, lint issues, security findings, coverage data, complexity
    metrics, and progress information, and derives human-readable
    "memory insight" strings summarizing the run.
    """

    # Maps a Crackerjack command to the parser types applicable to its
    # output. Class-level constant so it is not rebuilt on every call;
    # _get_applicable_parsers returns a copy so callers cannot mutate it.
    _PARSER_MAP: dict[str, list[str]] = {
        "test": ["test", "coverage"],
        "check": ["test", "lint", "security", "coverage", "complexity"],
        "lint": ["lint"],
        "format": ["lint"],
        "security": ["security"],
        "coverage": ["coverage"],
        "complexity": ["complexity"],
    }

    def __init__(self) -> None:
        """Initialize output parser with builder pattern."""
        self.patterns = self._create_patterns()

    def _create_patterns(self) -> dict[str, str]:
        """Create pattern mappings using builder pattern.

        Returns:
            Mapping of logical pattern names (e.g. "pytest_result") to
            keys into SAFE_PATTERNS.
        """
        return (
            PatternMappingsBuilder()
            .add_test_patterns()
            .add_lint_patterns()
            .add_security_patterns()
            .add_quality_patterns()
            .add_progress_patterns()
            .add_coverage_patterns()
            .add_misc_patterns()
            .build()
        )

    def parse_output(
        self,
        command: str,
        stdout: str,
        stderr: str,
    ) -> tuple[dict[str, Any], list[str]]:
        """Parse Crackerjack output and extract insights.

        Args:
            command: The Crackerjack command that produced the output.
            stdout: Captured standard output.
            stderr: Captured standard error.

        Returns:
            A (parsed_data, memory_insights) tuple: structured results
            plus human-readable summary strings.
        """
        parsed_data = self._init_parsed_data(command)
        memory_insights: list[str] = []
        # stderr is appended after stdout so both streams are scanned.
        full_output = f"{stdout}\n{stderr}"

        # Apply applicable parsers based on command
        for parser_type in self._get_applicable_parsers(command):
            self._apply_parser(parser_type, full_output, parsed_data, memory_insights)

        # Always parse progress information
        self._apply_parser("progress", full_output, parsed_data, memory_insights)

        return parsed_data, memory_insights

    def _init_parsed_data(self, command: str) -> dict[str, Any]:
        """Initialize parsed data structure."""
        return {
            "command": command,
            "test_results": [],
            "lint_issues": [],
            "security_issues": [],
            "coverage_data": {},
            "complexity_data": {},
            "progress_info": {},
            "quality_metrics": {},
        }

    def _get_applicable_parsers(self, command: str) -> list[str]:
        """Get list of parsers to apply for a command.

        Unknown commands map to an empty list (only progress parsing
        runs). A fresh list is returned so callers may mutate it freely.
        """
        return list(self._PARSER_MAP.get(command, []))

    def _apply_parser(
        self,
        parser_type: str,
        output: str,
        parsed_data: dict[str, Any],
        insights: list[str],
    ) -> None:
        """Apply a specific parser and extract insights.

        Each parser type pairs a parse method (merged into parsed_data)
        with an insight extractor (appended to insights). Unknown parser
        types are silently ignored.
        """
        parser_methods = {
            "test": (self._parse_test_output, self._extract_test_insights),
            "lint": (self._parse_lint_output, self._extract_lint_insights),
            "security": (self._parse_security_output, self._extract_security_insights),
            "coverage": (self._parse_coverage_output, self._extract_coverage_insights),
            "complexity": (
                self._parse_complexity_output,
                self._extract_complexity_insights,
            ),
            "progress": (self._parse_progress_output, self._extract_progress_insights),
        }

        if parser_type in parser_methods:
            parse_method, extract_method = parser_methods[parser_type]
            parsed_data.update(parse_method(output))
            insights.extend(extract_method(parsed_data))

    def _parse_test_output(self, output: str) -> dict[str, Any]:
        """Parse pytest output for test results.

        Returns a dict with "test_results" (one entry per matched test
        line) and "test_summary" (pass/fail summary text, if found).
        """
        data: dict[str, Any] = {"test_results": [], "test_summary": {}}

        for line in output.split("\n"):
            # Test result lines
            pytest_pattern = SAFE_PATTERNS[self.patterns["pytest_result"]]
            match = pytest_pattern.search(line)
            if match:
                file_path, test_name, status, coverage, duration = match.groups()
                data["test_results"].append(
                    {
                        "file": file_path,
                        "test": test_name,
                        "status": status.lower(),
                        "coverage": coverage,
                        "duration": duration,
                    },
                )

            # Summary lines
            summary_pattern = SAFE_PATTERNS[self.patterns["pytest_summary"]]
            summary_match = summary_pattern.search(line)
            if summary_match:
                summary_text = summary_match.group(1)
                # Only keep summaries that actually report test outcomes.
                if "passed" in summary_text or "failed" in summary_text:
                    data["test_summary"]["summary"] = summary_text

        return data

    def _build_lint_issue(self, tool: str, match: Any) -> dict[str, Any]:
        """Build a lint-issue record from a ruff/pyright pattern match.

        Both patterns are expected to yield the same five groups:
        (file, line, column, type/severity, message).
        """
        file_path, line_num, col_num, issue_type, message = match.groups()
        return {
            "tool": tool,
            "file": file_path,
            "line": int(line_num),
            "column": int(col_num),
            "type": issue_type,
            "message": message,
        }

    def _parse_lint_output(self, output: str) -> dict[str, Any]:
        """Parse lint output for code quality issues.

        Scans each line for ruff and pyright diagnostics and records
        them uniformly under "lint_issues".
        """
        data: dict[str, Any] = {"lint_issues": [], "lint_summary": {}}

        for line in output.split("\n"):
            # Ruff errors
            ruff_pattern = SAFE_PATTERNS[self.patterns["ruff_error"]]
            ruff_match = ruff_pattern.search(line)
            if ruff_match:
                data["lint_issues"].append(self._build_lint_issue("ruff", ruff_match))

            # Pyright errors
            pyright_pattern = SAFE_PATTERNS[self.patterns["pyright_error"]]
            pyright_match = pyright_pattern.search(line)
            if pyright_match:
                data["lint_issues"].append(
                    self._build_lint_issue("pyright", pyright_match),
                )

        # Total derived from the collected list rather than a separate counter.
        data["lint_summary"] = {"total_issues": len(data["lint_issues"])}
        return data

    def _parse_security_output(self, output: str) -> dict[str, Any]:
        """Parse bandit security scan output.

        Bandit prints the issue header and its severity/confidence on
        separate lines, so severity info is attached to the most
        recently seen issue.
        """
        data: dict[str, Any] = {"security_issues": [], "security_summary": {}}

        current_issue: dict[str, Any] | None = None

        for line in output.split("\n"):
            bandit_issue_pattern = SAFE_PATTERNS[self.patterns["bandit_issue"]]
            issue_match = bandit_issue_pattern.search(line)
            if issue_match:
                issue_id, description = issue_match.groups()
                current_issue = {
                    "id": issue_id,
                    "description": description,
                    "severity": None,
                    "confidence": None,
                }
                data["security_issues"].append(current_issue)

            bandit_severity_pattern = SAFE_PATTERNS[self.patterns["bandit_severity"]]
            severity_match = bandit_severity_pattern.search(line)
            if severity_match and current_issue:
                severity, confidence = severity_match.groups()
                current_issue["severity"] = severity
                current_issue["confidence"] = confidence

        data["security_summary"] = {"total_issues": len(data["security_issues"])}
        return data

    def _parse_coverage_output(self, output: str) -> dict[str, Any]:
        """Parse coverage report output.

        Extracts per-file coverage rows and the total coverage figure.
        """
        data: dict[str, Any] = {"coverage_data": {}, "coverage_summary": {}}

        for line in output.split("\n"):
            # Individual file coverage
            coverage_line_pattern = SAFE_PATTERNS[self.patterns["coverage_line"]]
            coverage_match = coverage_line_pattern.search(line)
            if coverage_match:
                file_path, statements, missing, coverage = coverage_match.groups()
                # NOTE(review): int() raises on fractional percentages such
                # as "61.11%" — assumes the pattern only matches whole-number
                # percentages; confirm against SAFE_PATTERNS.
                data["coverage_data"][file_path] = {
                    "statements": int(statements),
                    "missing": int(missing),
                    "coverage": int(coverage.rstrip("%")),
                }

            # Total coverage
            pytest_coverage_pattern = SAFE_PATTERNS[self.patterns["pytest_coverage"]]
            total_match = pytest_coverage_pattern.search(line)
            if total_match:
                total_coverage = int(total_match.group(1))
                data["coverage_summary"]["total_coverage"] = total_coverage

        return data

    def _parse_complexity_output(self, output: str) -> dict[str, Any]:
        """Parse complexity analysis output.

        Records per-file line counts and complexity scores, and tallies
        how many files exceed the high-complexity threshold.
        """
        data: dict[str, Any] = {"complexity_data": {}, "complexity_summary": {}}

        total_files = 0
        high_complexity = 0

        for line in output.split("\n"):
            complexity_pattern = SAFE_PATTERNS[self.patterns["complexity_score"]]
            complexity_match = complexity_pattern.search(line)
            if complexity_match:
                file_path, lines_count, complexity_score = complexity_match.groups()
                complexity_val = float(complexity_score)
                data["complexity_data"][file_path] = {
                    "lines": int(lines_count),
                    "complexity": complexity_val,
                }
                total_files += 1
                if complexity_val > 10:  # Configurable threshold
                    high_complexity += 1

        data["complexity_summary"] = {
            "total_files": total_files,
            "high_complexity_files": high_complexity,
        }
        return data

    def _parse_progress_output(self, output: str) -> dict[str, Any]:
        """Parse progress indicators from output."""
        data: dict[str, Any] = {"progress_info": {}}

        progress_state = self._initialize_progress_state()

        for line in output.split("\n"):
            self._process_progress_line(line, data, progress_state)

        self._finalize_progress_data(data, progress_state)
        return data

    def _initialize_progress_state(self) -> dict[str, Any]:
        """Initialize progress parsing state."""
        return {
            "completed_tasks": [],
            "failed_tasks": [],
            "current_percentage": 0.0,
        }

    def _process_progress_line(
        self,
        line: str,
        data: dict[str, Any],
        progress_state: dict[str, Any],
    ) -> None:
        """Process a single line for progress indicators."""
        self._extract_current_task(line, data)
        self._extract_percentage(line, progress_state)
        self._extract_completed_tasks(line, progress_state)
        self._extract_failed_tasks(line, progress_state)

    def _extract_current_task(self, line: str, data: dict[str, Any]) -> None:
        """Extract current task from line.

        Later matches overwrite earlier ones, so the final value is the
        last task seen in the output.
        """
        progress_pattern = SAFE_PATTERNS[self.patterns["progress_indicator"]]
        progress_match = progress_pattern.search(line)
        if progress_match:
            data["progress_info"]["current_task"] = progress_match.group(1)

    def _extract_percentage(self, line: str, progress_state: dict[str, Any]) -> None:
        """Extract percentage completion from line."""
        percentage_pattern = SAFE_PATTERNS[self.patterns["percentage"]]
        percentage_match = percentage_pattern.search(line)
        if percentage_match:
            progress_state["current_percentage"] = float(percentage_match.group(1))

    def _extract_completed_tasks(
        self,
        line: str,
        progress_state: dict[str, Any],
    ) -> None:
        """Extract completed tasks from line."""
        completion_pattern = SAFE_PATTERNS[self.patterns["task_completion"]]
        completion_match = completion_pattern.search(line)
        if completion_match:
            task = self._get_task_from_match(completion_match)
            if task:
                progress_state["completed_tasks"].append(task.strip())

    def _extract_failed_tasks(self, line: str, progress_state: dict[str, Any]) -> None:
        """Extract failed tasks from line."""
        failure_pattern = SAFE_PATTERNS[self.patterns["task_failure"]]
        failure_match = failure_pattern.search(line)
        if failure_match:
            task = self._get_task_from_match(failure_match)
            if task:
                progress_state["failed_tasks"].append(task.strip())

    def _get_task_from_match(self, match: Any) -> str | None:
        """Extract task name from pattern match groups.

        The task patterns capture the name in one of three alternative
        groups; the first non-empty group wins.
        """
        return match.group(1) or match.group(2) or match.group(3)  # type: ignore[no-any-return]

    def _finalize_progress_data(
        self,
        data: dict[str, Any],
        progress_state: dict[str, Any],
    ) -> None:
        """Update final progress data with collected state."""
        data["progress_info"].update(
            {
                "percentage": progress_state["current_percentage"],
                "completed_tasks": progress_state["completed_tasks"],
                "failed_tasks": progress_state["failed_tasks"],
            },
        )

    def _extract_test_insights(self, parsed_data: dict[str, Any]) -> list[str]:
        """Extract memory insights from test results."""
        insights: list[str] = []
        test_results = parsed_data.get("test_results", [])

        if test_results:
            passed = sum(1 for t in test_results if t["status"] == "passed")
            failed = sum(1 for t in test_results if t["status"] == "failed")
            total = len(test_results)

            if total > 0:
                pass_rate = (passed / total) * 100
                insights.append(
                    f"Test suite: {passed}/{total} tests passed ({pass_rate:.1f}% pass rate)",
                )

                if failed > 0:
                    failed_files = {
                        t["file"] for t in test_results if t["status"] == "failed"
                    }
                    insights.append(
                        f"Test failures found in {len(failed_files)} files: {', '.join(failed_files)}",
                    )

                if pass_rate == 100:
                    insights.append("All tests passing - code quality is stable")
                elif pass_rate < 80:
                    insights.append(
                        "Test pass rate below 80% - investigate failing tests",
                    )

        return insights

    def _extract_lint_insights(self, parsed_data: dict[str, Any]) -> list[str]:
        """Extract memory insights from lint results."""
        insights: list[str] = []
        lint_issues = parsed_data.get("lint_issues", [])

        if lint_issues:
            total_issues = len(lint_issues)
            by_type: dict[str, int] = {}
            by_file: dict[str, int] = {}

            for issue in lint_issues:
                issue_type = issue.get("type", "unknown")
                file_path = issue.get("file", "unknown")

                by_type[issue_type] = by_type.get(issue_type, 0) + 1
                by_file[file_path] = by_file.get(file_path, 0) + 1

            insights.append(f"Code quality: {total_issues} lint issues found")

            # Top issue types
            top_types = sorted(
                by_type.items(), key=operator.itemgetter(1), reverse=True
            )[:3]
            if top_types:
                type_summary = ", ".join(f"{t}: {c}" for t, c in top_types)
                insights.append(f"Most common issues: {type_summary}")

            # Files needing attention (only when the worst file has >5 issues)
            top_files = sorted(
                by_file.items(), key=operator.itemgetter(1), reverse=True
            )[:3]
            if top_files and top_files[0][1] > 5:
                insights.append(
                    f"Files needing attention: {top_files[0][0]} ({top_files[0][1]} issues)",
                )
        else:
            insights.append("Code quality: No lint issues found - code is clean")

        return insights

    def _extract_security_insights(self, parsed_data: dict[str, Any]) -> list[str]:
        """Extract memory insights from security scan."""
        insights: list[str] = []
        security_issues = parsed_data.get("security_issues", [])

        if security_issues:
            total_issues = len(security_issues)
            high_severity = sum(
                1 for i in security_issues if i.get("severity") == "HIGH"
            )

            insights.append(
                f"Security scan: {total_issues} potential security issues found",
            )

            if high_severity > 0:
                insights.append(
                    f"⚠️ {high_severity} high-severity security issues require immediate attention",
                )
            else:
                insights.append("No high-severity security issues detected")
        else:
            insights.append(
                "Security scan: No security issues detected - code appears secure",
            )

        return insights

    def _extract_coverage_insights(self, parsed_data: dict[str, Any]) -> list[str]:
        """Extract memory insights from coverage data."""
        insights: list[str] = []
        coverage_summary = parsed_data.get("coverage_summary", {})

        if "total_coverage" in coverage_summary:
            coverage = coverage_summary["total_coverage"]
            insights.append(f"Test coverage: {coverage}% of code is covered by tests")

            # Tiered advice: >=90 excellent, >=80 good, >=60 moderate, else low.
            if coverage >= 90:
                insights.append("Excellent test coverage - code is well tested")
            elif coverage >= 80:
                insights.append("Good test coverage - consider adding more tests")
            elif coverage >= 60:
                insights.append(
                    "Moderate test coverage - significant testing gaps exist",
                )
            else:
                insights.append(
                    "Low test coverage - critical testing gaps need attention",
                )

        return insights

    def _extract_complexity_insights(self, parsed_data: dict[str, Any]) -> list[str]:
        """Extract memory insights from complexity analysis."""
        insights: list[str] = []
        complexity_summary = parsed_data.get("complexity_summary", {})

        if complexity_summary:
            total_files = complexity_summary.get("total_files", 0)
            high_complexity = complexity_summary.get("high_complexity_files", 0)

            if total_files > 0:
                complexity_rate = (high_complexity / total_files) * 100
                insights.append(
                    f"Code complexity: {high_complexity}/{total_files} files have high complexity ({complexity_rate:.1f}%)",
                )

                if complexity_rate == 0:
                    insights.append("Code complexity is well managed")
                elif complexity_rate > 20:
                    insights.append(
                        "Consider refactoring high-complexity files for maintainability",
                    )

        return insights

    def _extract_progress_insights(self, parsed_data: dict[str, Any]) -> list[str]:
        """Extract memory insights from progress information."""
        insights: list[str] = []
        progress_info = parsed_data.get("progress_info", {})

        completed_tasks = progress_info.get("completed_tasks", [])
        failed_tasks = progress_info.get("failed_tasks", [])
        percentage = progress_info.get("percentage", 0)

        if completed_tasks:
            insights.append(f"Progress: Completed {len(completed_tasks)} tasks")

        if failed_tasks:
            # At most the first three failed task names are listed.
            insights.append(
                f"⚠️ {len(failed_tasks)} tasks failed: {', '.join(failed_tasks[:3])}",
            )

        if percentage > 0:
            insights.append(f"Overall progress: {percentage}% complete")

        return insights

527 return insights