Coverage for session_buddy / tools / recommendation_engine.py: 93.03%

209 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-04 00:43 -0800

1"""Recommendation engine for learning from crackerjack execution history.""" 

2 

3import re 

4from collections import defaultdict 

5from contextlib import suppress 

6from dataclasses import dataclass 

7from datetime import datetime, timedelta 

8from typing import Any 

9 

10from .agent_analyzer import AgentRecommendation, AgentType 

11 

12 

13@dataclass 

14class FailurePattern: 

15 """Detected failure pattern from historical executions.""" 

16 

17 pattern_signature: str # Unique identifier for the pattern 

18 occurrences: int 

19 last_seen: datetime 

20 successful_fixes: list[AgentType] # Agents that successfully fixed this pattern 

21 failed_fixes: list[AgentType] # Agents that failed to fix this pattern 

22 avg_fix_time: float # Average time to fix in seconds 

23 

24 

25@dataclass 

26class AgentEffectiveness: 

27 """Track effectiveness of an agent over time.""" 

28 

29 agent: AgentType 

30 total_recommendations: int 

31 successful_fixes: int 

32 failed_fixes: int 

33 avg_confidence: float 

34 success_rate: float # 0.0-1.0 

35 

36 

37class RecommendationEngine: 

38 """Learn from execution history to improve recommendations.""" 

39 

40 @classmethod 

41 async def _get_cached_result(cls, project: str, days: int) -> dict[str, Any] | None: 

42 """Get cached analysis result if available.""" 

43 from .history_cache import get_cache 

44 

45 cache = get_cache() 

46 return await cache.get(project, days) 

47 

48 @classmethod 

49 def _filter_results_by_date( 

50 cls, 

51 results: list[dict[str, Any]], 

52 start_date: datetime, 

53 ) -> list[dict[str, Any]]: 

54 """Filter results by date range.""" 

55 filtered_results = [] 

56 for result in results: 

57 timestamp_str = result.get("timestamp") 

58 if timestamp_str: 58 ↛ 56line 58 didn't jump to line 56 because the condition on line 58 was always true

59 try: 

60 if isinstance(timestamp_str, str): 60 ↛ 63line 60 didn't jump to line 63 because the condition on line 60 was always true

61 result_date = datetime.fromisoformat(timestamp_str) 

62 else: 

63 result_date = timestamp_str 

64 if result_date >= start_date: 64 ↛ 56line 64 didn't jump to line 56 because the condition on line 64 was always true

65 filtered_results.append(result) 

66 except (ValueError, AttributeError): 

67 filtered_results.append(result) 

68 

69 return filtered_results 

70 

71 @classmethod 

72 async def _cache_result( 

73 cls, 

74 project: str, 

75 days: int, 

76 result: dict[str, Any], 

77 ) -> None: 

78 """Cache analysis result.""" 

79 from .history_cache import get_cache 

80 

81 cache = get_cache() 

82 await cache.set(project, days, result) 

83 

84 @classmethod 

85 async def analyze_history( 

86 cls, 

87 db: Any, # ReflectionDatabase 

88 project: str, 

89 days: int = 30, 

90 use_cache: bool = True, 

91 ) -> dict[str, Any]: 

92 """Analyze execution history for patterns and effectiveness. 

93 

94 Args: 

95 db: ReflectionDatabase instance 

96 project: Project name 

97 days: Number of days to analyze 

98 use_cache: Whether to use cached results (default: True) 

99 

100 Returns: 

101 Dictionary with patterns, agent effectiveness, and insights 

102 

103 """ 

104 # Check cache first 

105 if use_cache: 

106 if cached_result := await cls._get_cached_result(project, days): 

107 return cached_result 

108 

109 end_date = datetime.now() 

110 start_date = end_date - timedelta(days=days) 

111 

112 # Search for crackerjack executions with agent recommendations 

113 results = await db.search_conversations( 

114 query="crackerjack agent_recommendations", 

115 project=project, 

116 limit=100, 

117 ) 

118 

119 # Filter by date 

120 filtered_results = cls._filter_results_by_date(results, start_date) 

121 

122 # Extract patterns and effectiveness 

123 patterns = cls._extract_patterns(filtered_results) 

124 effectiveness = cls._calculate_agent_effectiveness(filtered_results) 

125 insights = cls._generate_insights(patterns, effectiveness) 

126 

127 analysis_result: dict[str, Any] = { 

128 "patterns": patterns, 

129 "agent_effectiveness": effectiveness, 

130 "insights": insights, 

131 "total_executions": len(filtered_results), 

132 "date_range": {"start": start_date, "end": end_date}, 

133 } 

134 

135 # Cache the result 

136 if use_cache: 

137 await cls._cache_result(project, days, analysis_result) 

138 

139 return analysis_result 

140 

141 @classmethod 

142 def _update_timestamp( 

143 cls, 

144 pattern_data: dict[str, dict[str, Any]], 

145 signature: str, 

146 timestamp_str: str | None, 

147 ) -> None: 

148 """Update last seen timestamp for pattern.""" 

149 if not timestamp_str: 149 ↛ 150line 149 didn't jump to line 150 because the condition on line 149 was never true

150 return 

151 

152 with suppress(ValueError, AttributeError): # FURB107 

153 timestamp = ( 

154 datetime.fromisoformat(timestamp_str) 

155 if isinstance(timestamp_str, str) 

156 else timestamp_str 

157 ) 

158 if ( 

159 not pattern_data[signature]["last_seen"] 

160 or timestamp > pattern_data[signature]["last_seen"] 

161 ): 

162 pattern_data[signature]["last_seen"] = timestamp 

163 

164 @classmethod 

165 def _track_agent_fixes( 

166 cls, 

167 pattern_data: dict[str, dict[str, Any]], 

168 signature: str, 

169 recommendations: list[dict[str, Any]], 

170 next_metadata: dict[str, Any], 

171 ) -> None: 

172 """Track agent recommendation success/failure.""" 

173 next_exit_code = next_metadata.get("exit_code", 1) 

174 

175 for rec in recommendations: 

176 agent_name = rec.get("agent") 

177 if not agent_name: 177 ↛ 178line 177 didn't jump to line 178 because the condition on line 177 was never true

178 continue 

179 

180 with suppress(ValueError): # FURB107 

181 agent = AgentType(agent_name) 

182 if next_exit_code == 0: 

183 pattern_data[signature]["successful_fixes"].append(agent) 

184 if exec_time := next_metadata.get("execution_time"): 

185 pattern_data[signature]["fix_times"].append(exec_time) 

186 else: 

187 pattern_data[signature]["failed_fixes"].append(agent) 

188 

189 @classmethod 

190 def _extract_patterns(cls, results: list[dict[str, Any]]) -> list[FailurePattern]: 

191 """Extract failure patterns from execution history.""" 

192 pattern_data: dict[str, dict[str, Any]] = defaultdict( 

193 lambda: { 

194 "occurrences": 0, 

195 "last_seen": None, 

196 "successful_fixes": [], 

197 "failed_fixes": [], 

198 "fix_times": [], 

199 }, 

200 ) 

201 

202 for i, result in enumerate(results): 

203 metadata = result.get("metadata", {}) 

204 content = result.get("content", "") 

205 

206 signature = cls._generate_signature(content, metadata) 

207 if not signature: 

208 continue 

209 

210 pattern_data[signature]["occurrences"] += 1 

211 cls._update_timestamp(pattern_data, signature, result.get("timestamp")) 

212 

213 recommendations = metadata.get("agent_recommendations", []) 

214 if recommendations and i + 1 < len(results): 

215 next_metadata = results[i + 1].get("metadata", {}) 

216 cls._track_agent_fixes( 

217 pattern_data, 

218 signature, 

219 recommendations, 

220 next_metadata, 

221 ) 

222 

223 # Convert to FailurePattern objects 

224 patterns = [ 

225 FailurePattern( 

226 pattern_signature=signature, 

227 occurrences=data["occurrences"], 

228 last_seen=data["last_seen"] or datetime.now(), 

229 successful_fixes=data["successful_fixes"], 

230 failed_fixes=data["failed_fixes"], 

231 avg_fix_time=( 

232 sum(data["fix_times"]) / len(data["fix_times"]) 

233 if data["fix_times"] 

234 else 0.0 

235 ), 

236 ) 

237 for signature, data in pattern_data.items() 

238 ] 

239 

240 return sorted(patterns, key=lambda p: p.occurrences, reverse=True) 

241 

242 @classmethod 

243 def _process_recommendation( 

244 cls, 

245 rec: dict[str, Any], 

246 next_exit_code: int, 

247 agent_stats: dict[AgentType, dict[str, Any]], 

248 ) -> None: 

249 """Process a single recommendation and update stats.""" 

250 agent_name = rec.get("agent") 

251 if not agent_name: 251 ↛ 252line 251 didn't jump to line 252 because the condition on line 251 was never true

252 return 

253 

254 with suppress(ValueError): # FURB107 

255 agent = AgentType(agent_name) 

256 agent_stats[agent]["total_recommendations"] += 1 

257 agent_stats[agent]["confidences"].append(rec.get("confidence", 0.0)) 

258 

259 if next_exit_code == 0: 

260 agent_stats[agent]["successful_fixes"] += 1 

261 else: 

262 agent_stats[agent]["failed_fixes"] += 1 

263 

264 @classmethod 

265 def _create_effectiveness( 

266 cls, 

267 agent: AgentType, 

268 stats: dict[str, Any], 

269 ) -> AgentEffectiveness | None: 

270 """Create AgentEffectiveness from stats dict.""" 

271 total = stats["total_recommendations"] 

272 if total == 0: 272 ↛ 273line 272 didn't jump to line 273 because the condition on line 272 was never true

273 return None 

274 

275 successful = stats["successful_fixes"] 

276 success_rate = successful / total 

277 avg_confidence = ( 

278 sum(stats["confidences"]) / len(stats["confidences"]) 

279 if stats["confidences"] 

280 else 0.0 

281 ) 

282 

283 return AgentEffectiveness( 

284 agent=agent, 

285 total_recommendations=total, 

286 successful_fixes=successful, 

287 failed_fixes=stats["failed_fixes"], 

288 avg_confidence=avg_confidence, 

289 success_rate=success_rate, 

290 ) 

291 

292 @classmethod 

293 def _calculate_agent_effectiveness( 

294 cls, 

295 results: list[dict[str, Any]], 

296 ) -> list[AgentEffectiveness]: 

297 """Calculate effectiveness metrics for each agent.""" 

298 agent_stats: dict[AgentType, dict[str, Any]] = defaultdict( 

299 lambda: { 

300 "total_recommendations": 0, 

301 "successful_fixes": 0, 

302 "failed_fixes": 0, 

303 "confidences": [], 

304 }, 

305 ) 

306 

307 for i, result in enumerate(results): 

308 metadata = result.get("metadata", {}) 

309 recommendations = metadata.get("agent_recommendations", []) 

310 

311 if recommendations and i + 1 < len(results): 

312 next_exit_code = results[i + 1].get("metadata", {}).get("exit_code", 1) 

313 for rec in recommendations: 

314 cls._process_recommendation(rec, next_exit_code, agent_stats) 

315 

316 # Convert to AgentEffectiveness objects 

317 effectiveness = [ 

318 eff 

319 for agent, stats in agent_stats.items() 

320 if (eff := cls._create_effectiveness(agent, stats)) is not None 

321 ] 

322 

323 return sorted(effectiveness, key=lambda e: e.success_rate, reverse=True) 

324 

325 @classmethod 

326 def _generate_signature(cls, content: str, metadata: dict[str, Any]) -> str: 

327 """Generate unique signature for a failure pattern.""" 

328 # Extract key error indicators 

329 exit_code = metadata.get("exit_code", 0) 

330 if exit_code == 0: 

331 return "" # Not a failure 

332 

333 metrics = metadata.get("metrics", {}) 

334 

335 # Build signature from error characteristics 

336 signature_parts = [] 

337 

338 # Complexity violations 

339 if metrics.get("complexity_violations", 0) > 0: 

340 signature_parts.append(f"complexity:{metrics['max_complexity']}") 

341 

342 # Security issues 

343 if metrics.get("security_issues", 0) > 0: 

344 signature_parts.append(f"security:{metrics['security_issues']}") 

345 

346 # Test failures 

347 if metrics.get("tests_failed", 0) > 0: 

348 signature_parts.append(f"test_failures:{metrics['tests_failed']}") 

349 

350 # Type errors 

351 if metrics.get("type_errors", 0) > 0: 

352 signature_parts.append(f"type_errors:{metrics['type_errors']}") 

353 

354 # Formatting issues 

355 if metrics.get("formatting_issues", 0) > 0: 355 ↛ 356line 355 didn't jump to line 356 because the condition on line 355 was never true

356 signature_parts.append("formatting") 

357 

358 # Extract specific error patterns from content 

359 error_patterns = [ 

360 r"B\d{3}", # Bandit codes 

361 r"E\d{3}", # Ruff codes 

362 r"F\d{3}", # Pyflakes codes 

363 ] 

364 

365 for pattern in error_patterns: 

366 matches = re.findall( # REGEX OK: error code extraction from patterns 

367 pattern, 

368 content, 

369 ) 

370 if matches: 

371 signature_parts.extend(sorted(set(matches))[:3]) # Top 3 unique codes 

372 

373 return "|".join(signature_parts) if signature_parts else "unknown_failure" 

374 

375 @classmethod 

376 def _get_pattern_insights(cls, patterns: list[FailurePattern]) -> list[str]: 

377 """Generate insights from failure patterns.""" 

378 if not patterns: 

379 return [] 

380 

381 insights = [] 

382 most_common = patterns[0] 

383 insights.append( 

384 f"🔄 Most common failure: '{most_common.pattern_signature}' " 

385 f"({most_common.occurrences} occurrences)", 

386 ) 

387 

388 recent_patterns = [ 

389 p for p in patterns if (datetime.now() - p.last_seen).days <= 7 

390 ] 

391 if len(recent_patterns) > 3: 391 ↛ 392line 391 didn't jump to line 392 because the condition on line 391 was never true

392 insights.append( 

393 f"⚠️ {len(recent_patterns)} different failure patterns in last 7 days - " 

394 f"consider addressing root causes", 

395 ) 

396 

397 return insights 

398 

399 @classmethod 

400 def _get_effectiveness_insights( 

401 cls, 

402 effectiveness: list[AgentEffectiveness], 

403 ) -> list[str]: 

404 """Generate insights from agent effectiveness.""" 

405 if not effectiveness: 

406 return [] 

407 

408 insights = [] 

409 top_agent = effectiveness[0] 

410 if top_agent.success_rate >= 0.8: 

411 insights.append( 

412 f"{top_agent.agent.value} has {top_agent.success_rate:.0%} success rate - " 

413 f"highly effective!", 

414 ) 

415 

416 low_performers = [e for e in effectiveness if e.success_rate < 0.3] 

417 if low_performers: 

418 agents = ", ".join(e.agent.value for e in low_performers[:2]) 

419 insights.append( 

420 f"📉 Low success rate for: {agents} - " 

421 f"review recommendations or patterns", 

422 ) 

423 

424 return insights 

425 

426 @classmethod 

427 def _get_cross_pattern_insights( 

428 cls, 

429 patterns: list[FailurePattern], 

430 effectiveness: list[AgentEffectiveness], 

431 ) -> list[str]: 

432 """Generate insights from pattern-effectiveness correlation.""" 

433 if not (patterns and effectiveness): 

434 return [] 

435 

436 reliable_fixes = [ 

437 p 

438 for p in patterns 

439 if p.successful_fixes and not p.failed_fixes and p.occurrences >= 2 

440 ] 

441 

442 if reliable_fixes: 442 ↛ 443line 442 didn't jump to line 443 because the condition on line 442 was never true

443 return [ 

444 f"{len(reliable_fixes)} patterns have consistent successful fixes - " 

445 f"good agent-pattern matching", 

446 ] 

447 return [] 

448 

449 @classmethod 

450 def _generate_insights( 

451 cls, 

452 patterns: list[FailurePattern], 

453 effectiveness: list[AgentEffectiveness], 

454 ) -> list[str]: 

455 """Generate actionable insights from patterns and effectiveness data.""" 

456 insights = ( 

457 cls._get_pattern_insights(patterns) 

458 + cls._get_effectiveness_insights(effectiveness) 

459 + cls._get_cross_pattern_insights(patterns, effectiveness) 

460 ) 

461 

462 if not insights: 

463 insights.append( 

464 "📊 Insufficient data - continue using AI mode to build history", 

465 ) 

466 

467 return insights 

468 

469 @classmethod 

470 def _adjust_single_recommendation( 

471 cls, 

472 rec: AgentRecommendation, 

473 agent_eff: AgentEffectiveness | None, 

474 ) -> AgentRecommendation: 

475 """Adjust a single recommendation based on effectiveness data.""" 

476 if not agent_eff or agent_eff.total_recommendations < 5: 

477 return rec # Not enough data 

478 

479 # Blend original and learned confidence (60% learned, 40% original) 

480 adjusted_confidence = min( 

481 (0.6 * agent_eff.success_rate) + (0.4 * rec.confidence), 

482 1.0, 

483 ) 

484 

485 return AgentRecommendation( 

486 agent=rec.agent, 

487 confidence=adjusted_confidence, 

488 reason=f"{rec.reason} (adjusted based on {agent_eff.success_rate:.0%} historical success)", 

489 quick_fix_command=rec.quick_fix_command, 

490 pattern_matched=rec.pattern_matched, 

491 ) 

492 

493 @classmethod 

494 def adjust_confidence( 

495 cls, 

496 recommendations: list[AgentRecommendation], 

497 effectiveness: list[AgentEffectiveness], 

498 ) -> list[AgentRecommendation]: 

499 """Adjust recommendation confidence scores based on historical effectiveness. 

500 

501 Args: 

502 recommendations: Original recommendations from AgentAnalyzer 

503 effectiveness: Historical effectiveness data 

504 

505 Returns: 

506 Recommendations with adjusted confidence scores 

507 

508 """ 

509 effectiveness_map = {e.agent: e for e in effectiveness} 

510 

511 adjusted = [ 

512 cls._adjust_single_recommendation(rec, effectiveness_map.get(rec.agent)) 

513 for rec in recommendations 

514 ] 

515 

516 return sorted(adjusted, key=lambda r: r.confidence, reverse=True)[:3]