Coverage for agentos/observability/cost_analytics.py: 31%
237 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2AgentOS v0.70 — 成本分析与运营仪表板。
3基因来源: OpenAI Usage Dashboard + Grafana
5提供:
6- 按模型/按天/按session的多维度成本统计
7- Token消耗趋势分析
8- 预算预警系统
9- 成本预测(简单滑动平均)
10"""
12from __future__ import annotations
14import json
15import os
16import time
17import threading
18from collections import defaultdict
19from dataclasses import dataclass, field
20from typing import Any
22from agentos.cost.tracker import CostTracker, PRICING
@dataclass
class CostEntry:
    """Cost record for a single call."""

    # Time of the call as a float timestamp.
    timestamp: float
    # Model name and the session the call is attributed to.
    model: str
    session_id: str
    # Token counts for the call.
    input_tokens: int
    output_tokens: int
    # Cost of the call in USD.
    cost_usd: float
    # Call duration in milliseconds; defaults to 0.0 when not supplied.
    duration_ms: float = 0.0
@dataclass
class DailySummary:
    """Per-day, per-model cost summary."""

    # Day and model this summary row belongs to.
    date: str
    model: str
    # Aggregates; all start at zero and are filled in by the code that builds the summary.
    calls: int = 0
    total_input_tokens: int = 0
    total_output_tokens: int = 0
    total_cost_usd: float = 0.0
    avg_duration_ms: float = 0.0
@dataclass
class CostBreakdown:
    """Detailed cost breakdown for a single call.

    ``token_cost_ratio`` is derived in ``__post_init__`` from the two cost
    fields whenever ``output_cost_usd`` is positive and ``input_cost_usd`` is
    non-zero; otherwise it keeps the value passed to the constructor
    (``""`` by default).
    """

    model: str
    input_tokens: int
    output_tokens: int
    input_cost_usd: float
    output_cost_usd: float
    total_cost_usd: float
    token_cost_ratio: str = ""  # e.g. "1:2.5"

    def __post_init__(self):
        """Fill in ``token_cost_ratio`` from the input and output costs."""
        if self.output_cost_usd > 0:
            r = self.input_cost_usd / self.output_cost_usd
            if r > 1:
                # Input cost is the larger side: output:input == 1:r.
                self.token_cost_ratio = f"1:{r:.1f}"
            elif r != 0:
                # Output cost is the larger (or equal) side: output:input == (1/r):1.
                self.token_cost_ratio = f"{1/r:.1f}:1"
            # r == 0 (no input cost) has no finite ratio; dividing by it used to
            # raise ZeroDivisionError, so the field is left untouched instead.
@dataclass
class CostSession:
    """Cost summary for a single session."""

    session_id: str
    # Model name(s) used by the session; empty when not set.
    model: str = ""
    # Aggregated call count, token counts and cost.
    calls: int = 0
    total_input_tokens: int = 0
    total_output_tokens: int = 0
    total_cost_usd: float = 0.0
    # Start and end of the session as float timestamps.
    start_time: float = 0.0
    end_time: float = 0.0
    # Session status label; new instances default to "active".
    status: str = "active"
@dataclass
class BudgetAlert:
    """Budget alert result."""

    # True when the alert condition was met.
    triggered: bool
    # Cost so far and the budget it is compared against.
    current_cost: float
    budget: float
    # Fraction of the budget used (current_cost / budget).
    pct_used: float
    # Projected cost per day.
    projected_daily: float
    # Human-readable alert text; may be empty.
    message: str
class CostAnalytics:
    """Cost analytics engine: multi-dimensional aggregation, trends and budget checks.

    Recorded calls are kept in memory in ``self._entries`` (guarded by
    ``self._lock``). When ``persist_path`` is set they are loaded from that
    JSON file on construction and written back every 50 recorded entries.
    """

    def __init__(
        self,
        cost_tracker: CostTracker,
        budget_monthly: float = 0.0,
        warn_threshold: float = 0.8,
        persist_path: str = "",
    ):
        """Create the engine and load any persisted entries.

        Args:
            cost_tracker: Tracker whose ``total_cost`` is used for the overall
                cost and for budget checks.
            budget_monthly: Monthly budget; ``<= 0`` disables budget alerts.
            warn_threshold: Fraction of the budget above which
                ``check_budget`` reports a triggered alert.
            persist_path: JSON file used for persistence; empty disables it.
        """
        self.tracker = cost_tracker
        self.budget_monthly = budget_monthly
        self.warn_threshold = warn_threshold
        self.persist_path = persist_path
        self._entries: list[CostEntry] = []
        self._lock = threading.Lock()
        self._load()

    def record(
        self,
        model: str,
        session_id: str,
        input_tokens: int,
        output_tokens: int,
        duration_ms: float = 0.0,
    ):
        """Record the cost of one call.

        The cost is computed from ``PRICING[model]`` as
        ``tokens / 1_000_000 * price`` for the "input" and "output" prices
        (presumably prices per million tokens; confirm against PRICING).
        Models missing from ``PRICING`` are recorded with a cost of 0.
        """
        price = PRICING.get(model, {})
        cost = (
            input_tokens / 1_000_000 * price.get("input", 0) +
            output_tokens / 1_000_000 * price.get("output", 0)
        )
        entry = CostEntry(
            timestamp=time.time(),
            model=model,
            session_id=session_id,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost_usd=cost,
            duration_ms=duration_ms,
        )
        with self._lock:
            self._entries.append(entry)
            # Decide while still holding the lock so the length we test is the
            # one produced by this append, not by a concurrent one.
            should_save = len(self._entries) % 50 == 0

        # Periodic save (every 50 entries). _save() acquires the same
        # non-reentrant lock, so it must run after the lock is released.
        if should_save:
            self._save()

    # ── Per-model aggregation ────────────────────

    def by_model(self, hours: float = 24.0) -> list[dict]:
        """Return per-model totals for the last ``hours`` hours.

        Each dict has the keys "model", "calls", "input_tokens",
        "output_tokens" and "cost_usd"; the list is sorted by cost, highest
        first.
        """
        cutoff = time.time() - hours * 3600
        agg: dict[str, dict] = defaultdict(lambda: {
            "model": "", "calls": 0, "input_tokens": 0, "output_tokens": 0, "cost_usd": 0.0,
        })
        with self._lock:
            for e in self._entries:
                if e.timestamp < cutoff:
                    continue
                d = agg[e.model]
                d["model"] = e.model
                d["calls"] += 1
                d["input_tokens"] += e.input_tokens
                d["output_tokens"] += e.output_tokens
                d["cost_usd"] += e.cost_usd

        return sorted(agg.values(), key=lambda x: x["cost_usd"], reverse=True)

    # ── Per-day aggregation ──────────────────────

    def daily_breakdown(self, days: int = 7) -> list[DailySummary]:
        """Return per-day, per-model summaries for the last ``days`` days.

        The window is counted in local calendar days and includes today, so
        ``days=1`` covers today only and ``days <= 0`` yields an empty list.
        ``avg_duration_ms`` is the arithmetic mean of the durations in each
        (date, model) group. The result is sorted by (date, model), descending.
        """
        from datetime import date, datetime, timedelta

        today = datetime.now().date()
        try:
            earliest = today - timedelta(days=days - 1)
        except OverflowError:
            # A window larger than the calendar simply means "everything".
            earliest = date.min

        summaries: dict[tuple[str, str], DailySummary] = {}
        duration_totals: dict[tuple[str, str], float] = defaultdict(float)

        with self._lock:
            for e in self._entries:
                day = datetime.fromtimestamp(e.timestamp).date()
                if day < earliest:
                    continue
                d = day.strftime("%Y-%m-%d")
                key = (d, e.model)
                if key not in summaries:
                    summaries[key] = DailySummary(date=d, model=e.model)
                s = summaries[key]
                s.calls += 1
                s.total_input_tokens += e.input_tokens
                s.total_output_tokens += e.output_tokens
                s.total_cost_usd += e.cost_usd
                duration_totals[key] += e.duration_ms

        # Turn the accumulated durations into a true mean per group.
        for key, s in summaries.items():
            s.avg_duration_ms = duration_totals[key] / s.calls

        # Sort by date desc, model
        return sorted(summaries.values(), key=lambda x: (x.date, x.model), reverse=True)

    # ── Per-session aggregation ──────────────────

    def by_session(self, top_n: int = 10) -> list[dict]:
        """Return the ``top_n`` most expensive sessions.

        Each dict has the keys "session_id", "calls", "cost_usd" and "tokens"
        (input plus output tokens).
        """
        agg: dict[str, dict] = defaultdict(
            lambda: {"session_id": "", "calls": 0, "cost_usd": 0.0, "tokens": 0}
        )
        with self._lock:
            for e in self._entries:
                d = agg[e.session_id]
                d["session_id"] = e.session_id
                d["calls"] += 1
                d["cost_usd"] += e.cost_usd
                d["tokens"] += e.input_tokens + e.output_tokens

        sorted_sessions = sorted(agg.values(), key=lambda x: x["cost_usd"], reverse=True)
        return sorted_sessions[:top_n]

    # ── Trend analysis ───────────────────────────

    def trend(self, metric: str = "cost", window: int = 7) -> list[dict]:
        """Return the daily values of ``metric`` with a simple moving average.

        Args:
            metric: One of "cost", "tokens" or "calls"; any other name yields
                a value of 0 for every day.
            window: The series covers up to ``window * 2`` days.

        Returns:
            One dict per day, oldest first, with the keys "date", "value" and
            "sma" (mean of that day and up to two preceding days in the series).
        """
        daily = self.daily_breakdown(days=window * 2)
        # Aggregate by date
        by_date: dict[str, dict] = defaultdict(
            lambda: {"date": "", "cost": 0.0, "tokens": 0, "calls": 0}
        )
        for s in daily:
            d = by_date[s.date]
            d["date"] = s.date
            d["cost"] += s.total_cost_usd
            d["tokens"] += s.total_input_tokens + s.total_output_tokens
            d["calls"] += s.calls

        dates = sorted(by_date.keys())[-window * 2:]
        values = [by_date[d].get(metric, 0) for d in dates]

        # Simple moving average (window=3)
        trend_data = []
        for i, d in enumerate(dates):
            w_start = max(0, i - 2)
            w_vals = values[w_start:i + 1]
            trend_data.append({
                "date": d,
                "value": values[i],
                "sma": sum(w_vals) / len(w_vals),
            })
        return trend_data

    # ── Budget alerting ──────────────────────────

    def check_budget(self) -> BudgetAlert:
        """Check whether spending has passed the warning threshold.

        With no budget configured (``budget_monthly <= 0``) an untriggered
        alert is returned. Otherwise ``tracker.total_cost`` is compared with
        the monthly budget, and the rest of the month is projected from the
        average daily cost over the last 7 days that have data.
        """
        if self.budget_monthly <= 0:
            return BudgetAlert(False, self.tracker.total_cost, 0, 0, 0, "")

        current = self.tracker.total_cost
        pct = current / self.budget_monthly

        # Projection: based on past 7 days average
        daily_data = self.daily_breakdown(days=7)
        daily_costs = defaultdict(float)
        for s in daily_data:
            daily_costs[s.date] += s.total_cost_usd
        if daily_costs:
            avg_daily = sum(daily_costs.values()) / len(daily_costs)
        else:
            avg_daily = 0

        import calendar
        from datetime import datetime

        now = datetime.now()
        # Use the real length of the current month instead of assuming 31 days.
        days_in_month = calendar.monthrange(now.year, now.month)[1]
        days_left = days_in_month - now.day
        projected = avg_daily * max(days_left, 1)

        triggered = pct > self.warn_threshold
        message = ""
        if triggered:
            projected_total = current + projected
            message = (
                f"成本告警: 已消耗 ${current:.4f} ({pct:.0%}),"
                f"预计月末 ${projected_total:.4f}"
            )

        return BudgetAlert(
            triggered=triggered,
            current_cost=current,
            budget=self.budget_monthly,
            pct_used=pct,
            projected_daily=avg_daily,
            message=message,
        )

    # ── Summary ──────────────────────────────────

    @property
    def total_cost(self) -> float:
        """Total cost as reported by the underlying tracker."""
        return self.tracker.total_cost

    @property
    def total_calls(self) -> int:
        """Number of entries currently held in memory."""
        with self._lock:
            return len(self._entries)

    def summary(self) -> str:
        """Return a multi-line text summary.

        Covers the total cost, the call count, the top five models of the
        last 24 hours and, when a budget is set, the budget usage.
        """
        models = self.by_model(hours=24)
        lines = [
            f"总成本: ${self.total_cost:.4f}",
            f"总调用: {self.total_calls} 次",
            "",
            "最近24h模型分布:",
        ]
        for m in models[:5]:
            lines.append(
                f" {m['model']}: ${m['cost_usd']:.4f} "
                f"({m['calls']}次, {m['input_tokens']}+{m['output_tokens']} tokens)"
            )
        if self.budget_monthly > 0:
            alert = self.check_budget()
            lines.append(f"\n月度预算: ${self.budget_monthly:.2f} (已用 {alert.pct_used:.1%})")
            if alert.triggered:
                lines.append(f" {alert.message}")
        return "\n".join(lines)

    # ── Detailed breakdown ───────────────────────

    def get_breakdown(self, session_id: str = "", hours: float = 24.0) -> list[CostBreakdown]:
        """Return per-call cost breakdowns from the last ``hours`` hours.

        When ``session_id`` is non-empty only that session's calls are
        included. Input and output costs are recomputed from ``PRICING``; the
        total is the cost stored on the entry. Sorted by total cost, highest
        first.
        """
        cutoff = time.time() - hours * 3600
        with self._lock:
            # Copy under the lock so the loop below never walks the live list
            # while another thread appends to it.
            if session_id:
                entries = [e for e in self._entries if e.session_id == session_id]
            else:
                entries = list(self._entries)

        results = []
        for e in entries:
            if e.timestamp < cutoff:
                continue
            price = PRICING.get(e.model, {})
            input_cost = e.input_tokens / 1_000_000 * price.get("input", 0)
            output_cost = e.output_tokens / 1_000_000 * price.get("output", 0)
            results.append(CostBreakdown(
                model=e.model,
                input_tokens=e.input_tokens,
                output_tokens=e.output_tokens,
                input_cost_usd=input_cost,
                output_cost_usd=output_cost,
                total_cost_usd=e.cost_usd,
            ))
        return sorted(results, key=lambda x: x.total_cost_usd, reverse=True)

    def get_session(self, session_id: str) -> CostSession | None:
        """Return the cost summary for ``session_id``, or None if it has no entries.

        ``model`` is the sorted, comma-separated list of models the session
        used; ``start_time`` / ``end_time`` are its earliest and latest entry
        timestamps.
        """
        with self._lock:
            matches = [e for e in self._entries if e.session_id == session_id]
            if not matches:
                return None
            models = set(e.model for e in matches)
            timestamps = [e.timestamp for e in matches]
            total_input = sum(e.input_tokens for e in matches)
            total_output = sum(e.output_tokens for e in matches)
            total_cost = sum(e.cost_usd for e in matches)
            return CostSession(
                session_id=session_id,
                model=", ".join(sorted(models)),
                calls=len(matches),
                total_input_tokens=total_input,
                total_output_tokens=total_output,
                total_cost_usd=total_cost,
                start_time=min(timestamps),
                end_time=max(timestamps),
            )

    # ── Score / cost correlation ─────────────────

    def cost_by_score_tier(self, scores: dict[float, list[str]]) -> dict[str, float]:
        """Aggregate cost per score tier.

        Args:
            scores: Maps a score to the session ids that received it.

        Returns:
            Maps ``"score_<score to one decimal>"`` to the summed cost of the
            entries belonging to those sessions. Scores that round to the same
            tier name are added together rather than overwriting each other.
        """
        with self._lock:
            entries = list(self._entries)

        tiers: dict[str, float] = {}
        for score, session_ids in scores.items():
            tier_name = f"score_{score:.1f}"
            wanted = set(session_ids)  # O(1) membership instead of a list scan per entry
            tier_cost = sum((e.cost_usd for e in entries if e.session_id in wanted), 0.0)
            tiers[tier_name] = tiers.get(tier_name, 0.0) + tier_cost
        return tiers

    # ── Persistence ──────────────────────────────

    def _save(self):
        """Write the most recent 10,000 entries to ``persist_path`` as JSON.

        Does nothing when no path is configured. Saving is best-effort: any
        failure is logged as a warning and otherwise ignored.
        """
        if not self.persist_path:
            return
        try:
            with self._lock:
                data = [
                    {
                        "ts": e.timestamp,
                        "model": e.model,
                        "session_id": e.session_id,
                        "input_tokens": e.input_tokens,
                        "output_tokens": e.output_tokens,
                        "cost_usd": e.cost_usd,
                        "duration_ms": e.duration_ms,
                    }
                    for e in self._entries[-10000:]  # Keep last 10k
                ]
            os.makedirs(os.path.dirname(self.persist_path) or ".", exist_ok=True)
            with open(self.persist_path, "w", encoding="utf-8") as f:
                json.dump(data, f)
        except Exception as exc:
            # Persistence must never break cost recording, but a failure
            # should at least be visible in the logs.
            import logging

            logging.getLogger(__name__).warning(
                "Failed to save cost entries to %s: %s", self.persist_path, exc
            )

    def _load(self):
        """Load entries from ``persist_path`` if the file exists.

        Loading is best-effort: on any failure a warning is logged and the
        in-memory entries are left unchanged.
        """
        if not self.persist_path or not os.path.exists(self.persist_path):
            return
        try:
            with open(self.persist_path, encoding="utf-8") as f:
                data = json.load(f)
            loaded = [
                CostEntry(
                    timestamp=d["ts"], model=d["model"],
                    session_id=d["session_id"],
                    input_tokens=d["input_tokens"],
                    output_tokens=d["output_tokens"],
                    cost_usd=d["cost_usd"],
                    duration_ms=d.get("duration_ms", 0),
                )
                for d in data
            ]
            with self._lock:
                self._entries = loaded
        except Exception as exc:
            import logging

            logging.getLogger(__name__).warning(
                "Failed to load cost entries from %s: %s", self.persist_path, exc
            )