Coverage for agentos/evaluation/scorers.py: 25%

176 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2AgentOS v0.70 — 评测打分系统。 

3基因来源: ROUGE/BLEU 经典算法 + 语义相似度 

4 

5评分策略: 

6- ROUGE-L: 最长公共子序列召回率 (摘要质量) 

7- BLEU: n-gram精确率 (翻译质量) 

8- Semantic: 基于embedding的语义相似度 

9- Exact: 精确匹配 

10- Contains: 包含匹配 

11""" 

12 

13from __future__ import annotations 

14 

15import math 

16import re 

17from collections import Counter 

18from dataclasses import dataclass, field 

19from typing import Any, Callable 

20 

21 

22# ── ROUGE-L ───────────────────────────────────── 

23 

def _lcs_length(x: list, y: list) -> int:
    """Return the length of the longest common subsequence of ``x`` and ``y``.

    Uses a two-row dynamic-programming table. The shorter sequence is used
    for the columns, so each row holds ``min(len(x), len(y)) + 1`` entries.
    """
    longer, shorter = (y, x) if len(x) < len(y) else (x, y)
    width = len(shorter)

    above = [0] * (width + 1)  # DP row for the previously processed item
    for item in longer:
        row = [0]
        for col, other in enumerate(shorter, start=1):
            if item == other:
                # Extend the subsequence that ended just before both items.
                row.append(above[col - 1] + 1)
            else:
                row.append(max(above[col], row[col - 1]))
        above = row
    return above[width]

38 

39 

def rouge_l(reference: str, candidate: str) -> float:
    """Return the character-level ROUGE-L F1 score of ``candidate``.

    Recall is ``LCS / len(reference)`` and precision is
    ``LCS / len(candidate)``, where LCS is the longest-common-subsequence
    length over the characters of both strings.

    Args:
        reference: Expected text.
        candidate: Text being evaluated.

    Returns:
        The harmonic mean of recall and precision, or ``0.0`` when either
        string is empty or the strings share no characters.
    """
    if not reference or not candidate:
        return 0.0

    lcs = _lcs_length(list(reference), list(candidate))
    if lcs == 0:
        # Recall and precision are both zero; avoid dividing by their sum.
        return 0.0

    recall = lcs / len(reference)
    precision = lcs / len(candidate)
    return 2 * recall * precision / (recall + precision)

58 

59 

60# ── BLEU ──────────────────────────────────────── 

61 

def _ngrams(tokens: list[str], n: int) -> Counter:
    """Count every contiguous run of ``n`` tokens, keyed by tuple."""
    counts: Counter = Counter()
    for start in range(len(tokens) - n + 1):
        counts[tuple(tokens[start:start + n])] += 1
    return counts

64 

65 

def bleu(reference: str, candidate: str, max_n: int = 4, smoothing: bool = True) -> float:
    """Return a sentence-level BLEU score of ``candidate`` against ``reference``.

    Both texts are tokenized with ``_tokenize``. For each order
    ``1..max_n`` the clipped n-gram precision is computed; the result is the
    brevity penalty times the geometric mean of those precisions.

    Args:
        reference: Expected text.
        candidate: Text being evaluated.
        max_n: Highest n-gram order to include.
        smoothing: When true, an order whose precision is zero (or for which
            the candidate is too short to form any n-gram) contributes
            ``1 / 2**n`` instead of zeroing the whole score.

    Returns:
        A score in ``[0.0, 1.0]``. ``0.0`` is returned when either text has
        no tokens, when the candidate shares no token with the reference, or
        when ``smoothing`` is off and any order has zero precision.

    Raises:
        ValueError: If ``max_n`` is smaller than 1.
    """
    if max_n < 1:
        # The geometric mean below divides by max_n.
        raise ValueError("max_n must be >= 1")

    ref_tokens = _tokenize(reference)
    cand_tokens = _tokenize(candidate)
    if not cand_tokens or not ref_tokens:
        return 0.0

    precisions: list[float] = []
    for n in range(1, max_n + 1):
        cand_ngrams = _ngrams(cand_tokens, n)
        if cand_ngrams:
            ref_ngrams = _ngrams(ref_tokens, n)
            clipped = sum(
                min(count, ref_ngrams.get(ngram, 0))
                for ngram, count in cand_ngrams.items()
            )
            if n == 1 and clipped == 0:
                # No shared token at all: smoothing must not invent a score
                # for completely unrelated text.
                return 0.0
            precision = clipped / sum(cand_ngrams.values())
        else:
            # Candidate is shorter than n tokens.
            precision = 0.0

        if precision == 0:
            if not smoothing:
                return 0.0
            # Decay by the 1-based order so the substitute is always < 1.
            precision = 1.0 / (2 ** n)
        precisions.append(precision)

    # Brevity penalty: penalize candidates shorter than the reference.
    brevity = min(1.0, math.exp(1 - len(ref_tokens) / len(cand_tokens)))

    # Geometric mean of the n-gram precisions.
    log_sum = sum(math.log(p) for p in precisions)
    return brevity * math.exp(log_sum / max_n)

103 

104 

def _tokenize(text: str) -> list[str]:
    """Lower-case ``text`` and split it into tokens.

    Text containing at least one character in U+4E00..U+9FFF is split into
    individual non-whitespace characters. Any other text is split into runs
    of word characters, with each remaining non-space character as its own
    token.
    """
    lowered = text.lower()
    if re.search(r'[\u4e00-\u9fff]', lowered) is None:
        return re.findall(r'\w+|[^\w\s]', lowered)
    # Character-level tokens; whitespace characters are dropped.
    return [char for char in lowered if char.strip()]

118 

119 

120# ── Semantic Similarity ───────────────────────── 

121 

def semantic_similarity(candidate: str, reference: str, embedder: Any = None) -> float:
    """Return an embedding-based similarity between two texts.

    Each text is passed to ``embedder.embed`` and the two results are
    compared with ``agentos.cache.embedder.cosine_similarity``. If anything
    in that path fails, the function falls back to the Jaccard similarity of
    the two lower-cased character sets. That includes the embedder module
    being unavailable.

    Args:
        candidate: Text being evaluated.
        reference: Expected text.
        embedder: Object exposing ``embed(text)``. When ``None``, a
            ``LocalEmbedder`` from ``agentos.cache.embedder`` is created.

    Returns:
        The similarity as a float, or ``0.0`` when either text is empty.
    """
    if not candidate or not reference:
        return 0.0

    try:
        # The imports live inside the ``try`` so that a missing or broken
        # embedder backend triggers the fallback instead of propagating.
        if embedder is None:
            from agentos.cache.embedder import LocalEmbedder
            embedder = LocalEmbedder()
        from agentos.cache.embedder import cosine_similarity as cos_sim

        emb_cand = embedder.embed(candidate)
        emb_ref = embedder.embed(reference)
        return float(cos_sim(emb_cand, emb_ref))
    except Exception:
        # Deliberate best-effort fallback: character-level Jaccard.
        # Both strings are non-empty here, so the union is never empty.
        cand_chars = set(candidate.lower())
        ref_chars = set(reference.lower())
        return len(cand_chars & ref_chars) / len(cand_chars | ref_chars)

149 

150 

151# ── Exact / Contains ──────────────────────────── 

152 

def exact_match(reference: str, candidate: str) -> float:
    """Return 1.0 when both texts are equal after stripping, else 0.0.

    Leading and trailing whitespace is ignored; the comparison is
    case-sensitive.
    """
    if reference.strip() != candidate.strip():
        return 0.0
    return 1.0

156 

157 

def contains_match(reference: str, candidate: str) -> float:
    """Return 1.0 when ``candidate`` contains ``reference``, else 0.0.

    Both texts are lower-cased before the substring test, so the match is
    case-insensitive. An empty ``reference`` is contained in every candidate.
    """
    needle = reference.lower()
    haystack = candidate.lower()
    return 0.0 if needle not in haystack else 1.0

161 

162 

163# ── Composite Scorer ──────────────────────────── 

164 

def _default_weights() -> dict[str, float]:
    """Build a fresh copy of the default metric weights."""
    return {
        "rouge_l": 0.3,
        "bleu": 0.2,
        "exact": 0.2,
        "contains": 0.3,
    }


@dataclass
class ScoringStrategy:
    """Configuration for a weighted, multi-metric scoring run."""

    # Label identifying the strategy.
    name: str = "composite"
    # Metric name -> weight; a new dict is built for every instance.
    weights: dict[str, float] = field(default_factory=_default_weights)
    # Minimum weighted score that counts as a pass.
    pass_threshold: float = 0.6

177 

178 

@dataclass
class ScoreResult:
    """Outcome of scoring one candidate against one reference."""

    # Expected text.
    reference: str
    # Text that was evaluated.
    candidate: str
    # Metric name -> individual score.
    scores: dict[str, float] = field(default_factory=dict)
    # Weighted combination of the individual scores.
    weighted_score: float = 0.0
    # Whether the weighted score reached the pass threshold.
    passed: bool = False
    # Human-readable summary of the individual scores.
    details: str = ""

    def to_dict(self) -> dict:
        """Return the result as a dict, without the two input texts.

        The weighted score is rounded to four decimal places.
        """
        payload: dict = {"scores": self.scores}
        payload["weighted_score"] = round(self.weighted_score, 4)
        payload["passed"] = self.passed
        payload["details"] = self.details
        return payload

197 

198 

class CompositeScorer:
    """Composite scorer that combines several metrics by weight."""

    def __init__(self, strategy: ScoringStrategy | None = None):
        """Store ``strategy``, or a default ``ScoringStrategy`` if none is given."""
        self.strategy = strategy if strategy else ScoringStrategy()

    def score(self, reference: str, candidate: str, embedder: Any = None) -> ScoreResult:
        """Score ``candidate`` against ``reference``.

        Only metrics named in the strategy's weights are computed. Weight
        keys with no computed score contribute zero to the weighted total.

        Args:
            reference: Expected text.
            candidate: Text being evaluated.
            embedder: Passed through to ``semantic_similarity``.

        Returns:
            A ``ScoreResult`` with the individual scores, their weighted sum,
            and whether that sum reached the strategy's pass threshold.
        """
        weights = self.strategy.weights
        scores: dict[str, float] = {}

        # Metrics sharing the (reference, candidate) call shape, in the
        # order they appear in the result.
        text_metrics = (
            ("rouge_l", rouge_l),
            ("bleu", bleu),
            ("exact", exact_match),
            ("contains", contains_match),
        )
        for metric, scorer in text_metrics:
            if metric in weights:
                scores[metric] = scorer(reference, candidate)

        # The semantic metric takes the candidate first and needs the embedder.
        if "semantic" in weights:
            scores["semantic"] = semantic_similarity(candidate, reference, embedder)

        weighted = sum(
            scores.get(metric, 0) * weight
            for metric, weight in weights.items()
        )
        summary = ", ".join(f"{metric}={value:.3f}" for metric, value in scores.items())

        return ScoreResult(
            reference=reference,
            candidate=candidate,
            scores=scores,
            weighted_score=weighted,
            passed=weighted >= self.strategy.pass_threshold,
            details=summary,
        )

    def batch_score(
        self,
        pairs: list[tuple[str, str]],
        embedder: Any = None,
    ) -> list[ScoreResult]:
        """Score every ``(reference, candidate)`` pair, preserving order."""
        results: list[ScoreResult] = []
        for ref, cand in pairs:
            results.append(self.score(ref, cand, embedder))
        return results

256 

257 

258# ── Pre-built Strategies ──────────────────────── 

259 

# Code generation: containment and exact match carry most of the weight.
STRATEGY_CODE_GEN = ScoringStrategy(
    name="code_generation",
    pass_threshold=0.5,
    weights={
        "rouge_l": 0.1,
        "bleu": 0.1,
        "exact": 0.3,
        "contains": 0.5,
    },
)

# Question answering: no BLEU component.
STRATEGY_QA = ScoringStrategy(
    name="question_answering",
    pass_threshold=0.5,
    weights={
        "rouge_l": 0.3,
        "contains": 0.5,
        "exact": 0.2,
    },
)

# Summarization: ROUGE-L dominates, with a semantic component.
STRATEGY_SUMMARY = ScoringStrategy(
    name="summarization",
    pass_threshold=0.25,
    weights={
        "rouge_l": 0.6,
        "bleu": 0.1,
        "semantic": 0.3,
    },
)

# Translation: BLEU dominates, with a semantic component.
STRATEGY_TRANSLATION = ScoringStrategy(
    name="translation",
    pass_threshold=0.30,
    weights={
        "bleu": 0.6,
        "rouge_l": 0.2,
        "semantic": 0.2,
    },
)

283 

284 

285# ── LLM‑as‑Judge ──────────────────────────────── 

286 

# Prompt template sent to the judge model; filled in with str.format.
_JUDGE_PROMPT = """You are an evaluation judge. Grade the following answer against the reference.
Output ONLY a number between 0.0 and 1.0 and a one-sentence reason.

Task: {task}
Reference (expected): {reference}
Candidate (actual): {candidate}

Score (0.0-1.0):
Reason:"""

# Matches an echoed range hint such as "(0.0-1.0)" from the prompt's score label.
_JUDGE_RANGE_HINT_RE = re.compile(r"\(\s*0(?:\.0+)?\s*[-–]\s*1(?:\.0+)?\s*\)")
# First unsigned number, allowing a leading-dot decimal such as ".5".
_JUDGE_NUMBER_RE = re.compile(r"\d+\.?\d*|\.\d+")


def _parse_judge_score(text: str) -> float | None:
    """Extract the grade from a judge reply, clamped to ``[0.0, 1.0]``.

    Returns ``None`` when the reply contains no number.
    """
    # A reply that repeats the "Score (0.0-1.0):" label would otherwise be
    # read as the grade 0.0, so drop the hint before searching.
    cleaned = _JUDGE_RANGE_HINT_RE.sub(" ", text)
    match = _JUDGE_NUMBER_RE.search(cleaned)
    if match is None:
        return None
    return max(0.0, min(1.0, float(match.group())))


def llm_judge(reference: str, candidate: str, task: str = "general",
              model: str = "gpt-4o-mini", api_key: str = "") -> float:
    """Ask an LLM to grade ``candidate`` against ``reference``.

    Sends one chat-completion request to the OpenAI endpoint and parses the
    first number in the reply as the grade.

    Args:
        reference: Expected answer.
        candidate: Answer being evaluated.
        task: Task label inserted into the prompt.
        model: Model name sent in the request.
        api_key: API key; when empty, the ``OPENAI_API_KEY`` environment
            variable is used.

    Returns:
        The grade clamped to ``[0.0, 1.0]``. ``0.0`` is returned when no key
        is available, the response status is not 200, the reply contains no
        number, or any exception occurs.
    """
    if not api_key:
        import os
        api_key = os.environ.get("OPENAI_API_KEY", "")
    if not api_key:
        return 0.0

    prompt = _JUDGE_PROMPT.format(task=task, reference=reference, candidate=candidate)

    try:
        import requests
        resp = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
            json={
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.0,
                "max_tokens": 50,
            },
            timeout=15,
        )
        if resp.status_code != 200:
            return 0.0

        reply = resp.json()["choices"][0]["message"]["content"].strip()
        grade = _parse_judge_score(reply)
        return 0.0 if grade is None else grade
    except Exception:
        # Deliberate best-effort: a failed judge call scores as 0.0.
        return 0.0

342 

343 

344# ── Strategy w/ LLM‑Judge ─────────────────────── 

345 

# Question answering with an LLM-judge component.
STRATEGY_QA_JUDGE = ScoringStrategy(
    name="qa_with_judge",
    pass_threshold=0.55,
    weights={
        "rouge_l": 0.2,
        "contains": 0.3,
        "exact": 0.1,
        "judge": 0.4,
    },
)

# Summarization with an LLM-judge component.
STRATEGY_SUMMARY_JUDGE = ScoringStrategy(
    name="summary_with_judge",
    pass_threshold=0.55,
    weights={
        "rouge_l": 0.3,
        "bleu": 0.1,
        "judge": 0.6,
    },
)

# Code generation with an LLM-judge component.
STRATEGY_CODE_JUDGE = ScoringStrategy(
    name="code_with_judge",
    pass_threshold=0.55,
    weights={
        "rouge_l": 0.05,
        "bleu": 0.05,
        "exact": 0.2,
        "contains": 0.3,
        "judge": 0.4,
    },
)

363 

364 

class CompositeScorerV2(CompositeScorer):
    """Composite scorer that can add an LLM-as-judge metric."""

    def __init__(self, strategy: ScoringStrategy | None = None,
                 llm_model: str = "gpt-4o-mini"):
        """Store the strategy and the model name used for judge calls."""
        super().__init__(strategy)
        self._llm_model = llm_model

    def score(self, reference: str, candidate: str, embedder: Any = None,
              task: str = "general") -> ScoreResult:
        """Score ``candidate`` against ``reference``, optionally with a judge.

        The non-judge metrics are computed by ``CompositeScorer.score``.
        When the strategy's weights contain ``"judge"``, ``llm_judge`` is
        called and the weighted score, pass flag and details are recomputed
        to include it.

        Args:
            reference: Expected text.
            candidate: Text being evaluated.
            embedder: Passed through to the semantic metric.
            task: Task label passed to ``llm_judge``.

        Returns:
            A ``ScoreResult`` covering every metric that was computed.
        """
        # Delegate the shared metrics instead of duplicating the base logic.
        base = super().score(reference, candidate, embedder)
        weights = self.strategy.weights
        if "judge" not in weights:
            return base

        scores = base.scores
        scores["judge"] = llm_judge(reference, candidate, task=task, model=self._llm_model)

        weighted = sum(scores.get(metric, 0) * weight for metric, weight in weights.items())
        summary = ", ".join(f"{metric}={value:.3f}" for metric, value in scores.items())

        return ScoreResult(
            reference=reference, candidate=candidate,
            scores=scores, weighted_score=weighted,
            passed=weighted >= self.strategy.pass_threshold,
            details=summary,
        )

398 )