Coverage for agentos/swarm/eval_feedback_loop.py: 30%

130 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2v1.9.4: Eval-Feedback Loop — closes the gap between CompositeScorer and AutoPilot. 

3 

4Wires evaluation scores back into the execution layer, creating a true 

5execute → evaluate → feedback → retry 闭环 (closed loop). 

6""" 

7 

8from __future__ import annotations 

9 

10import asyncio 

11import time 

12from dataclasses import dataclass, field 

13from typing import Any, Callable, Optional 

14 

15 

@dataclass
class FeedbackSignal:
    """Per-metric evaluation outcome used to trigger self-healing.

    Attributes:
        source: Which scorer produced this signal.
        metric: Metric name (e.g. "rouge_l", "bleu", "judge").
        score: Raw score reported for the metric.
        threshold: Expected threshold for the metric.
        passed: Whether the score met the threshold.
        detail: Human-readable detail; empty by default.
        suggestion: What to improve; empty by default.
    """

    source: str
    metric: str
    score: float
    threshold: float
    passed: bool
    detail: str = ""
    suggestion: str = ""

27 

28 

@dataclass
class RetryConfig:
    """Tunable settings for the retry loop.

    Attributes:
        max_retries: Upper bound on the number of attempts.
        backoff_base: Base backoff delay, in seconds.
        backoff_multiplier: Factor applied to the backoff delay per attempt.
        score_improvement_min: Minimum score gain to consider an attempt an
            improvement.
        timeout: Total loop timeout, in seconds.
    """

    max_retries: int = 3
    backoff_base: float = 1.0
    backoff_multiplier: float = 2.0
    score_improvement_min: float = 0.05
    timeout: float = 60.0

38 

39 

@dataclass
class LoopResult:
    """Outcome of one feedback loop execution.

    Attributes:
        task: Task description the loop was started with.
        final_output: Output selected by the loop; None until one is recorded.
        scores: Score mapping recorded by the loop; empty by default.
        attempts: Number of attempts reported by the loop.
        best_score: Highest score recorded by the loop.
        converged: Whether an attempt passed evaluation.
        duration: Elapsed time of the loop, in seconds.
        history: One trace dict per attempt.
    """

    task: str
    final_output: Any = None
    scores: dict[str, float] = field(default_factory=dict)
    attempts: int = 0
    best_score: float = 0.0
    converged: bool = False
    duration: float = 0.0
    history: list[dict] = field(default_factory=list)

52 

53 

class EvalFeedbackLoop:
    """Evaluation-driven retry loop with incremental prompt refinement.

    Each attempt runs the executor, scores its output, converts the per-metric
    scores into FeedbackSignal objects and, when the output did not pass,
    appends a short feedback note to the task before the next attempt.

    Usage:
        loop = EvalFeedbackLoop(scorer, RetryConfig(max_retries=3))
        result = await loop.run(task, executor_fn, expected_output)
    """

    # Separator and prefix of the feedback note appended by _refine_task.
    # Together they identify a note written by this class, so a task that
    # merely contains "---" of its own is never truncated.
    _FEEDBACK_DELIMITER = "\n---\n"
    _FEEDBACK_PREFIX = "[Retry #"

    # Per-strategy metric thresholds used to derive feedback signals.
    _STRATEGY_THRESHOLDS = {
        "qa": {"rouge_l": 0.3, "contains": 0.5, "judge": 0.55},
        "code": {"rouge_l": 0.1, "exact": 0.3, "contains": 0.5, "judge": 0.55},
        "summary": {"rouge_l": 0.5, "semantic": 0.4, "judge": 0.55},
        "translation": {"bleu": 0.3, "rouge_l": 0.3, "judge": 0.55},
    }
    # Thresholds for strategies not listed above (e.g. the default "general").
    _DEFAULT_THRESHOLDS = {"rouge_l": 0.3, "contains": 0.5}
    # Threshold for a metric the selected strategy does not list.
    _FALLBACK_THRESHOLD = 0.5

    def __init__(
        self,
        scorer: Any = None,  # CompositeScorer / CompositeScorerV2
        config: RetryConfig | None = None,
        reflection_prompt: str | None = None,
    ):
        """Store the scorer, the retry configuration and the prompt template.

        Args:
            scorer: Object exposing ``score(reference=, candidate=, task=)``;
                when None, heuristic scoring is used instead.
            config: Retry settings; a default RetryConfig is used when None.
            reflection_prompt: Optional template overriding the default one.
        """
        self._scorer = scorer
        self._config = config or RetryConfig()
        # NOTE(review): this template is stored but not read by any method of
        # this class; _refine_task builds its own feedback text. Confirm
        # whether it is meant to be wired in.
        self._reflection_prompt = reflection_prompt or (
            "The previous attempt scored {score:.3f} (threshold: {threshold:.3f}). "
            "Weak metrics: {weak_metrics}. "
            "Please improve the output focusing on these weaknesses."
        )

    async def run(
        self,
        task: str,
        executor: Callable[[str], Any],
        expected: str = "",
        strategy: str = "general",
    ) -> LoopResult:
        """Execute task with an evaluation-driven retry loop.

        The loop stops when an attempt passes, when the score gain over the
        previous attempt is below ``score_improvement_min``, when
        ``max_retries`` attempts were made, or when the next backoff would
        exceed ``timeout``.

        Args:
            task: original task description
            executor: async/sync callable (task_str) -> output
            expected: expected/reference output for scoring
            strategy: scoring strategy (qa/code/summary/translation)

        Returns:
            LoopResult whose ``final_output`` is the passing output when the
            loop converged, otherwise the best-scoring output seen;
            ``attempts`` is the number of attempts actually executed and
            ``scores`` the score dict of the last attempt.
        """
        # Monotonic clock: elapsed time must not jump with wall-clock changes.
        started = time.monotonic()
        config = self._config
        result = LoopResult(task=task)
        # Pre-bound so a configuration with zero attempts still returns cleanly.
        scores: dict[str, Any] = {}
        previous_score = 0.0

        for attempt in range(1, config.max_retries + 1):
            result.attempts = attempt

            # Execute (the executor may be sync or return a coroutine).
            output = executor(task)
            if asyncio.iscoroutine(output):
                output = await output

            # Score and derive per-metric feedback signals.
            scores = self._score(output, expected, strategy)
            score = scores.get("weighted", 0.0)
            passed = scores.get("passed", False)
            signals = self._signals_from_scores(scores, strategy)

            attempt_trace = {
                "attempt": attempt,
                "output": str(output)[:500],
                "scores": scores,
                "passed": passed,
                "signals": [
                    {"metric": s.metric, "score": s.score, "passed": s.passed}
                    for s in signals
                ],
            }
            result.history.append(attempt_trace)

            # Track best; the first attempt always counts so that
            # final_output is populated even when every score is 0.0.
            if attempt == 1 or score > result.best_score:
                result.best_score = score
                result.final_output = output

            # Convergence: the passing output is the one to return.
            if passed:
                result.converged = True
                result.final_output = output
                break

            # Stop on stagnation, or when no further attempt will follow
            # (refining and backing off after the last attempt is wasted work).
            stalled = (
                attempt > 1
                and (score - previous_score) < config.score_improvement_min
            )
            if stalled or attempt == config.max_retries:
                break

            previous_score = score

            # Refine task prompt for next attempt.
            task = self._refine_task(task, signals, score, attempt)
            attempt_trace["refined_task"] = task

            # Exponential backoff, bounded by the total loop timeout.
            wait = config.backoff_base * (config.backoff_multiplier ** (attempt - 1))
            if (time.monotonic() - started + wait) > config.timeout:
                break
            await asyncio.sleep(wait)

        result.scores = scores
        result.duration = time.monotonic() - started
        return result

    def _score(
        self, output: Any, expected: str, strategy: str
    ) -> dict[str, Any]:
        """Score output against expected using the configured scorer.

        Falls back to heuristic scoring when there is no scorer, no reference,
        or the scorer raises. In the last case the returned dict carries a
        ``scorer_error`` entry so the failure is visible in the attempt trace.
        """
        if self._scorer is None or not expected:
            # Heuristic scoring when no scorer/reference is available.
            return self._heuristic_score(output, expected)

        try:
            scored = self._scorer.score(
                reference=expected,
                candidate=str(output),
                task=strategy,
            )
            return {
                "weighted": scored.weighted_score,
                "passed": scored.passed,
                "details": scored.details,
                "raw_scores": scored.scores,
            }
        except Exception as exc:
            # Best-effort: keep the loop alive, but record why we fell back.
            fallback = self._heuristic_score(output, expected)
            fallback["scorer_error"] = f"{type(exc).__name__}: {exc}"
            return fallback

    def _heuristic_score(self, output: Any, expected: str) -> dict[str, Any]:
        """Fallback scoring used when the scorer is unavailable.

        Without a reference, non-empty output starts at 0.5 and gains 0.1 for
        each of: more than 50 characters, more than 200 characters, and
        containing "conclusion", "result" or "answer". Empty or
        whitespace-only output scores 0.0 and does not pass. With a reference,
        a case-insensitive containment match scores 0.5 and passes.
        """
        if not expected:
            text = str(output) if output else ""
            if not text.strip():
                # Nothing was produced; this must not count as a pass.
                return {"weighted": 0.0, "passed": False, "details": "heuristic"}

            pass_mark = 0.5
            quality = pass_mark
            if len(text) > 50:
                quality += 0.1
            if len(text) > 200:
                quality += 0.1
            lowered = text.lower()
            if any(kw in lowered for kw in ("conclusion", "result", "answer")):
                quality += 0.1
            return {
                "weighted": min(quality, 1.0),
                "passed": quality >= pass_mark,
                "details": "heuristic",
            }

        # Reference available: case-insensitive containment check.
        contains = expected.lower() in str(output).lower()
        return {
            "weighted": 0.5 if contains else 0.0,
            "passed": contains,
            "details": "contains_heuristic",
        }

    def _signals_from_scores(
        self, scores: dict, strategy: str
    ) -> list[FeedbackSignal]:
        """Convert the ``raw_scores`` entry of a score dict to FeedbackSignals.

        A missing or None ``raw_scores`` yields no signals, and metrics whose
        value is None are skipped.
        """
        raw = scores.get("raw_scores") or {}
        limits = self._STRATEGY_THRESHOLDS.get(strategy, self._DEFAULT_THRESHOLDS)

        signals = []
        for metric, score in raw.items():
            if score is None:
                # Metric was not computed; there is nothing to compare.
                continue
            threshold = limits.get(metric, self._FALLBACK_THRESHOLD)
            passed = score >= threshold
            suggestion = ""
            if not passed:
                suggestion = self._suggestion_for_metric(metric, score, threshold)
            signals.append(FeedbackSignal(
                source=strategy,
                metric=metric,
                score=score,
                threshold=threshold,
                passed=passed,
                detail=f"{metric}={score:.3f} vs {threshold:.3f}",
                suggestion=suggestion,
            ))
        return signals

    def _suggestion_for_metric(self, metric: str, score: float, threshold: float) -> str:
        """Return an improvement hint for a metric that missed its threshold.

        Unknown metrics get a generic hint that states the gap to close.
        """
        gap = threshold - score
        suggestions = {
            "rouge_l": "Make output more comprehensive; include key phrases from expected answer.",
            "bleu": "Improve translation accuracy; check terminology and phrasing.",
            "exact": "Output format doesn't match expected; check structure and delimiters.",
            "contains": (
                "Missing key concepts; include the key terms from the expected "
                f"answer (gap: {gap:.2f})"
            ),
            "semantic": "Semantic meaning differs; rephrase to be closer to expected intent.",
            "judge": "Output quality below LLM-judge threshold; improve clarity and completeness.",
        }
        return suggestions.get(metric, f"Improve {metric} by at least {gap:.2f}")

    def _refine_task(
        self,
        task: str,
        signals: list[FeedbackSignal],
        current_score: float,
        attempt: int,
    ) -> str:
        """Enrich the task prompt with feedback for the next attempt.

        Returns the task unchanged when no signal failed. Otherwise a feedback
        note is appended after a ``---`` separator; a note left by an earlier
        call is replaced rather than accumulated.
        """
        weak = [s for s in signals if not s.passed]
        if not weak:
            return task

        weak_metrics = ", ".join(
            f"{s.metric}({s.score:.2f} < {s.threshold:.2f})" for s in weak
        )
        suggestions = "; ".join(s.suggestion for s in weak)

        reflection = (
            f"[Retry #{attempt} feedback — score {current_score:.3f}] "
            f"Weak: {weak_metrics}. {suggestions}"
        )

        # Cut only at a note this method wrote earlier; a bare "---" may be
        # part of the caller's task and must be preserved.
        marker = self._FEEDBACK_DELIMITER + self._FEEDBACK_PREFIX
        base = task.split(marker, 1)[0]
        return f"{base}{self._FEEDBACK_DELIMITER}{reflection}"