Coverage for session_buddy/utils/quality_utils_v2.py: 85.14% (378 statements)
coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

#!/usr/bin/env python3
"""Quality Scoring Algorithm V2 - Measures actual code quality.

This module implements a comprehensive quality scoring system that focuses on
real code quality metrics instead of superficial indicators.

Key improvements over V1:
- Integrates Crackerjack code quality metrics (coverage, lint, complexity)
- Smart project health indicators (doesn't penalize modern tooling)
- Separates permissions/trust from code quality
- Provides actionable, honest quality assessment
"""

from __future__ import annotations

import re
import subprocess  # nosec B404
from contextlib import suppress
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from pathlib import Path

    from session_buddy.crackerjack_integration import (
        get_quality_metrics_history,
    )

# Crackerjack integration for quality metrics
try:
    from session_buddy.crackerjack_integration import (
        get_quality_metrics_history,
    )

    CRACKERJACK_AVAILABLE = True
except ImportError:
    CRACKERJACK_AVAILABLE = False


@dataclass
class CodeQualityScore:
    """Code quality component (40 points max)."""

    test_coverage: float  # 0-15 points
    lint_score: float  # 0-10 points
    type_coverage: float  # 0-10 points
    complexity_score: float  # 0-5 points
    total: float  # Sum of above
    details: dict[str, Any]  # Detailed breakdown


@dataclass
class ProjectHealthScore:
    """Project health component (30 points max)."""

    tooling_score: float  # 0-15 points
    maturity_score: float  # 0-15 points
    total: float  # Sum of above
    details: dict[str, Any]  # Detailed breakdown


@dataclass
class DevVelocityScore:
    """Development velocity component (20 points max)."""

    git_activity: float  # 0-10 points
    dev_patterns: float  # 0-10 points
    total: float  # Sum of above
    details: dict[str, Any]  # Detailed breakdown


@dataclass
class SecurityScore:
    """Security component (10 points max)."""

    security_tools: float  # 0-5 points
    security_hygiene: float  # 0-5 points
    total: float  # Sum of above
    details: dict[str, Any]  # Detailed breakdown


@dataclass
class TrustScore:
    """Separate trust score (not part of quality)."""

    trusted_operations: float  # 0-40 points
    session_availability: float  # 0-30 points
    tool_ecosystem: float  # 0-30 points
    total: float  # 0-100 points
    details: dict[str, Any]  # Detailed breakdown


@dataclass
class QualityScoreV2:
    """Complete quality score V2 result."""

    total_score: float  # 0-100
    version: str  # "2.0"
    code_quality: CodeQualityScore
    project_health: ProjectHealthScore
    dev_velocity: DevVelocityScore
    security: SecurityScore
    trust_score: TrustScore
    recommendations: list[str]
    timestamp: str
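

# Score budget: code_quality (40) + project_health (30) + dev_velocity (20)
# + security (10) = 100 points. The trust score is reported alongside but
# never contributes to total_score.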

# Crackerjack metrics cache (5 minute TTL)
_metrics_cache: dict[str, tuple[dict[str, Any], datetime]] = {}
_CACHE_TTL_MINUTES = 5
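# Illustrative cache entry (hypothetical values):
#   _metrics_cache["/abs/project"] = ({"code_coverage": 85.1, "lint_score": 96}, datetime.now())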


async def calculate_quality_score_v2(
    project_dir: Path,
    permissions_count: int = 0,
    session_available: bool = True,
    tool_count: int = 0,
) -> QualityScoreV2:
    """Calculate comprehensive quality score V2.

    Args:
        project_dir: Project directory to analyze
        permissions_count: Number of trusted operations (for trust score)
        session_available: Whether session management is available (for trust score)
        tool_count: Number of available MCP tools (for trust score)

    Returns:
        Complete quality score breakdown
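
    Example (illustrative sketch, assumes a real project directory):
        >>> import asyncio
        >>> from pathlib import Path
        >>> result = asyncio.run(calculate_quality_score_v2(Path.cwd()))  # doctest: +SKIP
        >>> 0 <= result.total_score <= 100  # doctest: +SKIP
        True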
131 """
132 # Calculate each component
133 code_quality = await _calculate_code_quality(project_dir)
134 project_health = await _calculate_project_health(project_dir)
135 dev_velocity = await _calculate_dev_velocity(project_dir)
136 security = await _calculate_security(project_dir)
137 trust_score = _calculate_trust_score(
138 permissions_count,
139 session_available,
140 tool_count,
141 )

    # Calculate total
    total = (
        code_quality.total + project_health.total + dev_velocity.total + security.total
    )
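    # e.g. hypothetical component totals: 34.0 + 25.5 + 12.0 + 9.0 = 80.5 of 100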

    # Generate recommendations
    recommendations = _generate_recommendations_v2(
        code_quality,
        project_health,
        dev_velocity,
        security,
        total,
    )

    return QualityScoreV2(
        total_score=round(total),  # round() returns an int; kept for backward compatibility with tests
        version="2.0",
        code_quality=code_quality,
        project_health=project_health,
        dev_velocity=dev_velocity,
        security=security,
        trust_score=trust_score,
        recommendations=recommendations,
        timestamp=datetime.now().isoformat(),
    )


async def _calculate_code_quality(project_dir: Path) -> CodeQualityScore:
    """Calculate code quality score (40 points max).

    Components:
    - test_coverage: 15 points (from Crackerjack)
    - lint_score: 10 points (from Crackerjack)
    - type_coverage: 10 points (from pyright/mypy)
    - complexity_score: 5 points (inverse of complexity)
    """
    metrics = await _get_crackerjack_metrics(project_dir)

    # Test coverage (0-15 points)
    coverage_pct = metrics.get("code_coverage", 0)
    test_coverage = (coverage_pct / 100) * 15
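    # e.g. 85% coverage -> (85 / 100) * 15 = 12.75 of 15 points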

    # Lint score (0-10 points)
    # Crackerjack lint_score is already 0-100, normalized
    lint_raw = metrics.get("lint_score", 100)  # Default to perfect if not available
    lint_score = (lint_raw / 100) * 10

    # Type coverage (0-10 points)
    # Try to extract from pyright/mypy via Crackerjack
    type_pct = await _get_type_coverage(project_dir, metrics)
    type_coverage = (type_pct / 100) * 10

    # Complexity score (0-5 points, inverse)
    complexity_raw = metrics.get("complexity_score", 100)
    # complexity_score is 0-100 where 100 is best (low complexity)
    complexity_score = (complexity_raw / 100) * 5

    total = test_coverage + lint_score + type_coverage + complexity_score

    return CodeQualityScore(
        test_coverage=round(test_coverage, 2),
        lint_score=round(lint_score, 2),
        type_coverage=round(type_coverage, 2),
        complexity_score=round(complexity_score, 2),
        total=round(total, 2),
        details={
            "coverage_pct": coverage_pct,
            "lint_raw": lint_raw,
            "type_pct": type_pct,
            "complexity_raw": complexity_raw,
            "metrics_source": "crackerjack" if metrics else "fallback",
        },
    )


async def _calculate_project_health(project_dir: Path) -> ProjectHealthScore:
    """Calculate project health score (30 points max).

    Components:
    - tooling_score: 15 points (modern tooling)
    - maturity_score: 15 points (project maturity)
    """
    tooling = _calculate_tooling_score(project_dir)
    maturity = _calculate_maturity_score(project_dir)

    return ProjectHealthScore(
        tooling_score=round(tooling["score"], 2),
        maturity_score=round(maturity["score"], 2),
        total=round(tooling["score"] + maturity["score"], 2),
        details={**tooling["details"], **maturity["details"]},
    )


def _score_package_management(project_dir: Path) -> tuple[float, dict[str, str]]:
    """Score package management setup (0-5 points)."""
    has_pyproject = (project_dir / "pyproject.toml").exists()
    has_lockfile = (project_dir / "uv.lock").exists() or (
        project_dir / "requirements.txt"
    ).exists()

    if has_pyproject and has_lockfile:
        return 5, {"package_mgmt": "modern (pyproject.toml + lockfile)"}
    if has_pyproject:
        return 3, {"package_mgmt": "partial (pyproject.toml, no lockfile)"}
    if has_lockfile:
        return 2, {"package_mgmt": "basic (lockfile only)"}
    return 0, {}


def _score_version_control(project_dir: Path) -> tuple[float, dict[str, str]]:
    """Score version control setup (0-5 points)."""
    git_dir = project_dir / ".git"
    if not git_dir.exists():
        return 0, {"version_control": "none"}

    with suppress(
        subprocess.SubprocessError,
        subprocess.TimeoutExpired,
        OSError,
        FileNotFoundError,
    ):
        result = subprocess.run(
            ["git", "log", "--oneline", "-n", "10"],
            check=False,
            cwd=project_dir,
            capture_output=True,
            text=True,
            timeout=2,
        )
        if result.returncode == 0 and len(result.stdout.strip().split("\n")) >= 5:
            return 5, {"version_control": "active git repository"}
        return 3, {"version_control": "git repo (limited history)"}

    return 2, {"version_control": "git repo (couldn't verify history)"}


def _score_dependency_management(project_dir: Path) -> tuple[float, dict[str, str]]:
    """Score dependency management (0-5 points)."""
    lockfile = project_dir / "uv.lock"
    if not lockfile.exists():
        lockfile = project_dir / "requirements.txt"

    if not lockfile.exists():
        return 0, {"dependency_mgmt": "none"}

    with suppress(OSError, PermissionError, FileNotFoundError, ValueError):
        lockfile_age_days = (
            datetime.now() - datetime.fromtimestamp(lockfile.stat().st_mtime)
        ).days

        if lockfile_age_days < 30:
            return 5, {"dependency_mgmt": "recently updated"}
        if lockfile_age_days < 90:
            return 3, {"dependency_mgmt": "moderately current"}
        return 1, {"dependency_mgmt": f"outdated ({lockfile_age_days} days)"}

    return 2, {"dependency_mgmt": "present (age unknown)"}


def _calculate_tooling_score(project_dir: Path) -> dict[str, Any]:
    """Calculate modern tooling score (0-15 points).

    Components:
    - package_management: 5 pts (pyproject.toml + lockfile)
    - version_control: 5 pts (.git + active history)
    - dependency_mgmt: 5 pts (lockfile + recent updates)
    """
    pkg_score, pkg_details = _score_package_management(project_dir)
    vc_score, vc_details = _score_version_control(project_dir)
    dep_score, dep_details = _score_dependency_management(project_dir)

    total_score = pkg_score + vc_score + dep_score
    details = pkg_details | vc_details | dep_details

    return {"score": total_score, "details": details}


def _calculate_maturity_score(project_dir: Path) -> dict[str, Any]:
    """Calculate project maturity score (0-15 points)."""
    score = 0
    details: dict[str, str] = {}

    testing_score, testing_details = _evaluate_testing_infra(project_dir)
    documentation_score, documentation_details = _evaluate_documentation(project_dir)
    ci_score, ci_details = _evaluate_ci_cd(project_dir)

    score += testing_score + documentation_score + ci_score
    details.update(testing_details)
    details.update(documentation_details)
    details.update(ci_details)

    return {"score": score, "details": details}


def _evaluate_testing_infra(project_dir: Path) -> tuple[int, dict[str, str]]:
    """Return score/details describing testing infrastructure maturity."""
    test_dirs = list(project_dir.glob("test*"))
    if not test_dirs:
        return 0, {"testing": "none"}

    test_dir = test_dirs[0]
    has_conftest = (test_dir / "conftest.py").exists()
    test_files = list(test_dir.rglob("test_*.py"))

    if has_conftest and len(test_files) >= 10:
        return 5, {"testing": f"comprehensive ({len(test_files)} test files)"}
    if len(test_files) >= 5:
        return 3, {"testing": f"moderate ({len(test_files)} test files)"}
    if test_files:
        return 1, {"testing": f"basic ({len(test_files)} test files)"}
    return 0, {"testing": "none"}


def _evaluate_documentation(project_dir: Path) -> tuple[int, dict[str, str]]:
    """Return documentation maturity score and details."""
    has_readme = (project_dir / "README.md").exists()
    docs_dir = project_dir / "docs"

    if has_readme and docs_dir.exists():
        doc_files = list(docs_dir.rglob("*.md"))
        if len(doc_files) >= 5:
            return 5, {"documentation": f"comprehensive ({len(doc_files)} docs)"}
        return 3, {"documentation": f"basic ({len(doc_files)} docs)"}
    if has_readme:
        return 2, {"documentation": "README only"}
    return 0, {"documentation": "none"}


def _evaluate_ci_cd(project_dir: Path) -> tuple[int, dict[str, str]]:
    """Return CI/CD maturity score and details."""
    github_workflows = project_dir / ".github" / "workflows"
    gitlab_ci = project_dir / ".gitlab-ci.yml"

    if github_workflows.exists():
        workflow_files = list(github_workflows.glob("*.yml")) + list(
            github_workflows.glob("*.yaml"),
        )
        if len(workflow_files) >= 2:
            return 5, {"ci_cd": f"github actions ({len(workflow_files)} workflows)"}
        if workflow_files:
            return 3, {"ci_cd": "github actions (1 workflow)"}
    elif gitlab_ci.exists():
        return 4, {"ci_cd": "gitlab ci"}
    return 0, {"ci_cd": "none"}


async def _calculate_dev_velocity(project_dir: Path) -> DevVelocityScore:
    """Calculate development velocity score (20 points max).

    Components:
    - git_activity: 10 points (commit frequency, quality)
    - dev_patterns: 10 points (issue tracking, branch strategy)
    """
    git_activity = _analyze_git_activity(project_dir)
    dev_patterns = _analyze_dev_patterns(project_dir)

    return DevVelocityScore(
        git_activity=round(git_activity["score"], 2),
        dev_patterns=round(dev_patterns["score"], 2),
        total=round(git_activity["score"] + dev_patterns["score"], 2),
        details={**git_activity["details"], **dev_patterns["details"]},
    )


def _analyze_git_activity(project_dir: Path) -> dict[str, Any]:
    """Analyze git activity (0-10 points)."""
    git_dir = project_dir / ".git"
    if not git_dir.exists():
        return {"score": 0, "details": {"activity": "no git repository"}}

    try:
        commits = _collect_recent_commits(project_dir)
    except Exception as exc:
        return {"score": 0, "details": {"error": f"git analysis failed: {exc}"}}

    frequency_score, frequency_details = _score_commit_frequency(commits)
    quality_score, quality_details = _score_commit_quality(commits)

    # Balance both metrics evenly (0-5 each)
    total_score = frequency_score + quality_score
    details = frequency_details | quality_details
    return {"score": total_score, "details": details}


def _collect_recent_commits(project_dir: Path) -> list[str]:
    """Return commit messages for the last 30 days."""
    since_date = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")
    result = subprocess.run(
        ["git", "log", f"--since={since_date}", "--pretty=format:%s", "--no-merges"],
        check=False,
        cwd=project_dir,
        capture_output=True,
        text=True,
        timeout=5,
    )

    if result.returncode != 0 or not result.stdout.strip():
        return []
    return result.stdout.strip().split("\n")


def _score_commit_frequency(commits: list[str]) -> tuple[int, dict[str, str]]:
    """Score commit frequency (0-5) with descriptive details."""
    commit_count = len(commits)
    if commit_count >= 20:
        return 5, {"frequency": f"active ({commit_count} commits/month)"}
    if commit_count >= 10:
        return 4, {"frequency": f"regular ({commit_count} commits/month)"}
    if commit_count >= 5:
        return 2, {"frequency": f"occasional ({commit_count} commits/month)"}
    if commit_count > 0:
        return 1, {"frequency": f"sparse ({commit_count} commits/month)"}
    return 0, {"frequency": "no recent commits"}


def _score_commit_quality(commits: list[str]) -> tuple[int, dict[str, str]]:
    """Score conventional commit adherence (0-5)."""
    if not commits:
        return 0, {"quality": "no data"}
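
    # Conventional Commits prefixes, e.g. "feat(api): add route" or "fix: handle None"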
    conventional = sum(
        1
        for msg in commits
        if re.match(r"^(feat|fix|docs|style|refactor|test|chore)(\(.*\))?:", msg)
    )
    commit_count = len(commits)

    if conventional >= commit_count * 0.8:
        return 5, {"quality": f"excellent ({conventional}/{commit_count} conventional)"}
    if conventional >= commit_count * 0.5:
        return 3, {"quality": f"good ({conventional}/{commit_count} conventional)"}
    # commits is guaranteed non-empty here, so this is the catch-all tier
    return 1, {"quality": f"basic ({conventional}/{commit_count} conventional)"}


def _analyze_dev_patterns(project_dir: Path) -> dict[str, Any]:
    """Analyze development patterns (0-10 points)."""
    git_dir = project_dir / ".git"
    if not git_dir.exists():
        return {"score": 0, "details": {"patterns": "no git repository"}}

    issue_score, issue_details = _score_issue_tracking(project_dir)
    branch_score, branch_details = _score_branch_strategy(project_dir)

    details = issue_details | branch_details
    return {"score": issue_score + branch_score, "details": details}


def _score_issue_tracking(project_dir: Path) -> tuple[int, dict[str, str]]:
    """Analyze recent commits for issue references."""
    try:
        result = subprocess.run(
            ["git", "log", "--oneline", "-n", "50", "--no-merges"],
            check=False,
            cwd=project_dir,
            capture_output=True,
            text=True,
            timeout=5,
        )
    except Exception as exc:
        return 0, {"issue_tracking": f"analysis failed: {exc}"}

    if result.returncode != 0 or not result.stdout.strip():
        return 0, {"issue_tracking": "no data"}

    commits = result.stdout.strip().split("\n")
    issue_refs = sum(1 for msg in commits if re.search(r"#\d+", msg))

    if issue_refs >= len(commits) * 0.5:
        return 5, {"issue_tracking": f"excellent ({issue_refs}/{len(commits)} refs)"}
    if issue_refs >= len(commits) * 0.25:
        return 3, {"issue_tracking": f"good ({issue_refs}/{len(commits)} refs)"}
    if issue_refs > 0:
        return 1, {"issue_tracking": f"basic ({issue_refs}/{len(commits)} refs)"}
    return 0, {"issue_tracking": "none"}


def _score_branch_strategy(project_dir: Path) -> tuple[int, dict[str, str]]:
    """Evaluate branch naming strategy for feature work."""
    try:
        result = subprocess.run(
            ["git", "branch", "-a"],
            check=False,
            cwd=project_dir,
            capture_output=True,
            text=True,
            timeout=5,
        )
    except Exception as exc:
        return 0, {"branch_strategy": f"analysis failed: {exc}"}

    if result.returncode != 0 or not result.stdout.strip():
        return 0, {"branch_strategy": "no data"}

    branches = result.stdout.strip().split("\n")
    feature_branches = [b for b in branches if "feature/" in b or "feat/" in b]

    if len(feature_branches) >= 3:
        return 5, {
            "branch_strategy": f"feature branches ({len(feature_branches)} active)"
        }
    if feature_branches:
        return 3, {
            "branch_strategy": f"some feature branches ({len(feature_branches)})"
        }
    return 1, {"branch_strategy": "main-only development"}


async def _calculate_security(project_dir: Path) -> SecurityScore:
    """Calculate security score (10 points max).

    Components:
    - security_tools: 5 points (bandit, safety checks)
    - security_hygiene: 5 points (no secrets, secure patterns)
    """
    tools_score = await _run_security_checks(project_dir)
    hygiene_score = _check_security_hygiene(project_dir)

    return SecurityScore(
        security_tools=round(tools_score["score"], 2),
        security_hygiene=round(hygiene_score["score"], 2),
        total=round(tools_score["score"] + hygiene_score["score"], 2),
        details={**tools_score["details"], **hygiene_score["details"]},
    )


async def _run_security_checks(project_dir: Path) -> dict[str, Any]:
    """Run security tools via Crackerjack (0-5 points)."""
    metrics = await _get_crackerjack_metrics(project_dir)

    security_score_raw = metrics.get("security_score", 100)  # Default to safe
    # Security score from Crackerjack is 0-100, 100 is best
    score = (security_score_raw / 100) * 5

    return {
        "score": score,
        "details": {
            "security_raw": security_score_raw,
            "source": "crackerjack" if metrics else "fallback",
        },
    }


def _check_security_hygiene(project_dir: Path) -> dict[str, Any]:
    """Check security hygiene (0-5 points)."""
    score = 5  # Start with perfect, deduct for issues
    details: dict[str, str] = {}

    # Check for .env in .gitignore (critical)
    gitignore = project_dir / ".gitignore"
    if gitignore.exists():
        content = gitignore.read_text()
        if ".env" in content:
            details["env_ignored"] = "yes"
        else:
            score -= 2
            details["env_ignored"] = "no (-2 pts)"
    else:
        score -= 1
        details["gitignore"] = "missing"

    # Check for hardcoded secrets (basic patterns)
    with suppress(
        OSError,
        PermissionError,
        FileNotFoundError,
        UnicodeDecodeError,
        ValueError,
    ):
        py_files = list(project_dir.rglob("*.py"))[:50]  # Limit to 50 files
        secret_patterns = [
            r"password\s*=\s*['\"][^'\"]+['\"]",
            r"api_key\s*=\s*['\"][^'\"]+['\"]",
            r"secret\s*=\s*['\"][^'\"]+['\"]",
        ]

        for py_file in py_files:
            content = py_file.read_text()
            for pattern in secret_patterns:
                if re.search(
                    pattern,
                    content,
                    re.IGNORECASE,
                ):  # REGEX OK: security pattern detection
                    score -= 2
                    details["hardcoded_secrets"] = f"found in {py_file.name}"
                    break

    return {"score": max(0, score), "details": details}


def _calculate_trust_score(
    permissions_count: int,
    session_available: bool,
    tool_count: int,
) -> TrustScore:
    """Calculate trust score (separate from quality, 0-100).

    This measures environment trust, not code quality.
    """
    # Trusted operations (0-40 points)
    trusted_ops = min(permissions_count * 10, 40)  # 4 operations = max

    # Session availability (0-30 points)
    session_score = 30 if session_available else 5

    # Tool ecosystem (0-30 points)
    # Scale by number of available tools
    tool_score = min(tool_count * 3, 30)  # 10 tools = max
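    # Worked example: 4 trusted ops (40) + session available (30) + 10 tools (30) = 100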

    total = trusted_ops + session_score + tool_score

    return TrustScore(
        trusted_operations=trusted_ops,
        session_availability=session_score,
        tool_ecosystem=tool_score,
        total=total,
        details={
            "permissions_count": permissions_count,
            "session_available": session_available,
            "tool_count": tool_count,
        },
    )


def _get_cached_metrics(cache_key: str) -> dict[str, Any] | None:
    """Get cached metrics if still valid."""
    if cache_key not in _metrics_cache:
        return None

    cached_metrics, cached_time = _metrics_cache[cache_key]
    if datetime.now() - cached_time < timedelta(minutes=_CACHE_TTL_MINUTES):
        return cached_metrics
    return None


def _parse_metrics_history(metrics_history: list[dict[str, Any]]) -> dict[str, Any]:
    """Parse Crackerjack metrics history into structured format."""
    # Start with only defaults for non-coverage metrics
    metrics: dict[str, Any] = {
        "lint_score": 100,  # Default if not found
        "security_score": 100,
        "complexity_score": 100,
    }

    # Parse all recent metrics and only include what we find
    for metric in metrics_history[:10]:  # Last 10 metrics
        metric_type = metric.get("metric_type")
        metric_value = metric.get("metric_value", 0)

        # Add metrics that exist in the history
        if metric_type == "code_coverage" and "code_coverage" not in metrics:
            # First coverage metric found
            metrics["code_coverage"] = metric_value
        elif metric_type in {
            "lint_score",
            "security_score",
            "complexity_score",
        }:  # FURB109
            # Update these if found
            metrics[metric_type] = metric_value

    return metrics


def _read_coverage_json(project_dir: Path) -> float:
    """Read coverage percentage from coverage.json."""
    import json

    coverage_json = project_dir / "coverage.json"
    if not coverage_json.exists():
        return 0
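
    # coverage.py JSON report shape (abridged): {"totals": {"percent_covered": 85.14, ...}}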
    with suppress(
        OSError,
        PermissionError,
        FileNotFoundError,
        json.JSONDecodeError,
        ValueError,
        KeyError,
    ):
        coverage_data = json.loads(coverage_json.read_text())
        return float(coverage_data.get("totals", {}).get("percent_covered", 0))

    return 0


def _create_fallback_metrics(coverage_pct: float) -> dict[str, Any]:
    """Create default metrics with coverage."""
    return {
        "code_coverage": coverage_pct,
        "lint_score": 100,
        "security_score": 100,
        "complexity_score": 100,
    }


async def _get_crackerjack_metrics(project_dir: Path) -> dict[str, Any]:
    """Get Crackerjack quality metrics with caching."""
    cache_key = str(project_dir.resolve())

    # Check cache
    if cached := _get_cached_metrics(cache_key):
        return cached

    # Fetch fresh metrics
    if not CRACKERJACK_AVAILABLE:
        return {}

    with suppress(ImportError, RuntimeError, ValueError, AttributeError, OSError):
        # Get recent metrics from Crackerjack history
        metrics_history = await get_quality_metrics_history(
            str(project_dir),
            None,
            days=1,
        )

        if metrics_history:
            metrics = _parse_metrics_history(metrics_history)

            # If coverage is missing from Crackerjack, try coverage.json fallback
            if "code_coverage" not in metrics:
                if coverage_pct := _read_coverage_json(project_dir):
                    metrics["code_coverage"] = coverage_pct

            # Cache the result
            _metrics_cache[cache_key] = (metrics, datetime.now())
            return metrics

    # Complete fallback: no Crackerjack data at all, try coverage.json
    if coverage_pct := _read_coverage_json(project_dir):
        fallback_metrics = _create_fallback_metrics(coverage_pct)
        _metrics_cache[cache_key] = (fallback_metrics, datetime.now())
        return fallback_metrics

    return {}


async def _get_type_coverage(
    project_dir: Path,
    crackerjack_metrics: dict[str, Any],
) -> float:
    """Get type coverage percentage.

    Try to extract from Crackerjack, fall back to a manual check.
    """
    # First, try to get from Crackerjack metrics
    if "type_coverage" in crackerjack_metrics:
        return float(crackerjack_metrics["type_coverage"])

    # Fallback: check for pyright/mypy configuration
    has_pyright = (project_dir / "pyrightconfig.json").exists()
    has_mypy = (project_dir / "mypy.ini").exists()
    pyproject = project_dir / "pyproject.toml"
    if not (has_pyright or has_mypy) and pyproject.exists():
        # A bare pyproject.toml doesn't imply type checking; look for a mypy section
        with suppress(OSError, UnicodeDecodeError):
            has_mypy = "[tool.mypy]" in pyproject.read_text()

    if has_pyright or has_mypy:
        # Estimate based on project structure
        # This is a rough estimate until we have actual coverage data
        return 70.0  # Assume decent coverage if type checker configured

    return 30.0  # Low default if no type checking


def _generate_recommendations_v2(
    code_quality: CodeQualityScore,
    project_health: ProjectHealthScore,
    dev_velocity: DevVelocityScore,
    security: SecurityScore,
    total_score: float,
) -> list[str]:
    """Generate actionable recommendations based on V2 scores."""
    recommendations = []

    # Overall score assessment
    if total_score >= 90:
        recommendations.append("⭐ Excellent code quality - maintain current standards")
    elif total_score >= 75:
        recommendations.append("✅ Good quality - minor improvements available")
    elif total_score >= 60:
        recommendations.append("⚠️ Moderate quality - focus on improvements below")
    else:
        recommendations.append("🚨 Quality needs attention - prioritize critical fixes")

    # Code quality recommendations
    if code_quality.test_coverage < 10:  # <67% coverage
        recommendations.append(
            f"🧪 Critical: Increase test coverage ({code_quality.details['coverage_pct']:.1f}% → target 80%+)",
        )
    elif code_quality.test_coverage < 13:  # <87% coverage
        recommendations.append(
            f"🧪 Add more tests ({code_quality.details['coverage_pct']:.1f}% coverage)",
        )

    if code_quality.lint_score < 8:  # <80% lint score
        recommendations.append("🔧 Address lint issues to improve code quality")

    if code_quality.type_coverage < 7:  # <70% type coverage
        recommendations.append("📝 Add type hints for better code safety")

    if code_quality.complexity_score < 3:  # High complexity
        recommendations.append("🔄 Refactor complex functions (reduce complexity)")

    # Project health recommendations
    if project_health.tooling_score < 10:
        recommendations.append(
            "🔨 Improve tooling setup (add lockfile, update dependencies)",
        )

    if project_health.maturity_score < 10:
        recommendations.append("📚 Enhance project maturity (add docs, tests, CI/CD)")

    # Dev velocity recommendations
    if dev_velocity.git_activity < 5:
        recommendations.append("💬 Improve commit quality (use conventional commits)")

    if dev_velocity.dev_patterns < 5:
        recommendations.append("🌿 Consider feature branch workflow and issue tracking")

    # Security recommendations
    if security.total < 8:
        recommendations.append("🔒 Address security issues (run bandit, check secrets)")

    return recommendations


# Backward compatibility: re-export the V1 helpers as well
from session_buddy.utils.quality_utils import (  # noqa: E402
    _extract_quality_scores,
    _generate_quality_trend_recommendations,
)

__all__ = [
    "CodeQualityScore",
    "DevVelocityScore",
    "ProjectHealthScore",
    "QualityScoreV2",
    "SecurityScore",
    "TrustScore",
    "_extract_quality_scores",
    "_generate_quality_trend_recommendations",
    "calculate_quality_score_v2",
]
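

if __name__ == "__main__":  # pragma: no cover
    # Minimal smoke-test sketch (not part of the original module API):
    # score the current working directory and print the headline results.
    import asyncio
    from pathlib import Path

    _result = asyncio.run(calculate_quality_score_v2(Path.cwd()))
    print(f"Quality V2: {_result.total_score}/100")
    for _rec in _result.recommendations:
        print(f"  {_rec}")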