Coverage for little_loops / issue_history / quality.py: 94%

180 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""Issue history quality analysis: test gaps, rejections, manual patterns, config gaps.""" 

2 

3from __future__ import annotations 

4 

5import json 

6import re 

7from pathlib import Path 

8from typing import Any 

9 

10from little_loops.issue_history._utils import get_issue_content 

11from little_loops.issue_history.models import ( 

12 CompletedIssue, 

13 ConfigGap, 

14 ConfigGapsAnalysis, 

15 HotspotAnalysis, 

16 ManualPattern, 

17 ManualPatternAnalysis, 

18 RejectionAnalysis, 

19 RejectionMetrics, 

20 TestGap, 

21 TestGapAnalysis, 

22) 

23from little_loops.issue_history.parsing import _find_test_file, _parse_resolution_action 

24 

25 

def analyze_test_gaps(
    issues: list[CompletedIssue],
    hotspots: HotspotAnalysis,
    project_root: Path | None = None,
) -> TestGapAnalysis:
    """Rank bug-prone source files by how urgently they need tests.

    Every file hotspot with at least one BUG issue is checked for a matching
    test file. Files without one get a tenfold gap score, so they sort ahead
    of tested files with a comparable bug count.

    Args:
        issues: List of completed issues (not read here; kept for API consistency)
        hotspots: Pre-computed hotspot analysis (``file_hotspots`` and ``bug_magnets``)
        project_root: Project root for anchoring test file existence checks. Defaults to CWD.

    Returns:
        TestGapAnalysis with the 15 highest-scoring gaps, the bug magnets that
        have no test file, the mean bug count of tested and of untested files,
        and up to 10 untested files to target first.
    """

    def _mean(counts: list[int]) -> float:
        return sum(counts) / len(counts) if counts else 0.0

    # path -> (bug count, BUG-prefixed issue ids); hotspots without bugs are dropped
    bugs_by_file: dict[str, tuple[int, list[str]]] = {}
    for spot in hotspots.file_hotspots:
        n_bugs = spot.issue_types.get("BUG", 0)
        if n_bugs > 0:
            bugs_by_file[spot.path] = (
                n_bugs,
                [issue_id for issue_id in spot.issue_ids if issue_id.startswith("BUG-")],
            )

    if not bugs_by_file:
        return TestGapAnalysis()

    found: list[TestGap] = []
    tested_counts: list[int] = []
    untested_counts: list[int] = []

    for path, (n_bugs, bug_ids) in bugs_by_file.items():
        test_path = _find_test_file(path, project_root=project_root)
        covered = test_path is not None
        (tested_counts if covered else untested_counts).append(n_bugs)

        # Untested files are the urgent ones: they escalate with bug count,
        # while a tested file never rises above "medium".
        if covered:
            priority = "medium" if n_bugs >= 4 else "low"
        elif n_bugs >= 5:
            priority = "critical"
        elif n_bugs >= 3:
            priority = "high"
        else:
            priority = "medium"

        found.append(
            TestGap(
                source_file=path,
                bug_count=n_bugs,
                bug_ids=bug_ids,
                has_test_file=covered,
                test_file_path=test_path,
                # Missing tests amplify the score by 10x
                gap_score=n_bugs * (1.0 if covered else 10.0),
                priority=priority,
            )
        )

    # Highest gap score first, bug count as tie-breaker (stable for equal keys)
    ranked = sorted(found, key=lambda gap: (gap.gap_score, gap.bug_count), reverse=True)

    # Bug magnets from the hotspot analysis that have no test file
    magnets_without_tests = [
        magnet.path
        for magnet in hotspots.bug_magnets
        if _find_test_file(magnet.path, project_root=project_root) is None
    ]

    # Untested files, in ranked order
    untested_targets = [gap.source_file for gap in ranked if not gap.has_test_file]

    return TestGapAnalysis(
        gaps=ranked[:15],  # Top 15
        untested_bug_magnets=magnets_without_tests,
        files_with_tests_avg_bugs=_mean(tested_counts),
        files_without_tests_avg_bugs=_mean(untested_counts),
        priority_test_targets=untested_targets[:10],
    )

126 

127 

def _update_rejection_metrics(metrics: RejectionMetrics, category: str) -> None:
    """Increment the counter on ``metrics`` that matches ``category``.

    Recognized categories are "completed", "rejected", "invalid", "duplicate"
    and "deferred"; any other value leaves ``metrics`` untouched.
    """
    counter_name = {
        "completed": "completed_count",
        "rejected": "rejected_count",
        "invalid": "invalid_count",
        "duplicate": "duplicate_count",
        "deferred": "deferred_count",
    }.get(category)
    if counter_name is None:
        return
    setattr(metrics, counter_name, getattr(metrics, counter_name) + 1)

139 

140 

def analyze_rejection_rates(
    issues: list[CompletedIssue],
    contents: dict[Path, str] | None = None,
) -> RejectionAnalysis:
    """Summarize how closed issues were resolved, overall and per group.

    Issues whose content cannot be obtained are skipped entirely. Every other
    issue is counted in the overall metrics, in the metrics for its issue
    type, and (when it has a completion date) in the metrics for its
    ``YYYY-MM`` month.

    Args:
        issues: List of completed issues
        contents: Pre-loaded issue file contents (path -> content)

    Returns:
        RejectionAnalysis with overall, per-type and per-month metrics, the
        ten most frequent ``**Reason**:`` lines of non-completed issues, and a
        trend label ("improving", "degrading" or "stable").
    """
    if not issues:
        return RejectionAnalysis()

    overall = RejectionMetrics()
    by_type: dict[str, RejectionMetrics] = {}
    by_month: dict[str, RejectionMetrics] = {}
    reason_counts: dict[str, int] = {}

    for issue in issues:
        text = get_issue_content(issue, contents)
        if text is None:
            continue

        category = _parse_resolution_action(text)

        # Collect every bucket this issue contributes to, then update them alike
        buckets = [overall]

        if issue.issue_type not in by_type:
            by_type[issue.issue_type] = RejectionMetrics()
        buckets.append(by_type[issue.issue_type])

        if issue.completed_date:
            month_key = issue.completed_date.strftime("%Y-%m")
            if month_key not in by_month:
                by_month[month_key] = RejectionMetrics()
            buckets.append(by_month[month_key])

        for bucket in buckets:
            bucket.total_closed += 1
            _update_rejection_metrics(bucket, category)

        # Only non-completed closures carry a reason worth tallying
        if category in ("rejected", "invalid", "duplicate", "deferred"):
            found = re.search(r"\*\*Reason\*\*:\s*(.+?)(?:\n|$)", text)
            if found:
                reason = found.group(1).strip()
                reason_counts[reason] = reason_counts.get(reason, 0) + 1

    # Trend: compare the newest of the last three months with the oldest of
    # them; a change of more than 20% either way moves off "stable".
    trend = "stable"
    months = sorted(by_month)
    if len(months) >= 3:
        combined = [by_month[m].rejection_rate + by_month[m].invalid_rate for m in months[-3:]]
        oldest, newest = combined[0], combined[-1]
        if newest < oldest * 0.8:
            trend = "improving"
        elif newest > oldest * 1.2:
            trend = "degrading"

    # Most frequent reasons first; equal counts keep first-seen order
    common_reasons = sorted(reason_counts.items(), key=lambda pair: pair[1], reverse=True)[:10]

    return RejectionAnalysis(
        overall=overall,
        by_type=by_type,
        by_month=by_month,
        common_reasons=common_reasons,
        trend=trend,
    )

221 

222 

# Pattern definitions for manual activity detection.
# Each row below is (pattern type, regexes, description, suggested automation,
# automation complexity); the comprehension expands rows into the mapping
# pattern type -> {"patterns", "description", "suggestion", "complexity"}.
_MANUAL_PATTERNS: dict[str, dict[str, Any]] = {
    kind: {
        "patterns": list(regexes),
        "description": description,
        "suggestion": suggestion,
        "complexity": complexity,
    }
    for kind, regexes, description, suggestion, complexity in (
        (
            "test",
            (
                r"(?:pytest|python -m pytest|npm test|yarn test|jest|cargo test|go test)",
                r"(?:python -m unittest|nosetests|tox)",
            ),
            "Test execution after code changes",
            "Add post-edit hook for automatic test runs",
            "trivial",
        ),
        (
            "lint",
            (
                r"(?:ruff check|ruff format|black|isort|flake8|pylint)",
                r"(?:eslint|prettier|tslint)",
            ),
            "Lint/format fixes after implementation",
            "Add pre-commit hook for auto-formatting",
            "simple",
        ),
        (
            "type_check",
            (
                r"(?:mypy|pyright|python -m mypy)",
                r"(?:tsc|npx tsc)",
            ),
            "Type checking during development",
            "Add mypy to pre-commit or post-edit hook",
            "simple",
        ),
        (
            "build",
            (
                r"(?:npm run build|yarn build|make|cargo build|go build)",
                r"(?:python -m build|pip install -e)",
            ),
            "Build steps during implementation",
            "Add build verification to test suite or CI",
            "moderate",
        ),
        (
            "git",
            (r"git (?:add|commit|push|pull|checkout|branch)",),
            "Git operations during issue resolution",
            "Use /ll:commit skill for standardized commits",
            "trivial",
        ),
    )
}

270 

271 

def detect_manual_patterns(
    issues: list[CompletedIssue],
    contents: dict[Path, str] | None = None,
) -> ManualPatternAnalysis:
    """Find manual commands that recur in issue text and could be automated.

    The content of each issue is searched, case-insensitively, with every
    regex registered in ``_MANUAL_PATTERNS``. Issues whose content cannot be
    obtained are skipped.

    Args:
        issues: List of completed issues
        contents: Pre-loaded issue file contents (path -> content)

    Returns:
        ManualPatternAnalysis with one ManualPattern per pattern type that
        matched at least once (most frequent first), the total match count
        (reported both as manual interventions and as automatable), and the
        automation suggestions of types that matched at least twice.
    """
    if not issues:
        return ManualPatternAnalysis()

    # Per pattern type: total matches, issue ids that matched, distinct matched text
    tallies: dict[str, dict[str, Any]] = {
        kind: {"count": 0, "issues": [], "commands": []} for kind in _MANUAL_PATTERNS
    }

    for issue in issues:
        text = get_issue_content(issue, contents)
        if text is None:
            continue

        for kind, spec in _MANUAL_PATTERNS.items():
            # All matches for this type, in regex order then text order
            hits = [
                hit
                for regex in spec["patterns"]
                for hit in re.findall(regex, text, re.IGNORECASE)
            ]
            if not hits:
                continue

            tally = tallies[kind]
            tally["count"] += len(hits)
            if issue.issue_id not in tally["issues"]:
                tally["issues"].append(issue.issue_id)
            # Keep each distinct matched command once, in first-seen order
            for hit in hits:
                if hit not in tally["commands"]:
                    tally["commands"].append(hit)

    detected = [
        ManualPattern(
            pattern_type=kind,
            pattern_description=_MANUAL_PATTERNS[kind]["description"],
            occurrence_count=tally["count"],
            affected_issues=tally["issues"],
            example_commands=tally["commands"][:5],
            suggested_automation=_MANUAL_PATTERNS[kind]["suggestion"],
            automation_complexity=_MANUAL_PATTERNS[kind]["complexity"],
        )
        for kind, tally in tallies.items()
        if tally["count"] > 0
    ]

    # Every detected match counts as both an intervention and as automatable
    total_matches = sum(tally["count"] for tally in tallies.values())

    # Most frequent first; equal counts keep definition order
    detected.sort(key=lambda found: found.occurrence_count, reverse=True)

    # Only suggest automation for types seen at least twice
    suggestions = [
        found.suggested_automation for found in detected if found.occurrence_count >= 2
    ]

    return ManualPatternAnalysis(
        patterns=detected,
        total_manual_interventions=total_matches,
        automatable_count=total_matches,
        automation_suggestions=suggestions[:10],
    )

351 

352 

# Mapping from manual pattern types to configuration solutions.
# Each entry names the hook event that would automate the pattern, a short
# description, and a hooks/hooks.json snippet to suggest to the user.
# Pattern types without an entry here (e.g. "git") produce no config gap.
# NOTE(review): the indentation inside the multi-line snippets was
# reconstructed (2-space JSON nesting) — confirm against the original file.
_PATTERN_TO_CONFIG: dict[str, dict[str, Any]] = {
    "test": {
        "hook_event": "PostToolUse",
        "description": "Automatic test execution after code changes",
        "suggested_config": """hooks/hooks.json:
  "PostToolUse": [{
    "matcher": "Edit|Write",
    "hooks": [{
      "type": "command",
      "command": "pytest tests/ -x -q",
      "timeout": 30000
    }]
  }]""",
    },
    "lint": {
        "hook_event": "PreToolUse",
        # NOTE(review): `ruff format --check` only reports unformatted files,
        # while the description promises formatting — confirm the intent.
        "description": "Automatic formatting before file writes",
        "suggested_config": """hooks/hooks.json:
  "PreToolUse": [{
    "matcher": "Write|Edit",
    "hooks": [{
      "type": "command",
      "command": "ruff format --check .",
      "timeout": 10000
    }]
  }]""",
    },
    "type_check": {
        "hook_event": "PostToolUse",
        "description": "Type checking after code modifications",
        # mypy has no `--fast` flag (it is only an ambiguous prefix of other
        # options), so the suggested command is the plain invocation.
        "suggested_config": """hooks/hooks.json:
  "PostToolUse": [{
    "matcher": "Edit|Write",
    "hooks": [{
      "type": "command",
      "command": "mypy .",
      "timeout": 30000
    }]
  }]""",
    },
    "build": {
        "hook_event": "PostToolUse",
        "description": "Build verification after changes",
        "suggested_config": """hooks/hooks.json:
  "PostToolUse": [{
    "matcher": "Edit|Write",
    "hooks": [{
      "type": "command",
      "command": "npm run build",
      "timeout": 60000
    }]
  }]""",
    },
}

408 

409 

def detect_config_gaps(
    manual_pattern_analysis: ManualPatternAnalysis,
    project_root: Path | None = None,
) -> ConfigGapsAnalysis:
    """Detect configuration gaps based on manual pattern analysis.

    Discovers what is already configured under ``project_root`` (hook events
    in ``hooks/hooks.json``, ``agents/*.md``, ``skills/*/SKILL.md``) and
    reports a gap for every detected manual pattern whose mapped hook event
    is not configured. Patterns with no entry in ``_PATTERN_TO_CONFIG`` are
    ignored and do not affect the coverage score.

    Reading the hooks file is best effort: a missing, unreadable, malformed
    or unexpectedly shaped file is treated as "no hooks configured" rather
    than raising.

    Args:
        manual_pattern_analysis: Results from detect_manual_patterns()
        project_root: Project root directory (defaults to cwd)

    Returns:
        ConfigGapsAnalysis with identified gaps (high priority first), the
        discovered hook events, skills and agents (skills and agents sorted
        by name), and the share of recognized patterns already covered
        (1.0 when no pattern was recognized).
    """
    if project_root is None:
        project_root = Path.cwd()

    # Load hooks configuration. Only the failures a hooks file can plausibly
    # cause are absorbed: I/O errors (incl. a missing file), invalid JSON or
    # encoding (both ValueError subclasses), and pathologically deep nesting.
    current_hooks: list[str] = []
    hooks_file = project_root / "hooks" / "hooks.json"
    try:
        with open(hooks_file, encoding="utf-8") as f:
            hooks_data = json.load(f)
    except (OSError, ValueError, RecursionError):
        hooks_data = None

    # Valid JSON may still have the wrong shape; accept only object -> object
    hooks_section = hooks_data.get("hooks") if isinstance(hooks_data, dict) else None
    if isinstance(hooks_section, dict):
        current_hooks = list(hooks_section)

    # Scan for agents; sorted because glob order is filesystem-dependent
    current_agents: list[str] = []
    agents_dir = project_root / "agents"
    if agents_dir.is_dir():
        current_agents = sorted(agent_file.stem for agent_file in agents_dir.glob("*.md"))

    # Scan for skills: a skill is a directory containing SKILL.md
    current_skills: list[str] = []
    skills_dir = project_root / "skills"
    if skills_dir.is_dir():
        current_skills = sorted(
            skill_dir.name
            for skill_dir in skills_dir.iterdir()
            if skill_dir.is_dir() and (skill_dir / "SKILL.md").exists()
        )

    # Identify gaps from manual patterns
    gaps: list[ConfigGap] = []
    covered_patterns = 0
    recognized_patterns = 0

    for pattern in manual_pattern_analysis.patterns:
        config_mapping = _PATTERN_TO_CONFIG.get(pattern.pattern_type)
        if not config_mapping:
            continue

        recognized_patterns += 1

        # A pattern counts as covered as soon as its hook event is configured
        # at all; the hook's actual command is not inspected.
        if config_mapping["hook_event"] in current_hooks:
            covered_patterns += 1
            continue

        # Determine priority based on occurrence count
        if pattern.occurrence_count >= 10:
            priority = "high"
        elif pattern.occurrence_count >= 5:
            priority = "medium"
        else:
            priority = "low"

        gaps.append(
            ConfigGap(
                gap_type="hook",
                description=config_mapping["description"],
                evidence=pattern.affected_issues,
                suggested_config=config_mapping["suggested_config"],
                priority=priority,
                pattern_type=pattern.pattern_type,
            )
        )

    # Calculate coverage score based on recognized patterns only
    coverage_score = covered_patterns / recognized_patterns if recognized_patterns > 0 else 1.0

    # Sort gaps by priority (high first); unknown priorities sort last
    priority_order = {"high": 0, "medium": 1, "low": 2}
    gaps.sort(key=lambda g: priority_order.get(g.priority, 3))

    return ConfigGapsAnalysis(
        gaps=gaps,
        current_hooks=current_hooks,
        current_skills=current_skills,
        current_agents=current_agents,
        coverage_score=coverage_score,
    )