Coverage for agentos/evaluation/scorers.py: 25%
176 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2AgentOS v0.70 — 评测打分系统。
3基因来源: ROUGE/BLEU 经典算法 + 语义相似度
5评分策略:
6- ROUGE-L: 最长公共子序列召回率 (摘要质量)
7- BLEU: n-gram精确率 (翻译质量)
8- Semantic: 基于embedding的语义相似度
9- Exact: 精确匹配
10- Contains: 包含匹配
11"""
13from __future__ import annotations
15import math
16import re
17from collections import Counter
18from dataclasses import dataclass, field
19from typing import Any, Callable
22# ── ROUGE-L ─────────────────────────────────────
def _lcs_length(x: list, y: list) -> int:
    """Return the length of the longest common subsequence of ``x`` and ``y``.

    Implemented as a dynamic-programming table of which only two rows are
    kept at a time; the shorter sequence determines the row width.
    """
    longer, shorter = (y, x) if len(x) < len(y) else (x, y)
    width = len(shorter) + 1
    above = [0] * width
    for item in longer:
        row = [0] * width
        for col, other in enumerate(shorter, start=1):
            if item == other:
                # Extend the subsequence ending just before both items.
                row[col] = above[col - 1] + 1
            else:
                # Carry over the better of "skip item" / "skip other".
                row[col] = max(above[col], row[col - 1])
        above = row
    return above[-1]


def rouge_l(reference: str, candidate: str) -> float:
    """Return the character-level ROUGE-L F1 score of two strings.

    Recall is ``LCS / len(reference)`` and precision is
    ``LCS / len(candidate)``, where LCS is the longest common subsequence
    of the two character sequences. Returns 0.0 when either string is
    empty or when the strings share no character.
    """
    if not (reference and candidate):
        return 0.0

    ref_chars, cand_chars = list(reference), list(candidate)
    lcs = _lcs_length(ref_chars, cand_chars)

    recall = lcs / len(ref_chars)
    precision = lcs / len(cand_chars)
    if recall + precision == 0:
        return 0.0
    return 2 * recall * precision / (recall + precision)
60# ── BLEU ────────────────────────────────────────
def _ngrams(tokens: list[str], n: int) -> Counter:
    """Count every contiguous ``n``-token window of ``tokens``.

    Returns an empty counter when ``tokens`` holds fewer than ``n`` items.
    """
    return Counter(tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1))


def bleu(reference: str, candidate: str, max_n: int = 4, smoothing: bool = True) -> float:
    """Return a sentence-level BLEU score of ``candidate`` against ``reference``.

    The score is the brevity penalty times the geometric mean of the clipped
    n-gram precisions for orders ``1..max_n``.

    Args:
        reference: Expected text.
        candidate: Text being graded.
        max_n: Highest n-gram order included in the geometric mean; expected
            to be at least 1.
        smoothing: When true, an order ``n >= 2`` whose precision is zero
            (no match, or the candidate is shorter than ``n`` tokens) is
            replaced by ``1 / 2**n``. When false, any zero precision makes
            the whole score 0.0.

    Returns:
        A value in ``[0.0, 1.0]``. The result is 0.0 when either text has no
        tokens or when the candidate shares no unigram with the reference;
        smoothing never turns a complete mismatch into a positive score.
    """
    ref_tokens = _tokenize(reference)
    cand_tokens = _tokenize(candidate)
    if not cand_tokens or not ref_tokens:
        return 0.0

    precisions = []
    for n in range(1, max_n + 1):
        ref_ngrams = _ngrams(ref_tokens, n)
        cand_ngrams = _ngrams(cand_tokens, n)
        total = sum(cand_ngrams.values())
        # Clip each candidate n-gram count by its count in the reference.
        clipped = sum(
            min(count, ref_ngrams.get(ngram, 0))
            for ngram, count in cand_ngrams.items()
        )

        if clipped == 0:
            # No shared unigram means the texts have nothing in common.
            if n == 1 or not smoothing:
                return 0.0
            # Same decay for "no match" and "candidate too short", keyed on
            # the 1-based n-gram order.
            precisions.append(1.0 / (2 ** n))
            continue

        precisions.append(clipped / total)

    # Brevity penalty: candidates shorter than the reference are penalised.
    bp = min(1.0, math.exp(1 - len(ref_tokens) / max(len(cand_tokens), 1)))

    # Geometric mean of the n-gram precisions, computed in log space.
    log_sum = sum(math.log(p) for p in precisions)
    return bp * math.exp(log_sum / max_n)


def _tokenize(text: str) -> list[str]:
    """Lower-case ``text`` and split it into tokens.

    Text containing at least one CJK unified ideograph (U+4E00..U+9FFF) is
    split into individual non-whitespace characters. All other text is split
    into runs of word characters and single punctuation characters.
    """
    text = text.lower()
    if re.search(r'[\u4e00-\u9fff]', text):
        return [ch for ch in text if ch.strip()]
    return re.findall(r'\w+|[^\w\s]', text)
120# ── Semantic Similarity ─────────────────────────
def semantic_similarity(candidate: str, reference: str, embedder: Any = None) -> float:
    """Return the embedding cosine similarity of two texts.

    Args:
        candidate: Text being graded.
        reference: Expected text.
        embedder: Object exposing ``embed(text)``. When ``None``, a
            ``LocalEmbedder`` from ``agentos.cache.embedder`` is created.

    Returns:
        0.0 when either text is empty. Otherwise the cosine similarity of
        the two embeddings, or, if the embedder cannot be imported, built or
        used, the Jaccard similarity of the two lower-cased character sets.
    """
    if not candidate or not reference:
        return 0.0

    try:
        # Creating the default embedder is inside the ``try`` so that a
        # missing or broken embedder package takes the fallback path too.
        if embedder is None:
            from agentos.cache.embedder import LocalEmbedder
            embedder = LocalEmbedder()
        emb_cand = embedder.embed(candidate)
        emb_ref = embedder.embed(reference)
        from agentos.cache.embedder import cosine_similarity as cos_sim
        return float(cos_sim(emb_cand, emb_ref))
    except Exception:
        # Deliberate best-effort: any embedding failure degrades to a cheap
        # lexical measure instead of aborting the evaluation run.
        chars_a = set(candidate.lower())
        chars_b = set(reference.lower())
        # Both inputs are non-empty here, so the union is never empty.
        return len(chars_a & chars_b) / len(chars_a | chars_b)
151# ── Exact / Contains ────────────────────────────
def exact_match(reference: str, candidate: str) -> float:
    """Return 1.0 when both texts are equal after trimming outer whitespace, else 0.0.

    The comparison is case-sensitive.
    """
    return float(reference.strip() == candidate.strip())
def contains_match(reference: str, candidate: str) -> float:
    """Return 1.0 when ``candidate`` contains ``reference``, ignoring case, else 0.0.

    An empty ``reference`` is a substring of every string and therefore
    always yields 1.0.
    """
    needle = reference.lower()
    haystack = candidate.lower()
    return float(needle in haystack)
163# ── Composite Scorer ────────────────────────────
def _default_weights() -> dict[str, float]:
    """Build a fresh default metric-to-weight mapping for ``ScoringStrategy``."""
    return {
        "rouge_l": 0.3,
        "bleu": 0.2,
        "exact": 0.2,
        "contains": 0.3,
    }


@dataclass
class ScoringStrategy:
    """Configuration of a composite score.

    ``weights`` maps a metric name to its weight in the weighted sum and
    ``pass_threshold`` is the minimum weighted sum that counts as a pass.
    """

    # Human-readable label of the strategy.
    name: str = "composite"
    # Each instance gets its own dict, so mutating one never affects another.
    weights: dict[str, float] = field(default_factory=_default_weights)
    pass_threshold: float = 0.6
@dataclass
class ScoreResult:
    """Outcome of scoring one candidate against one reference."""

    reference: str
    candidate: str
    # Per-metric scores keyed by metric name.
    scores: dict[str, float] = field(default_factory=dict)
    weighted_score: float = 0.0
    passed: bool = False
    # Free-form text describing the individual scores.
    details: str = ""

    def to_dict(self) -> dict:
        """Return a summary dict with the weighted score rounded to 4 decimals.

        The ``reference`` and ``candidate`` texts are not included.
        """
        rounded = round(self.weighted_score, 4)
        return {
            "scores": self.scores,
            "weighted_score": rounded,
            "passed": self.passed,
            "details": self.details,
        }
class CompositeScorer:
    """Composite scorer that combines several metrics into one weighted score."""

    def __init__(self, strategy: ScoringStrategy | None = None):
        """Use ``strategy``, or a default ``ScoringStrategy`` when none is given."""
        self.strategy = strategy if strategy else ScoringStrategy()

    def score(self, reference: str, candidate: str, embedder: Any = None) -> ScoreResult:
        """Score ``candidate`` against ``reference``.

        Every known metric whose name is a key of the strategy's weights is
        computed. The weighted score is the sum of ``score * weight`` over
        all weight entries; a weight whose metric was not computed
        contributes 0. ``embedder`` is forwarded to the semantic metric only.
        """
        weights = self.strategy.weights

        # Evaluated lazily and in this order, which is also the order of the
        # entries in ``scores`` and in the ``details`` string.
        metrics = {
            "rouge_l": lambda: rouge_l(reference, candidate),
            "bleu": lambda: bleu(reference, candidate),
            "exact": lambda: exact_match(reference, candidate),
            "contains": lambda: contains_match(reference, candidate),
            "semantic": lambda: semantic_similarity(candidate, reference, embedder),
        }
        scores: dict[str, float] = {
            metric: compute()
            for metric, compute in metrics.items()
            if metric in weights
        }

        weighted = sum(
            scores.get(metric, 0) * weight
            for metric, weight in weights.items()
        )

        return ScoreResult(
            reference=reference,
            candidate=candidate,
            scores=scores,
            weighted_score=weighted,
            passed=weighted >= self.strategy.pass_threshold,
            details=", ".join(f"{k}={v:.3f}" for k, v in scores.items()),
        )

    def batch_score(
        self,
        pairs: list[tuple[str, str]],
        embedder: Any = None,
    ) -> list[ScoreResult]:
        """Score each ``(reference, candidate)`` pair, preserving input order."""
        return [
            self.score(reference, candidate, embedder)
            for reference, candidate in pairs
        ]
258# ── Pre-built Strategies ────────────────────────
# Code generation: containment and exact match carry most of the weight.
STRATEGY_CODE_GEN = ScoringStrategy(
    name="code_generation",
    weights={
        "rouge_l": 0.1,
        "bleu": 0.1,
        "exact": 0.3,
        "contains": 0.5,
    },
    pass_threshold=0.5,
)

# Question answering: containment first, then ROUGE-L, then exact match.
STRATEGY_QA = ScoringStrategy(
    name="question_answering",
    weights={
        "rouge_l": 0.3,
        "contains": 0.5,
        "exact": 0.2,
    },
    pass_threshold=0.5,
)

# Summarization: ROUGE-L dominates, backed by semantic similarity.
STRATEGY_SUMMARY = ScoringStrategy(
    name="summarization",
    weights={
        "rouge_l": 0.6,
        "bleu": 0.1,
        "semantic": 0.3,
    },
    pass_threshold=0.25,
)

# Translation: BLEU dominates, with ROUGE-L and semantic similarity as support.
STRATEGY_TRANSLATION = ScoringStrategy(
    name="translation",
    weights={
        "bleu": 0.6,
        "rouge_l": 0.2,
        "semantic": 0.2,
    },
    pass_threshold=0.30,
)
285# ── LLM‑as‑Judge ────────────────────────────────
# Prompt template for the LLM judge. ``llm_judge`` fills the ``{task}``,
# ``{reference}`` and ``{candidate}`` placeholders with ``str.format``.
_JUDGE_PROMPT = """You are an evaluation judge. Grade the following answer against the reference.
Output ONLY a number between 0.0 and 1.0 and a one-sentence reason.

Task: {task}
Reference (expected): {reference}
Candidate (actual): {candidate}

Score (0.0-1.0):
Reason:"""
# An echoed "(0.0-1.0)" range hint; removed so it is not read as the score.
_JUDGE_RANGE_HINT_RE = re.compile(r"\(\s*0(?:\.0+)?\s*[-\u2013]\s*1(?:\.0+)?\s*\)")
# A signed decimal number, including forms such as "1", "0.8" and ".8".
_JUDGE_NUMBER_RE = re.compile(r"[-+]?(?:\d+(?:\.\d+)?|\.\d+)")
_DEFAULT_JUDGE_BASE_URL = "https://api.openai.com/v1"


def _parse_judge_score(text: str) -> float:
    """Extract the judge's score from its reply, clamped to ``[0.0, 1.0]``."""
    cleaned = _JUDGE_RANGE_HINT_RE.sub(" ", text)
    match = _JUDGE_NUMBER_RE.search(cleaned)
    if match is None:
        return 0.0
    return max(0.0, min(1.0, float(match.group())))


def llm_judge(reference: str, candidate: str, task: str = "general",
              model: str = "gpt-4o-mini", api_key: str = "",
              base_url: str = "") -> float:
    """Grade ``candidate`` against ``reference`` with an LLM judge.

    Sends one chat-completion request and parses the first number of the
    reply as the score. A repeated ``(0.0-1.0)`` range hint is ignored, and
    negative or above-one values are clamped.

    Args:
        reference: Expected answer.
        candidate: Answer being graded.
        task: Short task description inserted into the prompt.
        model: Model name sent in the request.
        api_key: API key; falls back to the ``OPENAI_API_KEY`` environment
            variable when empty.
        base_url: Root URL of an OpenAI-compatible API, without the
            ``/chat/completions`` suffix. Falls back to the
            ``OPENAI_BASE_URL`` environment variable, then to the official
            endpoint.

    Returns:
        A score in ``[0.0, 1.0]``. Returns 0.0 when no API key is available,
        on a non-200 response, when no number is found, or on any error.
    """
    import os

    api_key = api_key or os.environ.get("OPENAI_API_KEY", "")
    if not api_key:
        return 0.0

    endpoint_root = (
        base_url
        or os.environ.get("OPENAI_BASE_URL", "")
        or _DEFAULT_JUDGE_BASE_URL
    ).rstrip("/")
    prompt = _JUDGE_PROMPT.format(task=task, reference=reference, candidate=candidate)

    try:
        # Imported lazily so the module loads without the HTTP dependency.
        import requests
        resp = requests.post(
            f"{endpoint_root}/chat/completions",
            headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
            json={
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.0,
                "max_tokens": 50,
            },
            timeout=15,
        )
        if resp.status_code != 200:
            return 0.0

        data = resp.json()
        text = data["choices"][0]["message"]["content"].strip()
        return _parse_judge_score(text)
    except Exception:
        # Deliberate best-effort: a failing judge must not abort scoring.
        return 0.0
344# ── Strategy w/ LLM‑Judge ───────────────────────
# Question answering with an LLM judge as the largest single weight.
STRATEGY_QA_JUDGE = ScoringStrategy(
    name="qa_with_judge",
    weights={
        "rouge_l": 0.2,
        "contains": 0.3,
        "exact": 0.1,
        "judge": 0.4,
    },
    pass_threshold=0.55,
)

# Summarization where the LLM judge outweighs the overlap metrics.
STRATEGY_SUMMARY_JUDGE = ScoringStrategy(
    name="summary_with_judge",
    weights={
        "rouge_l": 0.3,
        "bleu": 0.1,
        "judge": 0.6,
    },
    pass_threshold=0.55,
)

# Code generation with an LLM judge alongside containment and exact match.
STRATEGY_CODE_JUDGE = ScoringStrategy(
    name="code_with_judge",
    weights={
        "rouge_l": 0.05,
        "bleu": 0.05,
        "exact": 0.2,
        "contains": 0.3,
        "judge": 0.4,
    },
    pass_threshold=0.55,
)
class CompositeScorerV2(CompositeScorer):
    """Composite scorer that additionally supports an LLM-as-judge metric."""

    def __init__(self, strategy: ScoringStrategy | None = None,
                 llm_model: str = "gpt-4o-mini"):
        """Store the strategy via the base class and remember the judge model name."""
        super().__init__(strategy)
        self._llm_model = llm_model

    def score(self, reference: str, candidate: str, embedder: Any = None,
              task: str = "general") -> ScoreResult:
        """Score ``candidate`` against ``reference``, optionally using the judge.

        Every known metric whose name is a key of the strategy's weights is
        computed; ``"judge"`` calls ``llm_judge`` with ``task`` and the
        configured model. The weighted score is the sum of ``score * weight``
        over all weight entries.
        """
        weights = self.strategy.weights

        # Evaluated lazily and in this order, which is also the order of the
        # entries in ``scores`` and in the ``details`` string.
        metrics = {
            "rouge_l": lambda: rouge_l(reference, candidate),
            "bleu": lambda: bleu(reference, candidate),
            "exact": lambda: exact_match(reference, candidate),
            "contains": lambda: contains_match(reference, candidate),
            "semantic": lambda: semantic_similarity(candidate, reference, embedder),
            "judge": lambda: llm_judge(reference, candidate, task=task, model=self._llm_model),
        }
        scores: dict[str, float] = {
            metric: compute()
            for metric, compute in metrics.items()
            if metric in weights
        }

        weighted = sum(scores.get(metric, 0) * weight for metric, weight in weights.items())

        return ScoreResult(
            reference=reference,
            candidate=candidate,
            scores=scores,
            weighted_score=weighted,
            passed=weighted >= self.strategy.pass_threshold,
            details=", ".join(f"{k}={v:.3f}" for k, v in scores.items()),
        )