Coverage for agentos/benchmarks/runner.py: 47%

88 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1"""v0.80 — 性能基准测试运行器:延迟/吞吐/并发。""" 

2 

3from __future__ import annotations 

4 

5import asyncio 

6import json 

7import time 

8from dataclasses import dataclass, field 

9from typing import Any, Callable, Awaitable 

10 

11 

@dataclass
class BenchmarkScenario:
    """Description of a single benchmark scenario.

    Attributes are documented inline. Both hooks are optional and default to
    ``None``.
    """

    # Scenario name (required; the only field without a default).
    name: str
    # Free-form human-readable description.
    description: str = ""
    # Optional zero-argument hook; its return type is unconstrained.
    setup: Callable[[], Any] | None = None
    # Optional one-argument hook returning nothing.
    teardown: Callable[[Any], None] | None = None

19 

20 

@dataclass
class BenchmarkConfig:
    """Tunable settings for a benchmark run."""

    # Number of un-timed warmup invocations.
    warmup_iterations: int = 3
    # Number of measured iterations (multiplied by the concurrency level to
    # obtain the total call count).
    measure_iterations: int = 10
    # Concurrency levels to benchmark; each instance gets its own fresh list.
    concurrency_levels: list[int] = field(default_factory=lambda: [1, 4, 8])
    # Timeout budget in seconds.
    timeout_per_run: float = 30.0

28 

29 

30@dataclass 

31class _LatencyStats: 

32 min_ms: float = 0 

33 max_ms: float = 0 

34 avg_ms: float = 0 

35 p50_ms: float = 0 

36 p95_ms: float = 0 

37 p99_ms: float = 0 

38 

39 @staticmethod 

40 def compute(latencies_ms: list[float]) -> "_LatencyStats": 

41 if not latencies_ms: 

42 return _LatencyStats() 

43 s = sorted(latencies_ms) 

44 n = len(s) 

45 return _LatencyStats( 

46 min_ms=s[0], 

47 max_ms=s[-1], 

48 avg_ms=sum(s) / n, 

49 p50_ms=s[int(n * 0.5)], 

50 p95_ms=s[int(n * 0.95)] if int(n * 0.95) < n else s[-1], 

51 p99_ms=s[int(n * 0.99)] if int(n * 0.99) < n else s[-1], 

52 ) 

53 

54 

@dataclass
class BenchmarkReport:
    """Benchmark report: scenario metadata plus one result row per run."""

    scenario: str = ""
    description: str = ""
    config: BenchmarkConfig = field(default_factory=BenchmarkConfig)
    results: list[dict[str, Any]] = field(default_factory=list)
    summary: str = ""

    def to_json(self) -> str:
        """Serialize scenario, description, results and summary as JSON.

        The output is indented by two spaces and keeps non-ASCII characters
        unescaped. ``config`` is not part of the payload.
        """
        payload = {
            "scenario": self.scenario,
            "description": self.description,
            "results": self.results,
            "summary": self.summary,
        }
        return json.dumps(payload, indent=2, ensure_ascii=False)

    def to_markdown(self) -> str:
        """Render the report as a Markdown document with one table row per result.

        Each result dict must provide ``concurrency``, ``total_calls``,
        ``total_time_s``, ``throughput_qps``, ``success_rate`` and a nested
        ``latency_stats`` dict with ``avg_ms``/``p50_ms``/``p95_ms``/``p99_ms``.
        The summary, when non-empty, is appended as a block quote.
        """
        out = [
            f"# Benchmark: {self.scenario}",
            "",
            f"_{self.description}_",
            "",
            "| 并发 | 总调用 | 总耗时(s) | 吞吐(QPS) | 平均延迟(ms) | P50(ms) | P95(ms) | P99(ms) | 成功率 |",
            "|------|--------|-----------|-----------|-------------|---------|---------|---------|--------|",
        ]
        for entry in self.results:
            cells = [
                f"{entry['concurrency']}",
                f"{entry['total_calls']}",
                f"{entry['total_time_s']:.2f}",
                f"{entry['throughput_qps']:.1f}",
                f"{entry['latency_stats']['avg_ms']:.1f}",
                f"{entry['latency_stats']['p50_ms']:.1f}",
                f"{entry['latency_stats']['p95_ms']:.1f}",
                f"{entry['latency_stats']['p99_ms']:.1f}",
                f"{entry['success_rate']*100:.0f}%",
            ]
            out.append("| " + " | ".join(cells) + " |")
        if self.summary:
            out += ["", f"> {self.summary}"]
        return "\n".join(out)

91 

92 

class BenchmarkRunner:
    """Benchmark runner measuring latency and throughput per concurrency level."""

    def __init__(self, config: BenchmarkConfig | None = None):
        """Store ``config``, falling back to a default ``BenchmarkConfig``."""
        self.config = config or BenchmarkConfig()

    async def run(
        self,
        scenario: BenchmarkScenario,
        callable_fn: Callable[[], Any],
        async_callable_fn: Callable[[], Awaitable[Any]] | None = None,
    ) -> BenchmarkReport:
        """Run the benchmark once for every configured concurrency level.

        Args:
            scenario: The scenario; its ``setup`` hook (if any) runs first and
                its ``teardown`` hook receives the setup result afterwards.
            callable_fn: Synchronous function under test. It is invoked
                directly on the event loop, so synchronous calls execute one
                after another regardless of the concurrency level.
            async_callable_fn: Asynchronous function under test. When given it
                is used instead of ``callable_fn`` for both warmup and
                measurement, with at most ``concurrency`` calls in flight.

        Returns:
            A ``BenchmarkReport`` with one result dict per concurrency level.

        Note:
            ``config.timeout_per_run`` is not applied by this method.
        """
        setup_state = scenario.setup() if scenario.setup else None

        results: list[dict[str, Any]] = []
        try:
            for concurrency in self.config.concurrency_levels:
                results.append(
                    await self._run_level(concurrency, callable_fn, async_callable_fn)
                )
        finally:
            # Release scenario resources even if a level aborts (e.g. cancellation).
            # Teardown is only invoked when setup produced a non-None state.
            if scenario.teardown and setup_state is not None:
                scenario.teardown(setup_state)

        avg_throughput = (
            sum(r["throughput_qps"] for r in results) / len(results) if results else 0
        )
        return BenchmarkReport(
            scenario=scenario.name,
            description=scenario.description,
            config=self.config,
            results=results,
            summary=f"平均吞吐: {avg_throughput:.1f} QPS | 并发级别: {self.config.concurrency_levels}",
        )

    async def _run_level(
        self,
        concurrency: int,
        callable_fn: Callable[[], Any],
        async_callable_fn: Callable[[], Awaitable[Any]] | None,
    ) -> dict[str, Any]:
        """Warm up, then measure one concurrency level and return its result dict."""
        iterations = self.config.measure_iterations
        total_calls = concurrency * iterations
        success = 0
        latencies_ms: list[float] = []

        async def _invoke() -> None:
            # Prefer the async target when provided; otherwise call the sync one.
            if async_callable_fn:
                await async_callable_fn()
            else:
                callable_fn()

        async def _worker() -> None:
            # One worker issues its calls sequentially, so running exactly
            # ``concurrency`` workers bounds the number of in-flight calls.
            nonlocal success
            for _ in range(iterations):
                t0 = time.perf_counter()
                try:
                    await _invoke()
                except Exception:
                    # Failures are intentionally not propagated: they still
                    # contribute a latency sample and lower the success rate.
                    pass
                else:
                    success += 1
                latencies_ms.append((time.perf_counter() - t0) * 1000)

        # Warmup exercises the same target that will be measured; errors here
        # are ignored on purpose because warmup results are discarded.
        for _ in range(self.config.warmup_iterations):
            try:
                await _invoke()
            except Exception:
                pass

        # Start the clock only after warmup so it does not skew throughput.
        total_start = time.perf_counter()
        await asyncio.gather(*(_worker() for _ in range(concurrency)))
        total_time = time.perf_counter() - total_start

        stats = _LatencyStats.compute(latencies_ms)
        return {
            "concurrency": concurrency,
            "total_calls": total_calls,
            "total_time_s": round(total_time, 3),
            "throughput_qps": round(total_calls / total_time, 1) if total_time > 0 else 0,
            "latency_stats": {
                "min_ms": round(stats.min_ms, 2),
                "max_ms": round(stats.max_ms, 2),
                "avg_ms": round(stats.avg_ms, 2),
                "p50_ms": round(stats.p50_ms, 2),
                "p95_ms": round(stats.p95_ms, 2),
                "p99_ms": round(stats.p99_ms, 2),
            },
            "success_rate": round(success / total_calls, 4) if total_calls else 0,
        }

176 

177 

async def run_benchmark(
    scenario_name: str,
    callable_fn: Callable[[], Any],
    config: BenchmarkConfig | None = None,
    async_callable_fn: Callable[[], Awaitable[Any]] | None = None,
) -> BenchmarkReport:
    """Convenience wrapper: run one benchmark and return its report.

    Args:
        scenario_name: Name given to the ad-hoc scenario (no description,
            setup or teardown is attached).
        callable_fn: Synchronous function under test.
        config: Optional benchmark configuration; the runner's default is
            used when omitted.
        async_callable_fn: Optional asynchronous function under test,
            forwarded to ``BenchmarkRunner.run``.

    Returns:
        The ``BenchmarkReport`` produced by the runner. Call ``to_markdown()``
        or ``to_json()`` on it to obtain a rendered form.
    """
    runner = BenchmarkRunner(config)
    scenario = BenchmarkScenario(name=scenario_name)
    return await runner.run(scenario, callable_fn, async_callable_fn)

185 scenario = BenchmarkScenario(name=scenario_name) 

186 return await runner.run(scenario, callable_fn)