Coverage for agentos/swarm/eval_feedback_loop.py: 30%
130 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
v1.9.4: Eval-Feedback Loop — closes the gap between CompositeScorer and AutoPilot.

Wires evaluation scores back into the execution layer, creating a true
execute → evaluate → feedback → retry closed loop.
6"""
8from __future__ import annotations
10import asyncio
11import time
12from dataclasses import dataclass, field
13from typing import Any, Callable, Optional
@dataclass
class FeedbackSignal:
    """One evaluation-derived signal that can trigger self-healing.

    Attributes:
        source: Which scorer produced this signal.
        metric: Metric name (e.g. "rouge_l", "bleu", "judge").
        score: Raw score for the metric.
        threshold: Expected threshold for the metric.
        passed: Whether the score met the threshold.
        detail: Human-readable detail; empty by default.
        suggestion: What to improve; empty by default.
    """

    # Required fields (positional order is part of the constructor contract).
    source: str
    metric: str
    score: float
    threshold: float
    passed: bool

    # Optional free-text fields.
    detail: str = ""
    suggestion: str = ""
@dataclass
class RetryConfig:
    """Tunable knobs for the evaluation-driven retry loop.

    Attributes:
        max_retries: Upper bound on the number of attempts.
        backoff_base: Base backoff delay, in seconds.
        backoff_multiplier: Factor by which the backoff grows per attempt.
        score_improvement_min: Minimum score gain between attempts that
            counts as an improvement.
        timeout: Total time budget for the whole loop, in seconds.
    """

    max_retries: int = 3

    # Backoff schedule.
    backoff_base: float = 1.0
    backoff_multiplier: float = 2.0

    # Stop conditions.
    score_improvement_min: float = 0.05
    timeout: float = 60.0
@dataclass
class LoopResult:
    """Outcome of one feedback-loop execution.

    Attributes:
        task: The task the loop was run for.
        final_output: Output selected as the loop's result.
        scores: Score mapping recorded for the run.
        attempts: Number of attempts recorded.
        best_score: Highest score recorded.
        converged: Whether the loop converged.
        duration: Elapsed time recorded for the run.
        history: One trace dict per attempt.
    """

    task: str
    final_output: Any = None

    # Mutable defaults use factories so instances never share state.
    scores: dict[str, float] = field(default_factory=dict)

    attempts: int = 0
    best_score: float = 0.0
    converged: bool = False
    duration: float = 0.0

    history: list[dict] = field(default_factory=list)
class EvalFeedbackLoop:
    """Connect evaluation scores to an AutoPilot-style retry loop with
    incremental prompt refinement.

    Each attempt executes the task, scores the output, derives per-metric
    feedback signals, and (if the attempt did not pass) appends a feedback
    note to the task text before trying again.

    Usage:
        loop = EvalFeedbackLoop(scorer, RetryConfig(max_retries=3))
        result = await loop.run(task, executor_fn, expected_output)
    """

    # Per-strategy pass thresholds for individual metrics.
    _STRATEGY_THRESHOLDS = {
        "qa": {"rouge_l": 0.3, "contains": 0.5, "judge": 0.55},
        "code": {"rouge_l": 0.1, "exact": 0.3, "contains": 0.5, "judge": 0.55},
        "summary": {"rouge_l": 0.5, "semantic": 0.4, "judge": 0.55},
        "translation": {"bleu": 0.3, "rouge_l": 0.3, "judge": 0.55},
    }
    # Thresholds used when the strategy has no entry above.
    _DEFAULT_THRESHOLDS = {"rouge_l": 0.3, "contains": 0.5}
    # Threshold for a metric that the selected table does not list.
    _FALLBACK_THRESHOLD = 0.5
    # Prefix of the feedback block that _refine_task appends to a task.
    _REFLECTION_MARKER = "\n---\n[Retry #"

    def __init__(
        self,
        scorer: Any = None,  # CompositeScorer / CompositeScorerV2
        config: RetryConfig | None = None,
        reflection_prompt: str | None = None,
    ):
        """Store the scorer, retry configuration and reflection template.

        Args:
            scorer: Object exposing ``score(reference=, candidate=, task=)``;
                when falsy, heuristic scoring is used instead.
            config: Retry configuration; a default ``RetryConfig()`` is
                created when omitted.
            reflection_prompt: Template with ``{score}``, ``{threshold}`` and
                ``{weak_metrics}`` placeholders.
        """
        self._scorer = scorer
        self._config = config or RetryConfig()
        # NOTE(review): this template is stored but no method of this class
        # reads it; _refine_task builds its own feedback text.
        self._reflection_prompt = reflection_prompt or (
            "The previous attempt scored {score:.3f} (threshold: {threshold:.3f}). "
            "Weak metrics: {weak_metrics}. "
            "Please improve the output focusing on these weaknesses."
        )

    async def run(
        self,
        task: str,
        executor: Callable[[str], Any],
        expected: str = "",
        strategy: str = "general",
    ) -> LoopResult:
        """Execute task with evaluation-driven retry loop.

        Args:
            task: original task description
            executor: async/sync callable (task_str) → output
            expected: expected/reference output for scoring
            strategy: scoring strategy (qa/code/summary/translation)

        Returns:
            LoopResult where ``attempts`` is the number of attempts actually
            executed, ``scores`` is the score dict of the last attempt
            (empty if none ran), ``best_score`` is the highest weighted
            score seen, and ``final_output`` is the passing output when the
            loop converged, otherwise the best-scoring output.
        """
        # Monotonic clock: elapsed time must not jump with wall-clock changes.
        start = time.monotonic()
        result = LoopResult(task=task)
        previous_score = 0.0
        # Pre-bound so a non-positive max_retries still returns cleanly.
        scores: dict[str, Any] = {}

        for attempt in range(1, self._config.max_retries + 1):
            # Record the real attempt count, whichever exit path is taken.
            result.attempts = attempt

            # Execute (the executor may be sync or return a coroutine).
            output = executor(task)
            if asyncio.iscoroutine(output):
                output = await output

            attempt_trace = {"attempt": attempt, "output": str(output)[:500]}

            # Score
            scores = self._score(output, expected, strategy)
            attempt_trace["scores"] = scores
            score = scores.get("weighted", 0.0)
            passed = scores.get("passed", False)
            attempt_trace["passed"] = passed

            result.history.append(attempt_trace)

            # Track best. The first attempt always seeds the result so that
            # an output scoring exactly 0.0 is not silently dropped.
            if attempt == 1 or score > result.best_score:
                result.best_score = score
                result.final_output = output

            # Emit feedback signals
            signals = self._signals_from_scores(scores, strategy)
            attempt_trace["signals"] = [
                {"metric": s.metric, "score": s.score, "passed": s.passed}
                for s in signals
            ]

            # Converged: the passing output is the result, even if an
            # earlier non-passing attempt had a higher weighted score.
            if passed:
                result.converged = True
                result.final_output = output
                break

            # Stagnation: stop retrying but keep the best output seen so
            # far instead of replacing it with a possibly worse one.
            gain = score - previous_score
            if attempt > 1 and gain < self._config.score_improvement_min:
                break

            previous_score = score

            # Refine task prompt for next attempt
            task = self._refine_task(task, signals, score, attempt)
            attempt_trace["refined_task"] = task

            # No further attempt will run, so a backoff sleep would only
            # add latency.
            if attempt >= self._config.max_retries:
                break

            # Exponential backoff, bounded by the total loop timeout.
            wait = self._config.backoff_base * (
                self._config.backoff_multiplier ** (attempt - 1)
            )
            if (time.monotonic() - start + wait) > self._config.timeout:
                break
            await asyncio.sleep(wait)

        result.scores = scores
        result.duration = time.monotonic() - start
        return result

    def _score(
        self, output: Any, expected: str, strategy: str
    ) -> dict[str, Any]:
        """Score output against expected using the configured scorer.

        Falls back to ``_heuristic_score`` when there is no scorer, no
        reference text, or the scorer raises.

        Returns:
            Dict with at least ``weighted``, ``passed`` and ``details``; the
            scorer path additionally includes ``raw_scores``.
        """
        if not self._scorer or not expected:
            # Heuristic scoring when no scorer/reference available
            return self._heuristic_score(output, expected)

        try:
            scored = self._scorer.score(
                reference=expected,
                candidate=str(output),
                task=strategy,
            )
            return {
                "weighted": scored.weighted_score,
                "passed": scored.passed,
                "details": scored.details,
                "raw_scores": scored.scores,
            }
        except Exception:
            # Deliberate best-effort: a failing scorer must not abort the
            # retry loop, so degrade to the heuristic instead.
            return self._heuristic_score(output, expected)

    def _heuristic_score(self, output: Any, expected: str) -> dict:
        """Fallback scoring when no scorer is available.

        Without a reference, the score starts at 0.5 and gains 0.1 for each
        of: length > 50, length > 200, presence of a conclusion-like
        keyword. With a reference, the score is 0.5 if the reference occurs
        (case-insensitively) in the output, else 0.0.
        """
        if not expected:
            # No reference — score based on output quality heuristics
            text = str(output) if output else ""
            quality = 0.5
            if len(text) > 50:
                quality += 0.1
            if len(text) > 200:
                quality += 0.1
            lowered = text.lower()
            if any(kw in lowered for kw in ("conclusion", "result", "answer")):
                quality += 0.1
            # quality never drops below its 0.5 start, so this branch
            # always reports passed=True.
            return {"weighted": min(quality, 1.0), "passed": quality >= 0.5, "details": "heuristic"}

        # Check contains match
        contains = 1.0 if expected.lower() in str(output).lower() else 0.0
        return {"weighted": contains * 0.5, "passed": contains > 0, "details": "contains_heuristic"}

    def _signals_from_scores(
        self, scores: dict, strategy: str
    ) -> list[FeedbackSignal]:
        """Convert the ``raw_scores`` entry of a score dict to signals.

        Args:
            scores: Score dict; a missing or empty/None ``raw_scores`` entry
                yields no signals.
            strategy: Selects the threshold table; unknown strategies use
                the default table.

        Returns:
            One FeedbackSignal per raw metric, in the mapping's order.
        """
        # `or {}` also covers an explicit None stored under the key.
        raw = scores.get("raw_scores") or {}
        strat_thresholds = self._STRATEGY_THRESHOLDS.get(
            strategy, self._DEFAULT_THRESHOLDS
        )

        signals = []
        for metric, score in raw.items():
            threshold = strat_thresholds.get(metric, self._FALLBACK_THRESHOLD)
            passed = score >= threshold
            # Only failing metrics carry an improvement suggestion.
            suggestion = (
                "" if passed
                else self._suggestion_for_metric(metric, score, threshold)
            )
            signals.append(FeedbackSignal(
                source=strategy,
                metric=metric,
                score=score,
                threshold=threshold,
                passed=passed,
                detail=f"{metric}={score:.3f} vs {threshold:.3f}",
                suggestion=suggestion,
            ))
        return signals

    def _suggestion_for_metric(self, metric: str, score: float, threshold: float) -> str:
        """Generate improvement suggestion based on weak metric.

        Known metrics map to a fixed hint; any other metric gets a generic
        message stating the gap between threshold and score.
        """
        gap = threshold - score
        suggestions = {
            "rouge_l": "Make output more comprehensive; include key phrases from expected answer.",
            "bleu": "Improve translation accuracy; check terminology and phrasing.",
            "exact": "Output format doesn't match expected; check structure and delimiters.",
            "contains": f"Missing key concepts; include: (gap: {gap:.2f})",
            "semantic": "Semantic meaning differs; rephrase to be closer to expected intent.",
            "judge": "Output quality below LLM-judge threshold; improve clarity and completeness.",
        }
        return suggestions.get(metric, f"Improve {metric} by at least {gap:.2f}")

    def _refine_task(
        self,
        task: str,
        signals: list[FeedbackSignal],
        current_score: float,
        attempt: int,
    ) -> str:
        """Enrich task prompt with feedback for next attempt.

        Args:
            task: Current task text, possibly ending with a feedback block
                appended by an earlier call.
            signals: Signals of the attempt; only failing ones are used.
            current_score: Weighted score of the attempt.
            attempt: 1-based attempt number.

        Returns:
            ``task`` unchanged when no signal failed; otherwise the task
            with its previous feedback block (if any) replaced by a new one.
        """
        weak = [s for s in signals if not s.passed]
        if not weak:
            return task

        weak_metrics = ", ".join(
            f"{s.metric}({s.score:.2f} < {s.threshold:.2f})" for s in weak
        )
        suggestions = "; ".join(s.suggestion for s in weak)

        reflection = (
            f"[Retry #{attempt} feedback — score {current_score:.3f}] "
            f"Weak: {weak_metrics}. {suggestions}"
        )

        # Strip only a feedback block that this method appended earlier.
        # Splitting on a bare "---" would truncate any task whose own text
        # contains that sequence (e.g. a markdown rule).
        cut = task.rfind(self._REFLECTION_MARKER)
        base = task if cut == -1 else task[:cut]
        return f"{base}\n---\n{reflection}"