Coverage for agentos/experiments/runner.py: 43%
145 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2AgentOS v0.40 Experiments — A/B测试与Prompt实验框架。
3支持:Prompt变体对比、A/B/n测试、结果统计显著性分析、实验报告生成。
4"""
6from __future__ import annotations
8import json
9import time
10import uuid
11from dataclasses import dataclass, field
12from typing import Optional, Any
@dataclass
class PromptVariant:
    """One prompt configuration that competes in an experiment.

    Attributes:
        name: Label used to identify the variant in results and reports.
        system_prompt: Text of the system message for this variant.
        user_template: Optional template for the user message; an empty
            string (the default) means no template is configured.
        model: Model identifier; defaults to ``"auto"``.
        temperature: Sampling temperature; defaults to ``0.7``.
        max_tokens: Token limit; defaults to ``2048``.
        metadata: Free-form extra data; every instance gets its own dict.
    """

    # Required fields (positional order is part of the constructor contract).
    name: str
    system_prompt: str

    # Optional tuning knobs.
    user_template: str = ""
    model: str = "auto"
    temperature: float = 0.7
    max_tokens: int = 2048

    # default_factory avoids sharing one mutable dict across instances.
    metadata: dict = field(default_factory=dict)
@dataclass
class TrialResult:
    """Outcome of a single trial of one variant on one input.

    Attributes:
        variant_name: Name of the variant that produced this result.
        input: The test input that was used.
        output: The produced output text.
        latency_ms: Elapsed time of the trial in milliseconds.
        tokens_used: Token count for the trial; defaults to ``0``.
        cost: Cost of the trial; defaults to ``0.0``.
        error: Error message; an empty string (the default) means no error.
        score: Score assigned by the evaluator.
        judged_by: Identifier of whoever/whatever produced ``score``.
    """

    # What was run.
    variant_name: str
    input: str
    output: str

    # Resource usage.
    latency_ms: float = 0
    tokens_used: int = 0
    cost: float = 0.0

    # Failure information and evaluation.
    error: str = ""
    score: float = 0.0  # evaluator score
    judged_by: str = ""
@dataclass
class ExperimentConfig:
    """Settings describing one experiment.

    Attributes:
        name: Human-readable experiment name.
        variants: The prompt variants to compare.
        test_inputs: Inputs each variant is tried on.
        evaluator: Evaluator selector; one of ``auto``, ``llm_judge``,
            ``human`` or ``custom``.
        trials_per_variant: Number of repetitions per (variant, input) pair.
        shuffle: Whether the trial order should be randomized.
        metric: Metric selector; one of ``accuracy``, ``relevance``,
            ``creativity`` or ``custom``.
    """

    # Required fields (positional order is part of the constructor contract).
    name: str
    variants: list[PromptVariant]
    test_inputs: list[str]

    # Optional settings.
    evaluator: str = "auto"  # auto | llm_judge | human | custom
    trials_per_variant: int = 3
    shuffle: bool = True
    metric: str = "accuracy"  # accuracy | relevance | creativity | custom
@dataclass
class ExperimentReport:
    """Result bundle of one executed experiment.

    Attributes:
        id: Identifier of the report.
        config: The configuration the experiment was run with.
        results: Every individual trial result.
        winner: Name of the winning variant; empty string when there is none.
        significance: Significance value; defaults to ``0.0``.
        summary: Aggregated statistics; every instance gets its own dict.
        created_at: Creation timestamp taken from ``time.time()`` at
            construction.
    """

    # Required fields (positional order is part of the constructor contract).
    id: str
    config: ExperimentConfig
    results: list[TrialResult]

    # Analysis outcome.
    winner: str = ""
    significance: float = 0.0
    summary: dict = field(default_factory=dict)

    # Evaluated per instance, so each report records its own creation time.
    created_at: float = field(default_factory=time.time)
class Evaluator:
    """Stateless scoring helpers that grade a model output automatically."""

    @staticmethod
    def llm_judge(output: str, expected: str, criteria: str = "accuracy") -> float:
        """Score ``output`` against ``expected`` with a word-overlap heuristic.

        This is a placeholder for a real LLM-based judge: per the original
        note, production would call the router for judging. ``criteria`` is
        accepted but not used by the heuristic.

        Args:
            output: Text to grade.
            expected: Reference text to compare against.
            criteria: Judging criterion (currently ignored).

        Returns:
            ``0.5`` when ``expected`` is empty or whitespace-only; otherwise
            ``0.7 * word overlap + 0.3 * length ratio``, both computed
            case-insensitively.
        """
        if not expected:
            return 0.5

        candidate = output.lower()
        reference = expected.lower()

        # Word overlap: share of distinct reference words found in the output.
        candidate_words = set(candidate.split())
        reference_words = set(reference.split())
        if not reference_words:
            return 0.5
        shared_words = candidate_words.intersection(reference_words)
        overlap = len(shared_words) / len(reference_words)

        # Length penalty: shorter/longer character ratio; the extra 1 guards
        # against dividing by zero.
        shorter = min(len(candidate), len(reference))
        longer = max(len(candidate), len(reference), 1)
        length_ratio = shorter / longer

        return overlap * 0.7 + length_ratio * 0.3

    @staticmethod
    def exact_match(output: str, expected: str) -> float:
        """Return ``1.0`` if both strings are equal after stripping, else ``0.0``."""
        if output.strip() == expected.strip():
            return 1.0
        return 0.0

    @staticmethod
    def contains_all(output: str, keywords: list[str]) -> float:
        """Return the fraction of ``keywords`` found in ``output``.

        Matching is a case-insensitive substring test. An empty keyword list
        yields the neutral score ``0.5``.
        """
        haystack = output.lower()
        if not keywords:
            return 0.5
        hits = [kw for kw in keywords if kw.lower() in haystack]
        return len(hits) / len(keywords)
class ExperimentRunner:
    """Runs A/B/n prompt experiments and keeps the reports in memory."""

    # Number of system-prompt characters shown in the Markdown report.
    _PROMPT_PREVIEW_CHARS = 200

    def __init__(self, router=None, cache=None):
        """Create a runner.

        Args:
            router: Optional object with an awaitable ``call_chat(messages)``
                method. When it is missing (or falsy), trials produce a
                simulated output string instead of calling a model.
            cache: Optional cache object; stored as given and not otherwise
                used by this class.
        """
        self.router = router
        # Store the object untouched: `cache or None` would silently drop a
        # valid cache that happens to be falsy (e.g. an empty dict).
        self.cache = cache
        self._reports: dict[str, ExperimentReport] = {}

    async def run(self, config: ExperimentConfig) -> ExperimentReport:
        """Execute an A/B experiment and return (and remember) its report.

        Every variant is run ``config.trials_per_variant`` times on every
        test input, sequentially, in shuffled order when ``config.shuffle``
        is set. A failing trial is recorded as an error result rather than
        aborting the experiment.

        Args:
            config: The experiment configuration.

        Returns:
            The report, which is also stored under its id for ``get_report``.
            NOTE: ``significance`` is not computed here and keeps its default.
        """
        import random

        # Build every (variant, input) combination, repeated per trial count.
        trials = []
        for variant in config.variants:
            for inp in config.test_inputs:
                trials.extend((variant, inp) for _ in range(config.trials_per_variant))

        if config.shuffle:
            random.shuffle(trials)

        all_results: list[TrialResult] = []
        for variant, inp in trials:
            all_results.append(await self._run_trial(variant, inp))

        # Aggregate and pick the winner.
        summary = self._analyze(all_results, config)
        winner = self._determine_winner(summary)

        report = ExperimentReport(
            id=f"exp_{uuid.uuid4().hex[:8]}",
            config=config,
            results=all_results,
            winner=winner,
            summary=summary,
        )
        self._reports[report.id] = report
        return report

    async def _run_trial(self, variant: PromptVariant, inp: str) -> TrialResult:
        """Run one trial; any exception becomes an error ``TrialResult``."""
        # perf_counter is monotonic, so the measured latency cannot be skewed
        # (or go negative) when the system clock is adjusted mid-trial.
        start = time.perf_counter()
        try:
            if self.router:
                if variant.user_template:
                    user_content = variant.user_template.format(input=inp)
                else:
                    user_content = inp
                messages = [
                    {"role": "system", "content": variant.system_prompt},
                    {"role": "user", "content": user_content},
                ]
                output = await self.router.call_chat(messages)
            else:
                output = f"[模拟输出] 变体 '{variant.name}' 对输入 '{inp[:30]}...' 的响应"

            latency = (time.perf_counter() - start) * 1000
            # The input itself is used as the reference text for the
            # heuristic judge; a custom evaluator could be plugged in here.
            score = Evaluator.llm_judge(output, inp)

            return TrialResult(
                variant_name=variant.name,
                input=inp,
                output=output,
                latency_ms=latency,
                score=score,
                judged_by="auto",
            )
        except Exception as e:
            # Deliberate best-effort: one failing trial must not abort the
            # whole experiment, so the failure is recorded instead.
            return TrialResult(
                variant_name=variant.name, input=inp, output="",
                error=str(e), score=0.0,
            )

    def _analyze(self, results: list[TrialResult], config: ExperimentConfig) -> dict:
        """Aggregate trial results into per-variant statistics.

        Args:
            results: All trial results of the experiment.
            config: The experiment configuration (currently unused).

        Returns:
            A dict mapping variant name to ``avg_score``, ``max_score``,
            ``min_score``, ``std_score``, ``avg_latency_ms``, ``error_rate``
            and ``trials``. Scores and latencies only cover trials that did
            not error; ``trials`` counts all of them.
        """
        variant_stats = {}
        for r in results:
            vs = variant_stats.setdefault(
                r.variant_name,
                {"scores": [], "latencies": [], "errors": 0, "trials": 0},
            )
            if r.error:
                vs["errors"] += 1
            else:
                vs["scores"].append(r.score)
                vs["latencies"].append(r.latency_ms)
            vs["trials"] += 1

        summary = {}
        for name, vs in variant_stats.items():
            scores = vs["scores"]
            latencies = vs["latencies"]
            summary[name] = {
                "avg_score": sum(scores) / len(scores) if scores else 0,
                "max_score": max(scores) if scores else 0,
                "min_score": min(scores) if scores else 0,
                "std_score": self._std(scores) if scores else 0,
                "avg_latency_ms": sum(latencies) / len(latencies) if latencies else 0,
                "error_rate": vs["errors"] / vs["trials"] if vs["trials"] else 0,
                "trials": vs["trials"],
            }
        return summary

    @staticmethod
    def _determine_winner(summary: dict) -> str:
        """Return the variant with the best error-adjusted average score.

        The average score is scaled by ``1 - 0.5 * error_rate``. Ties go to
        the variant that appears first; an empty summary yields ``""``.
        """
        best_name = ""
        best_score = -1.0
        for name, stats in summary.items():
            penalty = stats["error_rate"] * 0.5
            adjusted = stats["avg_score"] * (1 - penalty)
            if adjusted > best_score:
                best_score = adjusted
                best_name = name
        return best_name

    @staticmethod
    def _std(values: list[float]) -> float:
        """Return the population standard deviation (``0.0`` below 2 values)."""
        if len(values) < 2:
            return 0.0
        mean = sum(values) / len(values)
        return (sum((v - mean) ** 2 for v in values) / len(values)) ** 0.5

    def get_report(self, report_id: str) -> Optional[ExperimentReport]:
        """Return the stored report with this id, or ``None`` if unknown."""
        return self._reports.get(report_id)

    def list_reports(self) -> list[dict]:
        """Return a brief dict (id, name, winner, variant count) per report."""
        return [
            {"id": rid, "name": r.config.name, "winner": r.winner, "variants": len(r.config.variants)}
            for rid, r in self._reports.items()
        ]

    def generate_markdown_report(self, report: ExperimentReport) -> str:
        """Render an experiment report as Markdown.

        The output has a header, a statistics table with the winner marked,
        and a section per variant showing its settings and a preview of its
        system prompt.

        Args:
            report: The report to render.

        Returns:
            The Markdown document as a single string.
        """
        lines = [
            f"# 实验报告: {report.config.name}",
            f"**实验ID**: {report.id}",
            f"**变体数**: {len(report.config.variants)}",
            f"**测试输入数**: {len(report.config.test_inputs)}",
            f"**每变体试验次数**: {report.config.trials_per_variant}",
            f"**胜出变体**: **{report.winner}**",
            "",
            "## 统计摘要",
            "",
            "| 变体 | 平均分 | 最高分 | 最低分 | 标准差 | 平均延迟(ms) | 错误率 | 试验数 |",
            "|------|--------|--------|--------|--------|-------------|--------|--------|",
        ]
        for name, stats in report.summary.items():
            marker = " **← 胜出**" if name == report.winner else ""
            lines.append(
                f"| {name}{marker} | {stats['avg_score']:.3f} | {stats['max_score']:.3f} | "
                f"{stats['min_score']:.3f} | {stats['std_score']:.3f} | {stats['avg_latency_ms']:.0f} | "
                f"{stats['error_rate']:.1%} | {stats['trials']} |"
            )

        lines += ["", "## 变体配置", ""]
        limit = self._PROMPT_PREVIEW_CHARS
        for v in report.config.variants:
            preview = v.system_prompt[:limit]
            # Only signal truncation when something was actually cut off;
            # a short prompt must be shown verbatim.
            if len(v.system_prompt) > limit:
                preview += "..."
            lines += [
                f"### {v.name}",
                f"- 模型: {v.model}",
                f"- 温度: {v.temperature}",
                f"- Max Tokens: {v.max_tokens}",
                f"```\n{preview}\n```",
                "",
            ]

        return "\n".join(lines)