Coverage for agentos/benchmarks/runner.py: 47%
88 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""v0.80 — 性能基准测试运行器:延迟/吞吐/并发。"""
3from __future__ import annotations
5import asyncio
6import json
7import time
8from dataclasses import dataclass, field
9from typing import Any, Callable, Awaitable
@dataclass
class BenchmarkScenario:
    """Describe a single benchmark scenario.

    Only ``name`` is required; the remaining fields are optional metadata
    and lifecycle hooks.
    """

    # Identifier of the scenario.
    name: str

    # Optional free-form description of what is being measured.
    description: str = ""

    # Optional zero-argument hook; its return value is the state handed to
    # ``teardown``.
    setup: Callable[[], Any] | None = None

    # Optional hook that receives the state produced by ``setup``.
    teardown: Callable[[Any], None] | None = None
@dataclass
class BenchmarkConfig:
    """Tunable parameters for a benchmark run."""

    # Warm-up calls issued before the measured calls of each concurrency level.
    warmup_iterations: int = 3

    # Measured calls per unit of concurrency
    # (total calls at a level = concurrency * measure_iterations).
    measure_iterations: int = 10

    # Concurrency levels to benchmark, processed in the given order.
    # A factory is used so every config instance owns its own list.
    concurrency_levels: list[int] = field(default_factory=lambda: [1, 4, 8])

    # Presumably a per-run timeout in seconds.
    # NOTE(review): no code visible in this module reads this value.
    timeout_per_run: float = 30.0
@dataclass
class _LatencyStats:
    """Summary statistics, in milliseconds, for a batch of latency samples."""

    min_ms: float = 0
    max_ms: float = 0
    avg_ms: float = 0
    p50_ms: float = 0
    p95_ms: float = 0
    p99_ms: float = 0

    @staticmethod
    def compute(latencies_ms: list[float]) -> "_LatencyStats":
        """Build statistics from raw latency samples.

        Args:
            latencies_ms: Latency samples in milliseconds, in any order.
                The list is not modified.

        Returns:
            An all-zero ``_LatencyStats`` when there are no samples;
            otherwise min / max / mean plus the p50, p95 and p99 values,
            where a percentile ``q`` is the sorted sample at index
            ``int(n * q)``.
        """
        if not latencies_ms:
            return _LatencyStats()

        ordered = sorted(latencies_ms)
        count = len(ordered)

        def _at(fraction: float) -> float:
            # int(count * fraction) is always below count for fraction < 1,
            # so the clamp never changes the result here; it replaces the
            # per-field bounds checks with a single safe lookup.
            return ordered[min(int(count * fraction), count - 1)]

        return _LatencyStats(
            min_ms=ordered[0],
            max_ms=ordered[-1],
            avg_ms=sum(ordered) / count,
            p50_ms=_at(0.5),
            p95_ms=_at(0.95),
            p99_ms=_at(0.99),
        )
@dataclass
class BenchmarkReport:
    """Result of one benchmark run, with JSON and Markdown renderers."""

    # Name of the scenario that was benchmarked.
    scenario: str = ""
    # Free-form scenario description; may be empty.
    description: str = ""
    # Configuration associated with the run.
    config: BenchmarkConfig = field(default_factory=BenchmarkConfig)
    # One dict per concurrency level. ``to_markdown`` reads the keys
    # concurrency, total_calls, total_time_s, throughput_qps, latency_stats
    # (avg_ms / p50_ms / p95_ms / p99_ms) and success_rate.
    results: list[dict[str, Any]] = field(default_factory=list)
    # One-line summary; may be empty.
    summary: str = ""

    def to_json(self) -> str:
        """Serialize scenario, description, results and summary as JSON.

        The output is indented by two spaces and keeps non-ASCII characters
        unescaped. ``config`` is not part of the payload.
        """
        payload = {
            "scenario": self.scenario,
            "description": self.description,
            "results": self.results,
            "summary": self.summary,
        }
        return json.dumps(payload, indent=2, ensure_ascii=False)

    def to_markdown(self) -> str:
        """Render the report as a Markdown document.

        Layout: a title, the description in italics (omitted when empty),
        a table with one row per entry in ``results``, and the summary as
        a block quote (omitted when empty).

        Raises:
            KeyError: If a result entry lacks one of the keys the table needs.
        """
        lines = [f"# Benchmark: {self.scenario}", ""]

        # An empty description would otherwise render as a stray "__" line.
        if self.description:
            lines.extend([f"_{self.description}_", ""])

        lines.extend([
            "| 并发 | 总调用 | 总耗时(s) | 吞吐(QPS) | 平均延迟(ms) | P50(ms) | P95(ms) | P99(ms) | 成功率 |",
            "|------|--------|-----------|-----------|-------------|---------|---------|---------|--------|",
        ])

        for row in self.results:
            latency = row["latency_stats"]
            lines.append(
                f"| {row['concurrency']} | {row['total_calls']} | {row['total_time_s']:.2f} | "
                f"{row['throughput_qps']:.1f} | {latency['avg_ms']:.1f} | "
                f"{latency['p50_ms']:.1f} | {latency['p95_ms']:.1f} | "
                f"{latency['p99_ms']:.1f} | {row['success_rate']*100:.0f}% |"
            )

        if self.summary:
            lines.extend(["", f"> {self.summary}"])
        return "\n".join(lines)
class BenchmarkRunner:
    """Run a benchmark scenario at every configured concurrency level."""

    def __init__(self, config: BenchmarkConfig | None = None):
        """Store ``config``, falling back to a default ``BenchmarkConfig``."""
        self.config = config or BenchmarkConfig()

    async def run(
        self,
        scenario: BenchmarkScenario,
        callable_fn: Callable[[], Any],
        async_callable_fn: Callable[[], Awaitable[Any]] | None = None,
    ) -> BenchmarkReport:
        """Benchmark the given callable and return a report.

        For each entry in ``config.concurrency_levels`` the runner performs
        the warm-up calls, then ``concurrency * measure_iterations`` measured
        calls, and records latency statistics, throughput and success rate.

        Args:
            scenario: Scenario metadata and optional setup/teardown hooks.
            callable_fn: Synchronous function under test. It is always used
                for warm-up, and for measurement when ``async_callable_fn``
                is not given.
            async_callable_fn: Optional coroutine function. When given, it is
                the one measured, with at most ``concurrency`` calls in
                flight at a time.

        Returns:
            A ``BenchmarkReport`` with one result entry per concurrency level.

        Notes:
            Exceptions raised by the function under test are counted as
            failures (reflected in ``success_rate``) and are not propagated.
            ``teardown`` runs only when ``setup`` returned a non-None state,
            and it runs even if the benchmark is aborted by an exception.
            NOTE(review): ``config.timeout_per_run`` is not enforced here.
        """
        setup_state = scenario.setup() if scenario.setup else None

        results: list[dict[str, Any]] = []
        try:
            for concurrency in self.config.concurrency_levels:
                results.append(
                    await self._run_level(concurrency, callable_fn, async_callable_fn)
                )
        finally:
            # Release scenario state even when the run is cancelled or fails.
            if scenario.teardown and setup_state is not None:
                scenario.teardown(setup_state)

        avg_throughput = (
            sum(r["throughput_qps"] for r in results) / len(results) if results else 0
        )
        return BenchmarkReport(
            scenario=scenario.name,
            description=scenario.description,
            config=self.config,
            results=results,
            summary=f"平均吞吐: {avg_throughput:.1f} QPS | 并发级别: {self.config.concurrency_levels}",
        )

    async def _run_level(
        self,
        concurrency: int,
        callable_fn: Callable[[], Any],
        async_callable_fn: Callable[[], Awaitable[Any]] | None,
    ) -> dict[str, Any]:
        """Warm up, then measure one concurrency level and return its result entry."""
        iterations = self.config.measure_iterations
        total_calls = concurrency * iterations
        latencies_ms: list[float] = []
        success = 0

        # Warm-up happens before the clock starts so it cannot distort the
        # measured total time or throughput. Failures here are ignored on
        # purpose: the measured calls report failures via success_rate.
        for _ in range(self.config.warmup_iterations):
            try:
                callable_fn()
            except Exception:
                pass

        async def _worker() -> None:
            # Each worker issues its calls back to back, so the number of
            # workers is the number of calls that can be in flight at once.
            nonlocal success
            for _ in range(iterations):
                t0 = time.perf_counter()
                try:
                    if async_callable_fn:
                        await async_callable_fn()
                    else:
                        callable_fn()
                    success += 1
                except Exception:
                    # A failed call still contributes a latency sample and
                    # lowers success_rate; it must not abort the benchmark.
                    pass
                latencies_ms.append((time.perf_counter() - t0) * 1000)

        total_start = time.perf_counter()
        await asyncio.gather(*(_worker() for _ in range(concurrency)))
        total_time = time.perf_counter() - total_start

        stats = _LatencyStats.compute(latencies_ms)
        return {
            "concurrency": concurrency,
            "total_calls": total_calls,
            "total_time_s": round(total_time, 3),
            "throughput_qps": round(total_calls / total_time, 1) if total_time > 0 else 0,
            "latency_stats": {
                "min_ms": round(stats.min_ms, 2),
                "max_ms": round(stats.max_ms, 2),
                "avg_ms": round(stats.avg_ms, 2),
                "p50_ms": round(stats.p50_ms, 2),
                "p95_ms": round(stats.p95_ms, 2),
                "p99_ms": round(stats.p99_ms, 2),
            },
            "success_rate": round(success / total_calls, 4) if total_calls else 0,
        }
async def run_benchmark(
    scenario_name: str,
    callable_fn: Callable[[], Any],
    config: BenchmarkConfig | None = None,
) -> BenchmarkReport:
    """Convenience wrapper: benchmark ``callable_fn`` under a name-only scenario.

    Args:
        scenario_name: Name given to the scenario and shown in the report.
        callable_fn: Synchronous function to benchmark.
        config: Optional benchmark configuration; the runner's default is
            used when omitted.

    Returns:
        The resulting ``BenchmarkReport``; call ``to_markdown()`` or
        ``to_json()`` on it to obtain a rendered form.
    """
    return await BenchmarkRunner(config).run(
        BenchmarkScenario(name=scenario_name),
        callable_fn,
    )