Coverage for agentos/observability/cost_analytics.py: 31%

237 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2AgentOS v0.70 — 成本分析与运营仪表板。 

3基因来源: OpenAI Usage Dashboard + Grafana 

4 

5提供: 

6- 按模型/按天/按session的多维度成本统计 

7- Token消耗趋势分析 

8- 预算预警系统 

9- 成本预测(简单滑动平均) 

10""" 

11 

12from __future__ import annotations 

13 

14import json 

15import os 

16import time 

17import threading 

18from collections import defaultdict 

19from dataclasses import dataclass, field 

20from typing import Any 

21 

22from agentos.cost.tracker import CostTracker, PRICING 

23 

24 

@dataclass
class CostEntry:
    """Cost record for a single model call."""

    timestamp: float          # epoch seconds at which the call was recorded
    model: str                # model name (used as the pricing lookup key)
    session_id: str           # session the call belongs to
    input_tokens: int         # tokens sent to the model
    output_tokens: int        # tokens produced by the model
    cost_usd: float           # computed cost of the call in USD
    duration_ms: float = 0.0  # call duration in milliseconds (0.0 when not supplied)

35 

36 

@dataclass
class DailySummary:
    """Aggregated cost figures for one (date, model) pair."""

    date: str                     # calendar day formatted as YYYY-MM-DD
    model: str                    # model the figures belong to
    calls: int = 0                # number of calls counted
    total_input_tokens: int = 0   # sum of input tokens
    total_output_tokens: int = 0  # sum of output tokens
    total_cost_usd: float = 0.0   # sum of call costs in USD
    avg_duration_ms: float = 0.0  # average call duration in milliseconds

47 

48 

@dataclass
class CostBreakdown:
    """Detailed cost breakdown of a single call.

    ``token_cost_ratio`` is derived in ``__post_init__`` from the two cost
    fields and overwrites any value passed by the caller whenever both costs
    are positive. It stays at its given value (default ``""``) when either
    cost is zero or negative, because no finite ratio can be formed.
    """

    model: str
    input_tokens: int
    output_tokens: int
    input_cost_usd: float
    output_cost_usd: float
    total_cost_usd: float
    token_cost_ratio: str = ""  # e.g. "1:2.5"

    def __post_init__(self):
        # A zero input cost previously reached ``1 / r`` with ``r == 0`` and
        # raised ZeroDivisionError; treat it like a zero output cost instead.
        if self.input_cost_usd <= 0 or self.output_cost_usd <= 0:
            return
        r = self.input_cost_usd / self.output_cost_usd
        # The larger side is always normalised so the other side reads "1".
        # NOTE(review): for r > 1 the string is "1:<input/output>"; confirm
        # which side of the colon is meant to represent input vs. output.
        self.token_cost_ratio = f"1:{r:.1f}" if r > 1 else f"{1 / r:.1f}:1"

64 

65 

@dataclass
class CostSession:
    """Cost summary for a single session."""

    session_id: str                # identifier of the summarised session
    model: str = ""                # model name(s) used in the session
    calls: int = 0                 # number of recorded calls
    total_input_tokens: int = 0    # sum of input tokens
    total_output_tokens: int = 0   # sum of output tokens
    total_cost_usd: float = 0.0    # sum of call costs in USD
    start_time: float = 0.0        # earliest call timestamp (epoch seconds)
    end_time: float = 0.0          # latest call timestamp (epoch seconds)
    status: str = "active"         # session status label

78 

79 

@dataclass
class BudgetAlert:
    """Result of a budget check."""

    triggered: bool          # True when the warning condition was met
    current_cost: float      # cost consumed so far, in USD
    budget: float            # budget the cost was compared against, in USD
    pct_used: float          # consumed share of the budget as a fraction
    projected_daily: float   # estimated average daily cost, in USD
    message: str             # human-readable alert text ("" when not triggered)

89 

90 

class CostAnalytics:
    """Cost analytics engine: multi-dimensional aggregation, trends and budgets.

    Call records are held in memory as ``CostEntry`` objects guarded by a
    lock. When ``persist_path`` is set, the most recent records are written
    to that file as JSON every 50 recorded calls and reloaded on construction.
    """

    def __init__(
        self,
        cost_tracker: "CostTracker",
        budget_monthly: float = 0.0,
        warn_threshold: float = 0.8,
        persist_path: str = "",
    ):
        """Create the engine and load any persisted records.

        Args:
            cost_tracker: Object whose ``total_cost`` attribute supplies the
                overall spend used by ``total_cost`` and ``check_budget``.
            budget_monthly: Monthly budget in USD; ``<= 0`` disables alerts.
            warn_threshold: Fraction of the budget above which an alert fires.
            persist_path: JSON file used for persistence; ``""`` disables it.
        """
        self.tracker = cost_tracker
        self.budget_monthly = budget_monthly
        self.warn_threshold = warn_threshold
        self.persist_path = persist_path
        self._entries: list[CostEntry] = []
        self._lock = threading.Lock()
        self._load()

    def record(
        self,
        model: str,
        session_id: str,
        input_tokens: int,
        output_tokens: int,
        duration_ms: float = 0.0,
    ):
        """Record the cost of one call.

        The cost is computed from ``PRICING[model]`` with its ``"input"`` and
        ``"output"`` rates applied per 1,000,000 tokens; models or rates that
        are missing contribute a cost of 0.
        """
        price = PRICING.get(model, {})
        cost = (
            input_tokens / 1_000_000 * price.get("input", 0)
            + output_tokens / 1_000_000 * price.get("output", 0)
        )
        entry = CostEntry(
            timestamp=time.time(),
            model=model,
            session_id=session_id,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost_usd=cost,
            duration_ms=duration_ms,
        )
        with self._lock:
            self._entries.append(entry)
            # Evaluate the periodic-save condition while still holding the
            # lock so the length seen belongs to this append.
            save_due = len(self._entries) % 50 == 0

        # Periodic save (every 50 entries); done outside the lock because
        # _save() acquires it itself.
        if save_due:
            self._save()

    # -- Per-model aggregation ----------------------------------------------

    def by_model(self, hours: float = 24.0) -> list[dict]:
        """Return per-model totals for the last ``hours`` hours.

        Each row has the keys ``model``, ``calls``, ``input_tokens``,
        ``output_tokens`` and ``cost_usd``; rows are sorted by cost, highest
        first.
        """
        cutoff = time.time() - hours * 3600
        rows: dict[str, dict] = {}
        with self._lock:
            for e in self._entries:
                if e.timestamp < cutoff:
                    continue
                row = rows.setdefault(e.model, {
                    "model": e.model, "calls": 0, "input_tokens": 0,
                    "output_tokens": 0, "cost_usd": 0.0,
                })
                row["calls"] += 1
                row["input_tokens"] += e.input_tokens
                row["output_tokens"] += e.output_tokens
                row["cost_usd"] += e.cost_usd

        return sorted(rows.values(), key=lambda row: row["cost_usd"], reverse=True)

    # -- Per-day aggregation ------------------------------------------------

    def daily_breakdown(self, days: int = 7) -> "list[DailySummary]":
        """Return per-(date, model) summaries for the last ``days`` days.

        The window is ``days`` local calendar days ending today (today
        included); ``days <= 0`` yields an empty list. ``avg_duration_ms`` is
        the arithmetic mean of the durations in each group. The result is
        sorted by ``(date, model)`` in descending order.
        """
        from datetime import datetime, timedelta

        if days <= 0:
            return []
        earliest = datetime.now().date() - timedelta(days=days - 1)

        with self._lock:
            entries = list(self._entries)

        summaries: dict[tuple[str, str], DailySummary] = {}
        duration_totals: dict[tuple[str, str], float] = defaultdict(float)
        for e in entries:
            day = datetime.fromtimestamp(e.timestamp).date()
            if day < earliest:
                continue
            date_str = day.strftime("%Y-%m-%d")
            key = (date_str, e.model)
            if key not in summaries:
                summaries[key] = DailySummary(date=date_str, model=e.model)
            s = summaries[key]
            s.calls += 1
            s.total_input_tokens += e.input_tokens
            s.total_output_tokens += e.output_tokens
            s.total_cost_usd += e.cost_usd
            duration_totals[key] += e.duration_ms

        # A true mean needs the full sum, so it is filled in after grouping.
        for key, s in summaries.items():
            s.avg_duration_ms = duration_totals[key] / s.calls

        return sorted(summaries.values(), key=lambda x: (x.date, x.model), reverse=True)

    # -- Per-session aggregation --------------------------------------------

    def by_session(self, top_n: int = 10) -> list[dict]:
        """Return the ``top_n`` most expensive sessions.

        Each row has the keys ``session_id``, ``calls``, ``cost_usd`` and
        ``tokens`` (input plus output); rows are sorted by cost, highest
        first.
        """
        rows: dict[str, dict] = {}
        with self._lock:
            for e in self._entries:
                row = rows.setdefault(e.session_id, {
                    "session_id": e.session_id, "calls": 0, "cost_usd": 0.0, "tokens": 0,
                })
                row["calls"] += 1
                row["cost_usd"] += e.cost_usd
                row["tokens"] += e.input_tokens + e.output_tokens

        ranked = sorted(rows.values(), key=lambda row: row["cost_usd"], reverse=True)
        return ranked[:top_n]

    # -- Trend analysis -----------------------------------------------------

    def trend(self, metric: str = "cost", window: int = 7) -> list[dict]:
        """Return a per-day series with a trailing moving average.

        Args:
            metric: ``"cost"``, ``"tokens"`` or ``"calls"``; any other name
                yields a series of zeros.
            window: Half the number of days to cover; the series spans the
                most recent ``window * 2`` days that have data.

        Returns:
            Dicts with ``date``, ``value`` and ``sma`` in ascending date
            order, where ``sma`` averages the current and up to two preceding
            data points.
        """
        daily = self.daily_breakdown(days=window * 2)

        # Collapse the per-model summaries into one row per date.
        by_date: dict[str, dict] = defaultdict(
            lambda: {"date": "", "cost": 0.0, "tokens": 0, "calls": 0}
        )
        for s in daily:
            row = by_date[s.date]
            row["date"] = s.date
            row["cost"] += s.total_cost_usd
            row["tokens"] += s.total_input_tokens + s.total_output_tokens
            row["calls"] += s.calls

        dates = sorted(by_date.keys())[-window * 2:]
        values = [by_date[d].get(metric, 0) for d in dates]

        # Simple moving average over a fixed 3-point trailing window.
        sma_span = 3
        trend_data = []
        for i, d in enumerate(dates):
            w_vals = values[max(0, i - (sma_span - 1)):i + 1]
            trend_data.append({
                "date": d,
                "value": values[i],
                "sma": sum(w_vals) / len(w_vals),
            })
        return trend_data

    # -- Budget alerting ----------------------------------------------------

    def check_budget(self) -> "BudgetAlert":
        """Check whether spend has exceeded the warning threshold.

        With no budget configured (``budget_monthly <= 0``) a non-triggered
        alert carrying only the current cost is returned. Otherwise the alert
        triggers when ``tracker.total_cost / budget_monthly`` exceeds
        ``warn_threshold``. The projection multiplies the average cost of the
        active days in the last 7 days by the days remaining in the current
        calendar month (at least 1).
        """
        if self.budget_monthly <= 0:
            return BudgetAlert(False, self.tracker.total_cost, 0, 0, 0, "")

        import calendar
        from datetime import datetime

        current = self.tracker.total_cost
        pct = current / self.budget_monthly

        # Projection: based on past 7 days average.
        daily_costs: dict[str, float] = defaultdict(float)
        for s in self.daily_breakdown(days=7):
            daily_costs[s.date] += s.total_cost_usd
        avg_daily = sum(daily_costs.values()) / len(daily_costs) if daily_costs else 0

        # Use the real month length rather than assuming 31 days.
        now = datetime.now()
        days_in_month = calendar.monthrange(now.year, now.month)[1]
        days_left = days_in_month - now.day
        projected = avg_daily * max(days_left, 1)

        triggered = pct > self.warn_threshold
        message = ""
        if triggered:
            projected_total = current + projected
            message = (
                f"成本告警: 已消耗 ${current:.4f} ({pct:.0%}),"
                f"预计月末 ${projected_total:.4f}"
            )

        return BudgetAlert(
            triggered=triggered,
            current_cost=current,
            budget=self.budget_monthly,
            pct_used=pct,
            projected_daily=avg_daily,
            message=message,
        )

    # -- Summary ------------------------------------------------------------

    @property
    def total_cost(self) -> float:
        """Total cost as reported by the underlying tracker."""
        return self.tracker.total_cost

    @property
    def total_calls(self) -> int:
        """Number of call records currently held in memory."""
        with self._lock:
            return len(self._entries)

    def summary(self) -> str:
        """Return a multi-line text report.

        The report lists total cost, total calls, the five most expensive
        models of the last 24 hours and, when a budget is set, its usage and
        any alert message.
        """
        models = self.by_model(hours=24)
        lines = [
            f"总成本: ${self.total_cost:.4f}",
            f"总调用: {self.total_calls} 次",
            "",
            "最近24h模型分布:",
        ]
        for m in models[:5]:
            lines.append(
                f" {m['model']}: ${m['cost_usd']:.4f} "
                f"({m['calls']}次, {m['input_tokens']}+{m['output_tokens']} tokens)"
            )
        if self.budget_monthly > 0:
            alert = self.check_budget()
            lines.append(f"\n月度预算: ${self.budget_monthly:.2f} (已用 {alert.pct_used:.1%})")
            if alert.triggered:
                lines.append(f" {alert.message}")
        return "\n".join(lines)

    # -- Detailed breakdown -------------------------------------------------

    def get_breakdown(self, session_id: str = "", hours: float = 24.0) -> "list[CostBreakdown]":
        """Return per-call cost breakdowns for the last ``hours`` hours.

        When ``session_id`` is non-empty only that session's calls are
        included. Input and output costs are recomputed from ``PRICING``;
        the total is the cost stored on the record. Results are sorted by
        total cost, highest first.
        """
        cutoff = time.time() - hours * 3600
        with self._lock:
            # Copy under the lock so iteration below never races with record().
            entries = list(self._entries)
        if session_id:
            entries = [e for e in entries if e.session_id == session_id]

        results = []
        for e in entries:
            if e.timestamp < cutoff:
                continue
            price = PRICING.get(e.model, {})
            input_cost = e.input_tokens / 1_000_000 * price.get("input", 0)
            output_cost = e.output_tokens / 1_000_000 * price.get("output", 0)
            results.append(CostBreakdown(
                model=e.model,
                input_tokens=e.input_tokens,
                output_tokens=e.output_tokens,
                input_cost_usd=input_cost,
                output_cost_usd=output_cost,
                total_cost_usd=e.cost_usd,
            ))
        return sorted(results, key=lambda x: x.total_cost_usd, reverse=True)

    def get_session(self, session_id: str) -> "CostSession | None":
        """Return the cost summary of one session, or ``None`` if unknown.

        ``model`` holds the sorted, comma-separated names of every model the
        session used; ``start_time``/``end_time`` are the earliest and latest
        record timestamps.
        """
        with self._lock:
            matches = [e for e in self._entries if e.session_id == session_id]
        if not matches:
            return None
        timestamps = [e.timestamp for e in matches]
        return CostSession(
            session_id=session_id,
            model=", ".join(sorted({e.model for e in matches})),
            calls=len(matches),
            total_input_tokens=sum(e.input_tokens for e in matches),
            total_output_tokens=sum(e.output_tokens for e in matches),
            total_cost_usd=sum(e.cost_usd for e in matches),
            start_time=min(timestamps),
            end_time=max(timestamps),
        )

    # -- Score / cost correlation -------------------------------------------

    def cost_by_score_tier(self, scores: dict[float, list[str]]) -> dict[str, float]:
        """Aggregate cost per score tier.

        Args:
            scores: Mapping of score to the session ids that received it.

        Returns:
            Mapping of ``"score_<score rounded to one decimal>"`` to the
            summed cost of the records of those sessions. Scores that round
            to the same tier name are merged instead of overwriting each
            other, and a session listed under several of them is counted
            once per tier.
        """
        tier_sessions: dict[str, set[str]] = {}
        for score, session_ids in scores.items():
            tier_sessions.setdefault(f"score_{score:.1f}", set()).update(session_ids)

        with self._lock:
            entries = list(self._entries)

        tiers: dict[str, float] = {}
        for tier_name, wanted in tier_sessions.items():
            tier_cost = 0.0
            for e in entries:
                if e.session_id in wanted:
                    tier_cost += e.cost_usd
            tiers[tier_name] = tier_cost
        return tiers

    # -- Persistence --------------------------------------------------------

    def _save(self):
        """Write the most recent 10,000 records to ``persist_path`` as JSON.

        Best effort: does nothing without a ``persist_path``; I/O and
        serialisation failures are logged as warnings and never raised.
        """
        if not self.persist_path:
            return
        import logging

        with self._lock:
            recent = self._entries[-10000:]  # Keep last 10k
        data = [
            {
                "ts": e.timestamp,
                "model": e.model,
                "session_id": e.session_id,
                "input_tokens": e.input_tokens,
                "output_tokens": e.output_tokens,
                "cost_usd": e.cost_usd,
                "duration_ms": e.duration_ms,
            }
            for e in recent
        ]
        try:
            os.makedirs(os.path.dirname(self.persist_path) or ".", exist_ok=True)
            with open(self.persist_path, "w", encoding="utf-8") as f:
                json.dump(data, f)
        except (OSError, TypeError, ValueError) as exc:
            logging.getLogger(__name__).warning(
                "Could not persist cost entries to %s: %s", self.persist_path, exc
            )

    def _load(self):
        """Replace the in-memory records with those stored at ``persist_path``.

        Best effort: does nothing when the path is unset or missing; an
        unreadable or malformed file is logged as a warning and leaves the
        current records untouched.
        """
        if not self.persist_path or not os.path.exists(self.persist_path):
            return
        import logging

        try:
            with open(self.persist_path, encoding="utf-8") as f:
                data = json.load(f)
            loaded = [
                CostEntry(
                    timestamp=d["ts"],
                    model=d["model"],
                    session_id=d["session_id"],
                    input_tokens=d["input_tokens"],
                    output_tokens=d["output_tokens"],
                    cost_usd=d["cost_usd"],
                    duration_ms=d.get("duration_ms", 0),
                )
                for d in data
            ]
        except (OSError, ValueError, KeyError, TypeError, AttributeError) as exc:
            logging.getLogger(__name__).warning(
                "Could not load cost entries from %s: %s", self.persist_path, exc
            )
            return
        with self._lock:
            self._entries = loaded