Coverage for little_loops / issue_history / quality.py: 94%
180 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""Issue history quality analysis: test gaps, rejections, manual patterns, config gaps."""
3from __future__ import annotations
5import json
6import re
7from pathlib import Path
8from typing import Any
10from little_loops.issue_history._utils import get_issue_content
11from little_loops.issue_history.models import (
12 CompletedIssue,
13 ConfigGap,
14 ConfigGapsAnalysis,
15 HotspotAnalysis,
16 ManualPattern,
17 ManualPatternAnalysis,
18 RejectionAnalysis,
19 RejectionMetrics,
20 TestGap,
21 TestGapAnalysis,
22)
23from little_loops.issue_history.parsing import _find_test_file, _parse_resolution_action
26def analyze_test_gaps(
27 issues: list[CompletedIssue],
28 hotspots: HotspotAnalysis,
29 project_root: Path | None = None,
30) -> TestGapAnalysis:
31 """Correlate bug occurrences with test coverage gaps.
33 Args:
34 issues: List of completed issues (unused, for API consistency)
35 hotspots: Pre-computed hotspot analysis
36 project_root: Project root for anchoring test file existence checks. Defaults to CWD.
38 Returns:
39 TestGapAnalysis with test coverage gap information
40 """
41 # Build map of source files to bug info from hotspots
42 bug_files: dict[str, dict[str, Any]] = {}
44 for hotspot in hotspots.file_hotspots:
45 bug_count = hotspot.issue_types.get("BUG", 0)
46 if bug_count > 0:
47 # Filter to only BUG issue IDs
48 bug_ids = [iid for iid in hotspot.issue_ids if iid.startswith("BUG-")]
49 bug_files[hotspot.path] = {
50 "bug_count": bug_count,
51 "bug_ids": bug_ids,
52 }
54 if not bug_files:
55 return TestGapAnalysis()
57 # Analyze test coverage for each file with bugs
58 gaps: list[TestGap] = []
59 files_with_tests: list[int] = [] # bug counts
60 files_without_tests: list[int] = [] # bug counts
62 for source_file, data in bug_files.items():
63 bug_count = data["bug_count"]
64 bug_ids = data["bug_ids"]
66 test_file = _find_test_file(source_file, project_root=project_root)
67 has_test = test_file is not None
69 # Calculate gap score: higher = more urgent to add tests
70 # Files without tests get amplified scores
71 if has_test:
72 gap_score = bug_count * 1.0
73 files_with_tests.append(bug_count)
74 else:
75 gap_score = bug_count * 10.0 # Amplify untested files
76 files_without_tests.append(bug_count)
78 # Determine priority based on bug count and test presence
79 if not has_test and bug_count >= 5:
80 priority = "critical"
81 elif not has_test and bug_count >= 3:
82 priority = "high"
83 elif not has_test or bug_count >= 4:
84 priority = "medium"
85 else:
86 priority = "low"
88 gaps.append(
89 TestGap(
90 source_file=source_file,
91 bug_count=bug_count,
92 bug_ids=bug_ids,
93 has_test_file=has_test,
94 test_file_path=test_file,
95 gap_score=gap_score,
96 priority=priority,
97 )
98 )
100 # Sort by gap score descending (highest priority first)
101 gaps.sort(key=lambda g: (-g.gap_score, -g.bug_count))
103 # Calculate averages for correlation
104 avg_with_tests = sum(files_with_tests) / len(files_with_tests) if files_with_tests else 0.0
105 avg_without_tests = (
106 sum(files_without_tests) / len(files_without_tests) if files_without_tests else 0.0
107 )
109 # Identify untested bug magnets (from hotspot analysis)
110 untested_magnets = [
111 h.path
112 for h in hotspots.bug_magnets
113 if _find_test_file(h.path, project_root=project_root) is None
114 ]
116 # Priority test targets: untested files sorted by bug count
117 priority_targets = [g.source_file for g in gaps if not g.has_test_file]
119 return TestGapAnalysis(
120 gaps=gaps[:15], # Top 15
121 untested_bug_magnets=untested_magnets,
122 files_with_tests_avg_bugs=avg_with_tests,
123 files_without_tests_avg_bugs=avg_without_tests,
124 priority_test_targets=priority_targets[:10],
125 )
128def _update_rejection_metrics(metrics: RejectionMetrics, category: str) -> None:
129 if category == "completed":
130 metrics.completed_count += 1
131 elif category == "rejected":
132 metrics.rejected_count += 1
133 elif category == "invalid":
134 metrics.invalid_count += 1
135 elif category == "duplicate":
136 metrics.duplicate_count += 1
137 elif category == "deferred":
138 metrics.deferred_count += 1
141def analyze_rejection_rates(
142 issues: list[CompletedIssue],
143 contents: dict[Path, str] | None = None,
144) -> RejectionAnalysis:
145 """Analyze rejection and invalid closure patterns.
147 Args:
148 issues: List of completed issues
149 contents: Pre-loaded issue file contents (path -> content)
151 Returns:
152 RejectionAnalysis with overall and grouped metrics
153 """
154 if not issues:
155 return RejectionAnalysis()
157 # Count by category
158 overall = RejectionMetrics()
159 by_type: dict[str, RejectionMetrics] = {}
160 by_month: dict[str, RejectionMetrics] = {}
161 reason_counts: dict[str, int] = {}
163 for issue in issues:
164 content = get_issue_content(issue, contents)
165 if content is None:
166 continue
168 category = _parse_resolution_action(content)
169 overall.total_closed += 1
171 # Update overall counts
172 _update_rejection_metrics(overall, category)
174 # By type
175 if issue.issue_type not in by_type:
176 by_type[issue.issue_type] = RejectionMetrics()
177 type_metrics = by_type[issue.issue_type]
178 type_metrics.total_closed += 1
179 _update_rejection_metrics(type_metrics, category)
181 # By month
182 if issue.completed_date:
183 month_key = issue.completed_date.strftime("%Y-%m")
184 if month_key not in by_month:
185 by_month[month_key] = RejectionMetrics()
186 month_metrics = by_month[month_key]
187 month_metrics.total_closed += 1
188 _update_rejection_metrics(month_metrics, category)
190 # Extract reason for rejection/invalid
191 if category in ("rejected", "invalid", "duplicate", "deferred"):
192 reason_match = re.search(r"\*\*Reason\*\*:\s*(.+?)(?:\n|$)", content)
193 if reason_match:
194 reason = reason_match.group(1).strip()
195 reason_counts[reason] = reason_counts.get(reason, 0) + 1
197 # Calculate trend from monthly data
198 sorted_months = sorted(by_month.keys())
199 if len(sorted_months) >= 3:
200 recent = sorted_months[-3:]
201 rates = [by_month[m].rejection_rate + by_month[m].invalid_rate for m in recent]
202 if rates[-1] < rates[0] * 0.8:
203 trend = "improving"
204 elif rates[-1] > rates[0] * 1.2:
205 trend = "degrading"
206 else:
207 trend = "stable"
208 else:
209 trend = "stable"
211 # Sort reasons by count
212 common_reasons = sorted(reason_counts.items(), key=lambda x: -x[1])[:10]
214 return RejectionAnalysis(
215 overall=overall,
216 by_type=by_type,
217 by_month=by_month,
218 common_reasons=common_reasons,
219 trend=trend,
220 )
223# Pattern definitions for manual activity detection
224_MANUAL_PATTERNS: dict[str, dict[str, Any]] = {
225 "test": {
226 "patterns": [
227 r"(?:pytest|python -m pytest|npm test|yarn test|jest|cargo test|go test)",
228 r"(?:python -m unittest|nosetests|tox)",
229 ],
230 "description": "Test execution after code changes",
231 "suggestion": "Add post-edit hook for automatic test runs",
232 "complexity": "trivial",
233 },
234 "lint": {
235 "patterns": [
236 r"(?:ruff check|ruff format|black|isort|flake8|pylint)",
237 r"(?:eslint|prettier|tslint)",
238 ],
239 "description": "Lint/format fixes after implementation",
240 "suggestion": "Add pre-commit hook for auto-formatting",
241 "complexity": "simple",
242 },
243 "type_check": {
244 "patterns": [
245 r"(?:mypy|pyright|python -m mypy)",
246 r"(?:tsc|npx tsc)",
247 ],
248 "description": "Type checking during development",
249 "suggestion": "Add mypy to pre-commit or post-edit hook",
250 "complexity": "simple",
251 },
252 "build": {
253 "patterns": [
254 r"(?:npm run build|yarn build|make|cargo build|go build)",
255 r"(?:python -m build|pip install -e)",
256 ],
257 "description": "Build steps during implementation",
258 "suggestion": "Add build verification to test suite or CI",
259 "complexity": "moderate",
260 },
261 "git": {
262 "patterns": [
263 r"git (?:add|commit|push|pull|checkout|branch)",
264 ],
265 "description": "Git operations during issue resolution",
266 "suggestion": "Use /ll:commit skill for standardized commits",
267 "complexity": "trivial",
268 },
269}
272def detect_manual_patterns(
273 issues: list[CompletedIssue],
274 contents: dict[Path, str] | None = None,
275) -> ManualPatternAnalysis:
276 """Detect recurring manual activities that could be automated.
278 Args:
279 issues: List of completed issues
280 contents: Pre-loaded issue file contents (path -> content)
282 Returns:
283 ManualPatternAnalysis with detected patterns
284 """
285 if not issues:
286 return ManualPatternAnalysis()
288 # Track pattern occurrences
289 pattern_data: dict[str, dict[str, Any]] = {}
291 for pattern_type, config in _MANUAL_PATTERNS.items():
292 pattern_data[pattern_type] = {
293 "count": 0,
294 "issues": [],
295 "commands": [],
296 "config": config,
297 }
299 # Scan issue content for patterns
300 for issue in issues:
301 content = get_issue_content(issue, contents)
302 if content is None:
303 continue
305 for pattern_type, config in _MANUAL_PATTERNS.items():
306 for pattern in config["patterns"]:
307 matches = re.findall(pattern, content, re.IGNORECASE)
308 if matches:
309 data = pattern_data[pattern_type]
310 data["count"] += len(matches)
311 if issue.issue_id not in data["issues"]:
312 data["issues"].append(issue.issue_id)
313 # Store unique command examples
314 for match in matches:
315 if match not in data["commands"]:
316 data["commands"].append(match)
318 # Build ManualPattern objects
319 patterns: list[ManualPattern] = []
320 total_interventions = 0
321 automatable = 0
323 for pattern_type, data in pattern_data.items():
324 if data["count"] > 0:
325 config = data["config"]
326 pattern = ManualPattern(
327 pattern_type=pattern_type,
328 pattern_description=config["description"],
329 occurrence_count=data["count"],
330 affected_issues=data["issues"],
331 example_commands=data["commands"][:5],
332 suggested_automation=config["suggestion"],
333 automation_complexity=config["complexity"],
334 )
335 patterns.append(pattern)
336 total_interventions += data["count"]
337 automatable += data["count"]
339 # Sort by occurrence count descending
340 patterns.sort(key=lambda p: -p.occurrence_count)
342 # Build automation suggestions
343 suggestions = [p.suggested_automation for p in patterns if p.occurrence_count >= 2]
345 return ManualPatternAnalysis(
346 patterns=patterns,
347 total_manual_interventions=total_interventions,
348 automatable_count=automatable,
349 automation_suggestions=suggestions[:10],
350 )
353# Mapping from manual pattern types to configuration solutions
354_PATTERN_TO_CONFIG: dict[str, dict[str, Any]] = {
355 "test": {
356 "hook_event": "PostToolUse",
357 "description": "Automatic test execution after code changes",
358 "suggested_config": """hooks/hooks.json:
359 "PostToolUse": [{
360 "matcher": "Edit|Write",
361 "hooks": [{
362 "type": "command",
363 "command": "pytest tests/ -x -q",
364 "timeout": 30000
365 }]
366 }]""",
367 },
368 "lint": {
369 "hook_event": "PreToolUse",
370 "description": "Automatic formatting before file writes",
371 "suggested_config": """hooks/hooks.json:
372 "PreToolUse": [{
373 "matcher": "Write|Edit",
374 "hooks": [{
375 "type": "command",
376 "command": "ruff format --check .",
377 "timeout": 10000
378 }]
379 }]""",
380 },
381 "type_check": {
382 "hook_event": "PostToolUse",
383 "description": "Type checking after code modifications",
384 "suggested_config": """hooks/hooks.json:
385 "PostToolUse": [{
386 "matcher": "Edit|Write",
387 "hooks": [{
388 "type": "command",
389 "command": "mypy --fast .",
390 "timeout": 30000
391 }]
392 }]""",
393 },
394 "build": {
395 "hook_event": "PostToolUse",
396 "description": "Build verification after changes",
397 "suggested_config": """hooks/hooks.json:
398 "PostToolUse": [{
399 "matcher": "Edit|Write",
400 "hooks": [{
401 "type": "command",
402 "command": "npm run build",
403 "timeout": 60000
404 }]
405 }]""",
406 },
407}
410def detect_config_gaps(
411 manual_pattern_analysis: ManualPatternAnalysis,
412 project_root: Path | None = None,
413) -> ConfigGapsAnalysis:
414 """Detect configuration gaps based on manual pattern analysis.
416 Args:
417 manual_pattern_analysis: Results from detect_manual_patterns()
418 project_root: Project root directory (defaults to cwd)
420 Returns:
421 ConfigGapsAnalysis with identified gaps and coverage metrics
422 """
423 if project_root is None:
424 project_root = Path.cwd()
426 # Discover current configuration
427 current_hooks: list[str] = []
428 current_skills: list[str] = []
429 current_agents: list[str] = []
431 # Load hooks configuration
432 hooks_file = project_root / "hooks" / "hooks.json"
433 if hooks_file.exists():
434 try:
435 with open(hooks_file, encoding="utf-8") as f:
436 hooks_data = json.load(f)
437 current_hooks = list(hooks_data.get("hooks", {}).keys())
438 except Exception:
439 pass
441 # Scan for agents
442 agents_dir = project_root / "agents"
443 if agents_dir.is_dir():
444 for agent_file in agents_dir.glob("*.md"):
445 current_agents.append(agent_file.stem)
447 # Scan for skills
448 skills_dir = project_root / "skills"
449 if skills_dir.is_dir():
450 for skill_dir in skills_dir.iterdir():
451 if skill_dir.is_dir() and (skill_dir / "SKILL.md").exists():
452 current_skills.append(skill_dir.name)
454 # Identify gaps from manual patterns
455 gaps: list[ConfigGap] = []
456 covered_patterns = 0
457 recognized_patterns = 0
459 for pattern in manual_pattern_analysis.patterns:
460 config_mapping = _PATTERN_TO_CONFIG.get(pattern.pattern_type)
461 if not config_mapping:
462 continue
464 recognized_patterns += 1
465 hook_event = config_mapping["hook_event"]
467 # Check if hook event is already configured
468 if hook_event in current_hooks:
469 covered_patterns += 1
470 continue
472 # Determine priority based on occurrence count
473 if pattern.occurrence_count >= 10:
474 priority = "high"
475 elif pattern.occurrence_count >= 5:
476 priority = "medium"
477 else:
478 priority = "low"
480 gap = ConfigGap(
481 gap_type="hook",
482 description=config_mapping["description"],
483 evidence=pattern.affected_issues,
484 suggested_config=config_mapping["suggested_config"],
485 priority=priority,
486 pattern_type=pattern.pattern_type,
487 )
488 gaps.append(gap)
490 # Calculate coverage score based on recognized patterns only
491 coverage_score = covered_patterns / recognized_patterns if recognized_patterns > 0 else 1.0
493 # Sort gaps by priority (high first)
494 priority_order = {"high": 0, "medium": 1, "low": 2}
495 gaps.sort(key=lambda g: priority_order.get(g.priority, 3))
497 return ConfigGapsAnalysis(
498 gaps=gaps,
499 current_hooks=current_hooks,
500 current_skills=current_skills,
501 current_agents=current_agents,
502 coverage_score=coverage_score,
503 )