Coverage for session_buddy / tools / recommendation_engine.py: 93.03%
209 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-04 00:43 -0800
1"""Recommendation engine for learning from crackerjack execution history."""
3import re
4from collections import defaultdict
5from contextlib import suppress
6from dataclasses import dataclass
7from datetime import datetime, timedelta
8from typing import Any
10from .agent_analyzer import AgentRecommendation, AgentType
@dataclass
class FailurePattern:
    """Detected failure pattern from historical executions."""

    pattern_signature: str  # Unique identifier for the pattern
    occurrences: int  # Number of executions that matched this signature
    last_seen: datetime  # Timestamp of the most recent occurrence
    successful_fixes: list[AgentType]  # Agents that successfully fixed this pattern
    failed_fixes: list[AgentType]  # Agents that failed to fix this pattern
    avg_fix_time: float  # Average time to fix in seconds
@dataclass
class AgentEffectiveness:
    """Track effectiveness of an agent over time."""

    agent: AgentType  # Agent being tracked
    total_recommendations: int  # Times this agent was recommended
    successful_fixes: int  # Recommendations followed by a passing run (exit code 0)
    failed_fixes: int  # Recommendations followed by a failing run
    avg_confidence: float  # Mean confidence across all recommendations
    success_rate: float  # 0.0-1.0
class RecommendationEngine:
    """Learn from execution history to improve recommendations."""

    @classmethod
    async def _get_cached_result(cls, project: str, days: int) -> dict[str, Any] | None:
        """Return the cached analysis for (project, days), or None on a miss.

        NOTE(review): get_cache is imported locally — presumably to avoid an
        import cycle with history_cache; confirm before hoisting to module level.
        """
        from .history_cache import get_cache

        return await get_cache().get(project, days)
48 @classmethod
49 def _filter_results_by_date(
50 cls,
51 results: list[dict[str, Any]],
52 start_date: datetime,
53 ) -> list[dict[str, Any]]:
54 """Filter results by date range."""
55 filtered_results = []
56 for result in results:
57 timestamp_str = result.get("timestamp")
58 if timestamp_str: 58 ↛ 56line 58 didn't jump to line 56 because the condition on line 58 was always true
59 try:
60 if isinstance(timestamp_str, str): 60 ↛ 63line 60 didn't jump to line 63 because the condition on line 60 was always true
61 result_date = datetime.fromisoformat(timestamp_str)
62 else:
63 result_date = timestamp_str
64 if result_date >= start_date: 64 ↛ 56line 64 didn't jump to line 56 because the condition on line 64 was always true
65 filtered_results.append(result)
66 except (ValueError, AttributeError):
67 filtered_results.append(result)
69 return filtered_results
71 @classmethod
72 async def _cache_result(
73 cls,
74 project: str,
75 days: int,
76 result: dict[str, Any],
77 ) -> None:
78 """Cache analysis result."""
79 from .history_cache import get_cache
81 cache = get_cache()
82 await cache.set(project, days, result)
84 @classmethod
85 async def analyze_history(
86 cls,
87 db: Any, # ReflectionDatabase
88 project: str,
89 days: int = 30,
90 use_cache: bool = True,
91 ) -> dict[str, Any]:
92 """Analyze execution history for patterns and effectiveness.
94 Args:
95 db: ReflectionDatabase instance
96 project: Project name
97 days: Number of days to analyze
98 use_cache: Whether to use cached results (default: True)
100 Returns:
101 Dictionary with patterns, agent effectiveness, and insights
103 """
104 # Check cache first
105 if use_cache:
106 if cached_result := await cls._get_cached_result(project, days):
107 return cached_result
109 end_date = datetime.now()
110 start_date = end_date - timedelta(days=days)
112 # Search for crackerjack executions with agent recommendations
113 results = await db.search_conversations(
114 query="crackerjack agent_recommendations",
115 project=project,
116 limit=100,
117 )
119 # Filter by date
120 filtered_results = cls._filter_results_by_date(results, start_date)
122 # Extract patterns and effectiveness
123 patterns = cls._extract_patterns(filtered_results)
124 effectiveness = cls._calculate_agent_effectiveness(filtered_results)
125 insights = cls._generate_insights(patterns, effectiveness)
127 analysis_result: dict[str, Any] = {
128 "patterns": patterns,
129 "agent_effectiveness": effectiveness,
130 "insights": insights,
131 "total_executions": len(filtered_results),
132 "date_range": {"start": start_date, "end": end_date},
133 }
135 # Cache the result
136 if use_cache:
137 await cls._cache_result(project, days, analysis_result)
139 return analysis_result
141 @classmethod
142 def _update_timestamp(
143 cls,
144 pattern_data: dict[str, dict[str, Any]],
145 signature: str,
146 timestamp_str: str | None,
147 ) -> None:
148 """Update last seen timestamp for pattern."""
149 if not timestamp_str: 149 ↛ 150line 149 didn't jump to line 150 because the condition on line 149 was never true
150 return
152 with suppress(ValueError, AttributeError): # FURB107
153 timestamp = (
154 datetime.fromisoformat(timestamp_str)
155 if isinstance(timestamp_str, str)
156 else timestamp_str
157 )
158 if (
159 not pattern_data[signature]["last_seen"]
160 or timestamp > pattern_data[signature]["last_seen"]
161 ):
162 pattern_data[signature]["last_seen"] = timestamp
164 @classmethod
165 def _track_agent_fixes(
166 cls,
167 pattern_data: dict[str, dict[str, Any]],
168 signature: str,
169 recommendations: list[dict[str, Any]],
170 next_metadata: dict[str, Any],
171 ) -> None:
172 """Track agent recommendation success/failure."""
173 next_exit_code = next_metadata.get("exit_code", 1)
175 for rec in recommendations:
176 agent_name = rec.get("agent")
177 if not agent_name: 177 ↛ 178line 177 didn't jump to line 178 because the condition on line 177 was never true
178 continue
180 with suppress(ValueError): # FURB107
181 agent = AgentType(agent_name)
182 if next_exit_code == 0:
183 pattern_data[signature]["successful_fixes"].append(agent)
184 if exec_time := next_metadata.get("execution_time"):
185 pattern_data[signature]["fix_times"].append(exec_time)
186 else:
187 pattern_data[signature]["failed_fixes"].append(agent)
189 @classmethod
190 def _extract_patterns(cls, results: list[dict[str, Any]]) -> list[FailurePattern]:
191 """Extract failure patterns from execution history."""
192 pattern_data: dict[str, dict[str, Any]] = defaultdict(
193 lambda: {
194 "occurrences": 0,
195 "last_seen": None,
196 "successful_fixes": [],
197 "failed_fixes": [],
198 "fix_times": [],
199 },
200 )
202 for i, result in enumerate(results):
203 metadata = result.get("metadata", {})
204 content = result.get("content", "")
206 signature = cls._generate_signature(content, metadata)
207 if not signature:
208 continue
210 pattern_data[signature]["occurrences"] += 1
211 cls._update_timestamp(pattern_data, signature, result.get("timestamp"))
213 recommendations = metadata.get("agent_recommendations", [])
214 if recommendations and i + 1 < len(results):
215 next_metadata = results[i + 1].get("metadata", {})
216 cls._track_agent_fixes(
217 pattern_data,
218 signature,
219 recommendations,
220 next_metadata,
221 )
223 # Convert to FailurePattern objects
224 patterns = [
225 FailurePattern(
226 pattern_signature=signature,
227 occurrences=data["occurrences"],
228 last_seen=data["last_seen"] or datetime.now(),
229 successful_fixes=data["successful_fixes"],
230 failed_fixes=data["failed_fixes"],
231 avg_fix_time=(
232 sum(data["fix_times"]) / len(data["fix_times"])
233 if data["fix_times"]
234 else 0.0
235 ),
236 )
237 for signature, data in pattern_data.items()
238 ]
240 return sorted(patterns, key=lambda p: p.occurrences, reverse=True)
242 @classmethod
243 def _process_recommendation(
244 cls,
245 rec: dict[str, Any],
246 next_exit_code: int,
247 agent_stats: dict[AgentType, dict[str, Any]],
248 ) -> None:
249 """Process a single recommendation and update stats."""
250 agent_name = rec.get("agent")
251 if not agent_name: 251 ↛ 252line 251 didn't jump to line 252 because the condition on line 251 was never true
252 return
254 with suppress(ValueError): # FURB107
255 agent = AgentType(agent_name)
256 agent_stats[agent]["total_recommendations"] += 1
257 agent_stats[agent]["confidences"].append(rec.get("confidence", 0.0))
259 if next_exit_code == 0:
260 agent_stats[agent]["successful_fixes"] += 1
261 else:
262 agent_stats[agent]["failed_fixes"] += 1
264 @classmethod
265 def _create_effectiveness(
266 cls,
267 agent: AgentType,
268 stats: dict[str, Any],
269 ) -> AgentEffectiveness | None:
270 """Create AgentEffectiveness from stats dict."""
271 total = stats["total_recommendations"]
272 if total == 0: 272 ↛ 273line 272 didn't jump to line 273 because the condition on line 272 was never true
273 return None
275 successful = stats["successful_fixes"]
276 success_rate = successful / total
277 avg_confidence = (
278 sum(stats["confidences"]) / len(stats["confidences"])
279 if stats["confidences"]
280 else 0.0
281 )
283 return AgentEffectiveness(
284 agent=agent,
285 total_recommendations=total,
286 successful_fixes=successful,
287 failed_fixes=stats["failed_fixes"],
288 avg_confidence=avg_confidence,
289 success_rate=success_rate,
290 )
292 @classmethod
293 def _calculate_agent_effectiveness(
294 cls,
295 results: list[dict[str, Any]],
296 ) -> list[AgentEffectiveness]:
297 """Calculate effectiveness metrics for each agent."""
298 agent_stats: dict[AgentType, dict[str, Any]] = defaultdict(
299 lambda: {
300 "total_recommendations": 0,
301 "successful_fixes": 0,
302 "failed_fixes": 0,
303 "confidences": [],
304 },
305 )
307 for i, result in enumerate(results):
308 metadata = result.get("metadata", {})
309 recommendations = metadata.get("agent_recommendations", [])
311 if recommendations and i + 1 < len(results):
312 next_exit_code = results[i + 1].get("metadata", {}).get("exit_code", 1)
313 for rec in recommendations:
314 cls._process_recommendation(rec, next_exit_code, agent_stats)
316 # Convert to AgentEffectiveness objects
317 effectiveness = [
318 eff
319 for agent, stats in agent_stats.items()
320 if (eff := cls._create_effectiveness(agent, stats)) is not None
321 ]
323 return sorted(effectiveness, key=lambda e: e.success_rate, reverse=True)
325 @classmethod
326 def _generate_signature(cls, content: str, metadata: dict[str, Any]) -> str:
327 """Generate unique signature for a failure pattern."""
328 # Extract key error indicators
329 exit_code = metadata.get("exit_code", 0)
330 if exit_code == 0:
331 return "" # Not a failure
333 metrics = metadata.get("metrics", {})
335 # Build signature from error characteristics
336 signature_parts = []
338 # Complexity violations
339 if metrics.get("complexity_violations", 0) > 0:
340 signature_parts.append(f"complexity:{metrics['max_complexity']}")
342 # Security issues
343 if metrics.get("security_issues", 0) > 0:
344 signature_parts.append(f"security:{metrics['security_issues']}")
346 # Test failures
347 if metrics.get("tests_failed", 0) > 0:
348 signature_parts.append(f"test_failures:{metrics['tests_failed']}")
350 # Type errors
351 if metrics.get("type_errors", 0) > 0:
352 signature_parts.append(f"type_errors:{metrics['type_errors']}")
354 # Formatting issues
355 if metrics.get("formatting_issues", 0) > 0: 355 ↛ 356line 355 didn't jump to line 356 because the condition on line 355 was never true
356 signature_parts.append("formatting")
358 # Extract specific error patterns from content
359 error_patterns = [
360 r"B\d{3}", # Bandit codes
361 r"E\d{3}", # Ruff codes
362 r"F\d{3}", # Pyflakes codes
363 ]
365 for pattern in error_patterns:
366 matches = re.findall( # REGEX OK: error code extraction from patterns
367 pattern,
368 content,
369 )
370 if matches:
371 signature_parts.extend(sorted(set(matches))[:3]) # Top 3 unique codes
373 return "|".join(signature_parts) if signature_parts else "unknown_failure"
375 @classmethod
376 def _get_pattern_insights(cls, patterns: list[FailurePattern]) -> list[str]:
377 """Generate insights from failure patterns."""
378 if not patterns:
379 return []
381 insights = []
382 most_common = patterns[0]
383 insights.append(
384 f"🔄 Most common failure: '{most_common.pattern_signature}' "
385 f"({most_common.occurrences} occurrences)",
386 )
388 recent_patterns = [
389 p for p in patterns if (datetime.now() - p.last_seen).days <= 7
390 ]
391 if len(recent_patterns) > 3: 391 ↛ 392line 391 didn't jump to line 392 because the condition on line 391 was never true
392 insights.append(
393 f"⚠️ {len(recent_patterns)} different failure patterns in last 7 days - "
394 f"consider addressing root causes",
395 )
397 return insights
399 @classmethod
400 def _get_effectiveness_insights(
401 cls,
402 effectiveness: list[AgentEffectiveness],
403 ) -> list[str]:
404 """Generate insights from agent effectiveness."""
405 if not effectiveness:
406 return []
408 insights = []
409 top_agent = effectiveness[0]
410 if top_agent.success_rate >= 0.8:
411 insights.append(
412 f"⭐ {top_agent.agent.value} has {top_agent.success_rate:.0%} success rate - "
413 f"highly effective!",
414 )
416 low_performers = [e for e in effectiveness if e.success_rate < 0.3]
417 if low_performers:
418 agents = ", ".join(e.agent.value for e in low_performers[:2])
419 insights.append(
420 f"📉 Low success rate for: {agents} - "
421 f"review recommendations or patterns",
422 )
424 return insights
426 @classmethod
427 def _get_cross_pattern_insights(
428 cls,
429 patterns: list[FailurePattern],
430 effectiveness: list[AgentEffectiveness],
431 ) -> list[str]:
432 """Generate insights from pattern-effectiveness correlation."""
433 if not (patterns and effectiveness):
434 return []
436 reliable_fixes = [
437 p
438 for p in patterns
439 if p.successful_fixes and not p.failed_fixes and p.occurrences >= 2
440 ]
442 if reliable_fixes: 442 ↛ 443line 442 didn't jump to line 443 because the condition on line 442 was never true
443 return [
444 f"✅ {len(reliable_fixes)} patterns have consistent successful fixes - "
445 f"good agent-pattern matching",
446 ]
447 return []
449 @classmethod
450 def _generate_insights(
451 cls,
452 patterns: list[FailurePattern],
453 effectiveness: list[AgentEffectiveness],
454 ) -> list[str]:
455 """Generate actionable insights from patterns and effectiveness data."""
456 insights = (
457 cls._get_pattern_insights(patterns)
458 + cls._get_effectiveness_insights(effectiveness)
459 + cls._get_cross_pattern_insights(patterns, effectiveness)
460 )
462 if not insights:
463 insights.append(
464 "📊 Insufficient data - continue using AI mode to build history",
465 )
467 return insights
469 @classmethod
470 def _adjust_single_recommendation(
471 cls,
472 rec: AgentRecommendation,
473 agent_eff: AgentEffectiveness | None,
474 ) -> AgentRecommendation:
475 """Adjust a single recommendation based on effectiveness data."""
476 if not agent_eff or agent_eff.total_recommendations < 5:
477 return rec # Not enough data
479 # Blend original and learned confidence (60% learned, 40% original)
480 adjusted_confidence = min(
481 (0.6 * agent_eff.success_rate) + (0.4 * rec.confidence),
482 1.0,
483 )
485 return AgentRecommendation(
486 agent=rec.agent,
487 confidence=adjusted_confidence,
488 reason=f"{rec.reason} (adjusted based on {agent_eff.success_rate:.0%} historical success)",
489 quick_fix_command=rec.quick_fix_command,
490 pattern_matched=rec.pattern_matched,
491 )
493 @classmethod
494 def adjust_confidence(
495 cls,
496 recommendations: list[AgentRecommendation],
497 effectiveness: list[AgentEffectiveness],
498 ) -> list[AgentRecommendation]:
499 """Adjust recommendation confidence scores based on historical effectiveness.
501 Args:
502 recommendations: Original recommendations from AgentAnalyzer
503 effectiveness: Historical effectiveness data
505 Returns:
506 Recommendations with adjusted confidence scores
508 """
509 effectiveness_map = {e.agent: e for e in effectiveness}
511 adjusted = [
512 cls._adjust_single_recommendation(rec, effectiveness_map.get(rec.agent))
513 for rec in recommendations
514 ]
516 return sorted(adjusted, key=lambda r: r.confidence, reverse=True)[:3]