Coverage for agentos/experiments/runner.py: 43%

145 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2AgentOS v0.40 Experiments — A/B测试与Prompt实验框架。 

3支持:Prompt变体对比、A/B/n测试、结果统计显著性分析、实验报告生成。 

4""" 

5 

6from __future__ import annotations 

7 

8import json 

9import time 

10import uuid 

11from dataclasses import dataclass, field 

12from typing import Optional, Any 

13 

14 

@dataclass
class PromptVariant:
    """One prompt configuration that competes in an experiment.

    Attributes:
        name: Identifier of the variant; trial results and summary statistics
            are keyed by this value.
        system_prompt: Text sent as the ``system`` message.
        user_template: Optional ``str.format`` template for the ``user``
            message; it receives the test input as the ``input`` keyword.
            When empty, the raw test input is sent unchanged.
        model: Model selector, ``"auto"`` by default.
        temperature: Sampling temperature recorded for the variant.
        max_tokens: Token limit recorded for the variant.
        metadata: Free-form extra data; each instance gets its own dict.
    """

    # Required fields.
    name: str
    system_prompt: str

    # Optional prompt shaping.
    user_template: str = ""

    # Generation settings. NOTE(review): in the runner code visible in this
    # file these are only echoed in the Markdown report — confirm whether the
    # router is expected to receive them.
    model: str = "auto"
    temperature: float = 0.7
    max_tokens: int = 2048

    # default_factory avoids sharing one dict between instances.
    metadata: dict = field(default_factory=dict)

25 

26 

@dataclass
class TrialResult:
    """Outcome of running one variant against one test input.

    Attributes:
        variant_name: Name of the variant that produced this result.
        input: The test input that was sent.
        output: The text that came back; empty when the trial failed.
        latency_ms: Elapsed time of the trial in milliseconds.
        tokens_used: Number of tokens consumed (0 when not reported).
        cost: Monetary cost of the call (0.0 when not reported).
        error: Error description; an empty string means the trial succeeded.
        score: Score assigned by the evaluator.
        judged_by: Label of whoever produced ``score``.
    """

    variant_name: str
    input: str
    output: str
    # A float default keeps the value's type consistent with the annotation,
    # so defaulted and measured latencies serialize the same way.
    latency_ms: float = 0.0
    tokens_used: int = 0
    cost: float = 0.0
    error: str = ""
    score: float = 0.0  # score assigned by the evaluator
    judged_by: str = ""

39 

40 

@dataclass
class ExperimentConfig:
    """Everything needed to describe one experiment.

    Attributes:
        name: Human-readable experiment name.
        variants: The prompt variants to compare.
        test_inputs: Inputs that every variant is run against.
        evaluator: Scoring strategy label: auto | llm_judge | human | custom.
        trials_per_variant: Repetitions of each (variant, input) pair.
        shuffle: Whether the trial order is randomized before execution.
        metric: Metric label: accuracy | relevance | creativity | custom.
    """

    # What is being compared, and on which inputs.
    name: str
    variants: list[PromptVariant]
    test_inputs: list[str]

    # How the comparison is carried out.
    # NOTE(review): `evaluator` and `metric` are not read by the runner code
    # visible in this file — confirm where they are consumed.
    evaluator: str = "auto"  # auto | llm_judge | human | custom
    trials_per_variant: int = 3
    shuffle: bool = True
    metric: str = "accuracy"  # accuracy | relevance | creativity | custom

51 

52 

@dataclass
class ExperimentReport:
    """Result bundle produced for one executed experiment.

    Attributes:
        id: Identifier of the report.
        config: The configuration the experiment was run with.
        results: Every individual trial result, successful or failed.
        winner: Name of the winning variant; empty when there is none.
        significance: Statistical significance value, 0.0 by default.
        summary: Per-variant statistics keyed by variant name.
        created_at: Creation time as returned by ``time.time()``.
    """

    # Identity and raw data.
    id: str
    config: ExperimentConfig
    results: list[TrialResult]

    # Derived conclusions.
    winner: str = ""
    significance: float = 0.0
    summary: dict = field(default_factory=dict)

    # Stamped when the instance is constructed.
    created_at: float = field(default_factory=time.time)

63 

64 

class Evaluator:
    """Stateless scoring helpers that rate a model output.

    Every scorer returns a float between 0.0 and 1.0; higher is better.
    """

    @staticmethod
    def llm_judge(output: str, expected: str, criteria: str = "accuracy") -> float:
        """Rate ``output`` against ``expected`` with a cheap heuristic.

        This is a placeholder for a real LLM-based judge: no model is called.
        The score is 70% word overlap (share of the expected words that also
        occur in the output, case-insensitive) plus 30% length similarity
        (shorter character length divided by the longer one).

        Args:
            output: Text produced by the model.
            expected: Reference text to compare against.
            criteria: Accepted for interface compatibility; currently unused.

        Returns:
            0.5 when ``expected`` is empty or contains only whitespace,
            otherwise the weighted heuristic score.
        """
        # Without a reference there is nothing to compare: neutral score.
        if not expected:
            return 0.5

        candidate, reference = output.lower(), expected.lower()
        candidate_tokens = set(candidate.split())
        reference_tokens = set(reference.split())
        # A whitespace-only reference yields no tokens: neutral score as well.
        if not reference_tokens:
            return 0.5

        shared = candidate_tokens.intersection(reference_tokens)
        coverage = len(shared) / len(reference_tokens)

        # Length similarity penalizes outputs much longer or shorter than the
        # reference; the floor of 1 guards the division.
        shorter, longer = sorted((len(candidate), len(reference)))
        brevity = shorter / max(longer, 1)

        return coverage * 0.7 + brevity * 0.3

    @staticmethod
    def exact_match(output: str, expected: str) -> float:
        """Return 1.0 if both strings are equal after stripping, else 0.0."""
        if output.strip() == expected.strip():
            return 1.0
        return 0.0

    @staticmethod
    def contains_all(output: str, keywords: list[str]) -> float:
        """Return the fraction of ``keywords`` found in ``output``.

        Matching is a case-insensitive substring test. An empty keyword list
        yields the neutral score 0.5.
        """
        haystack = output.lower()
        hits = 0
        for keyword in keywords:
            if keyword.lower() in haystack:
                hits += 1
        if not keywords:
            return 0.5
        return hits / len(keywords)

100 

101 

class ExperimentRunner:
    """Run prompt A/B/n experiments and keep their reports in memory.

    Reports are stored in a dict keyed by report id for the lifetime of the
    runner instance; nothing is persisted.
    """

    def __init__(self, router=None, cache=None):
        """Create a runner.

        Args:
            router: Optional object exposing an awaitable
                ``call_chat(messages)``. When it is falsy, ``run`` produces
                simulated outputs instead of calling a model.
            cache: Optional cache object. It is stored on the instance; the
                methods of this class do not read it.
        """
        self.router = router
        # Store the caller's object unchanged: ``cache or None`` would silently
        # discard a valid but falsy cache (for example an empty dict).
        self.cache = cache
        self._reports: dict[str, ExperimentReport] = {}

    async def run(self, config: ExperimentConfig) -> ExperimentReport:
        """Execute an A/B experiment and return its report.

        Every variant is run against every test input
        ``config.trials_per_variant`` times, optionally in random order.
        A failing trial is recorded as an errored result and does not abort
        the experiment.

        Args:
            config: The experiment to execute.

        Returns:
            The report, which is also stored and retrievable via
            ``get_report``. ``significance`` is left at its default.
        """
        import random

        # Every (variant, input) pair, repeated trials_per_variant times.
        trials = [
            (variant, inp)
            for variant in config.variants
            for inp in config.test_inputs
            for _ in range(config.trials_per_variant)
        ]
        if config.shuffle:
            random.shuffle(trials)

        all_results: list[TrialResult] = []
        for variant, inp in trials:
            all_results.append(await self._run_trial(variant, inp))

        # Aggregate and pick the winner.
        summary = self._analyze(all_results, config)
        report = ExperimentReport(
            id=f"exp_{uuid.uuid4().hex[:8]}",
            config=config,
            results=all_results,
            winner=self._determine_winner(summary),
            summary=summary,
        )
        self._reports[report.id] = report
        return report

    async def _run_trial(self, variant: PromptVariant, inp: str) -> TrialResult:
        """Run one (variant, input) pair and wrap the outcome in a TrialResult."""
        # perf_counter is monotonic, so system clock adjustments cannot skew
        # the measured latency or make it negative.
        start = time.perf_counter()
        try:
            if self.router:
                user_content = (
                    variant.user_template.format(input=inp)
                    if variant.user_template
                    else inp
                )
                messages = [
                    {"role": "system", "content": variant.system_prompt},
                    {"role": "user", "content": user_content},
                ]
                output = await self.router.call_chat(messages)
            else:
                output = f"[模拟输出] 变体 '{variant.name}' 对输入 '{inp[:30]}...' 的响应"

            latency = (time.perf_counter() - start) * 1000
            # Placeholder scoring: the input itself serves as the reference.
            score = Evaluator.llm_judge(output, inp)
        except Exception as exc:
            # Best effort by design: one failed call must not abort the run.
            # Some exceptions (e.g. TimeoutError()) have an empty message; fall
            # back to the class name so the trial is still counted as failed.
            return TrialResult(
                variant_name=variant.name,
                input=inp,
                output="",
                latency_ms=(time.perf_counter() - start) * 1000,
                error=str(exc) or type(exc).__name__,
                score=0.0,
            )

        return TrialResult(
            variant_name=variant.name,
            input=inp,
            output=output,
            latency_ms=latency,
            score=score,
            judged_by="auto",
        )

    def _analyze(self, results: list[TrialResult], config: ExperimentConfig) -> dict:
        """Aggregate trial results into per-variant statistics.

        Scores and latencies are averaged over successful trials only; the
        error rate is computed over all trials of a variant.

        Args:
            results: Trial results in any order.
            config: Experiment configuration; its variant order defines the
                order of the returned mapping.

        Returns:
            Mapping of variant name to a dict with the keys ``avg_score``,
            ``max_score``, ``min_score``, ``std_score``, ``avg_latency_ms``,
            ``error_rate`` and ``trials``.
        """
        variant_stats: dict[str, dict] = {}
        for result in results:
            bucket = variant_stats.setdefault(
                result.variant_name,
                {"scores": [], "latencies": [], "errors": 0, "trials": 0},
            )
            bucket["trials"] += 1
            if result.error:
                bucket["errors"] += 1
            else:
                bucket["scores"].append(result.score)
                bucket["latencies"].append(result.latency_ms)

        # Trials may have run in shuffled order. Emit variants in configuration
        # order so the report layout and winner tie-breaking are deterministic.
        config_order = [variant.name for variant in config.variants]
        ordered_names = [
            name for name in dict.fromkeys(config_order) if name in variant_stats
        ]
        ordered_names += [name for name in variant_stats if name not in config_order]

        summary = {}
        for name in ordered_names:
            bucket = variant_stats[name]
            scores = bucket["scores"]
            latencies = bucket["latencies"]
            summary[name] = {
                "avg_score": sum(scores) / len(scores) if scores else 0,
                "max_score": max(scores) if scores else 0,
                "min_score": min(scores) if scores else 0,
                "std_score": self._std(scores) if scores else 0,
                "avg_latency_ms": sum(latencies) / len(latencies) if latencies else 0,
                "error_rate": bucket["errors"] / bucket["trials"] if bucket["trials"] else 0,
                "trials": bucket["trials"],
            }
        return summary

    @staticmethod
    def _determine_winner(summary: dict) -> str:
        """Return the name of the best variant, or "" when there is none.

        A variant is ranked by ``avg_score * (1 - 0.5 * error_rate)``. Ties go
        to the variant that comes first in ``summary``. A variant whose every
        trial failed (``error_rate`` of 1.0 or more) is never chosen, so an
        experiment in which everything failed has no winner.
        """
        best_name = ""
        best_score = float("-inf")
        for name, stats in summary.items():
            if stats["error_rate"] >= 1.0:
                continue
            penalty = stats["error_rate"] * 0.5
            adjusted = stats["avg_score"] * (1 - penalty)
            if adjusted > best_score:
                best_score = adjusted
                best_name = name
        return best_name

    @staticmethod
    def _std(values: list[float]) -> float:
        """Return the population standard deviation; 0.0 for under two values."""
        count = len(values)
        if count < 2:
            return 0.0
        mean = sum(values) / count
        variance = sum((value - mean) ** 2 for value in values) / count
        return variance ** 0.5

    def get_report(self, report_id: str) -> Optional[ExperimentReport]:
        """Return the stored report with the given id, or None if unknown."""
        return self._reports.get(report_id)

    def list_reports(self) -> list[dict]:
        """Return a short overview dict (id, name, winner, variants) per report."""
        return [
            {
                "id": rid,
                "name": report.config.name,
                "winner": report.winner,
                "variants": len(report.config.variants),
            }
            for rid, report in self._reports.items()
        ]

    def generate_markdown_report(self, report: ExperimentReport) -> str:
        """Render ``report`` as a Markdown document.

        The document contains a header, a statistics table with one row per
        variant (the winner is marked), and the configuration of every variant
        with a preview of at most 200 characters of its system prompt.

        Args:
            report: The report to render.

        Returns:
            The Markdown text, with lines joined by newlines.
        """
        lines = [
            f"# 实验报告: {report.config.name}",
            f"**实验ID**: {report.id}",
            f"**变体数**: {len(report.config.variants)}",
            f"**测试输入数**: {len(report.config.test_inputs)}",
            f"**每变体试验次数**: {report.config.trials_per_variant}",
            f"**胜出变体**: **{report.winner}**",
            "",
            "## 统计摘要",
            "",
            "| 变体 | 平均分 | 最高分 | 最低分 | 标准差 | 平均延迟(ms) | 错误率 | 试验数 |",
            "|------|--------|--------|--------|--------|-------------|--------|--------|",
        ]
        for name, stats in report.summary.items():
            marker = " **← 胜出**" if name == report.winner else ""
            # A raw pipe in the name would split the table row into extra cells.
            cell = str(name).replace("|", "\\|")
            lines.append(
                f"| {cell}{marker} | {stats['avg_score']:.3f} | {stats['max_score']:.3f} | "
                f"{stats['min_score']:.3f} | {stats['std_score']:.3f} | {stats['avg_latency_ms']:.0f} | "
                f"{stats['error_rate']:.1%} | {stats['trials']} |"
            )

        lines += ["", "## 变体配置", ""]
        for v in report.config.variants:
            prompt = v.system_prompt
            # Add the ellipsis only when the prompt was actually cut.
            preview = prompt if len(prompt) <= 200 else prompt[:200] + "..."
            lines += [
                f"### {v.name}",
                f"- 模型: {v.model}",
                f"- 温度: {v.temperature}",
                f"- Max Tokens: {v.max_tokens}",
                f"```\n{preview}\n```",
                "",
            ]

        return "\n".join(lines)