Coverage for agentos/cost/tracker.py: 45%
159 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2v1.10.0: Cost Tracker — token counting & pricing across all providers.
4Tracks token usage and cost for: OpenAI, Anthropic, Google, DeepSeek, Groq.
5Features: per-request tracking, budget management, usage reporting.
6"""
8from __future__ import annotations
10import time
11from collections import defaultdict
12from dataclasses import dataclass, field
13from datetime import datetime, timezone
14from enum import Enum
15from typing import Any
18# ── Data Classes ──────────────────────────────────────────────────
class ProviderPricing(str, Enum):
    """Identifier of the provider a pricing entry belongs to.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase provider name (``ProviderPricing.OPENAI == "openai"``).
    """

    # Named providers.
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    GOOGLE = "google"
    DEEPSEEK = "deepseek"
    GROQ = "groq"

    # Anything that is not one of the providers above.
    CUSTOM = "custom"
@dataclass
class TokenPricing:
    """USD price list for one model, quoted per one million tokens."""

    provider: ProviderPricing
    model: str
    input_price_per_1m: float  # USD per 1M input tokens
    output_price_per_1m: float  # USD per 1M output tokens
    cache_write_price_per_1m: float = 0.0  # USD per 1M cache-write tokens
    cache_read_price_per_1m: float = 0.0  # USD per 1M cache-read tokens

    def cost(self, input_tokens: int, output_tokens: int,
             cache_write: int = 0, cache_read: int = 0) -> float:
        """Return the USD cost of a call with the given token counts.

        Each count is converted to millions of tokens and multiplied by the
        matching per-1M price; the four products are then added together.
        """
        million = 1_000_000
        input_cost = (input_tokens / million) * self.input_price_per_1m
        output_cost = (output_tokens / million) * self.output_price_per_1m
        cache_write_cost = (cache_write / million) * self.cache_write_price_per_1m
        cache_read_cost = (cache_read / million) * self.cache_read_price_per_1m
        return input_cost + output_cost + cache_write_cost + cache_read_cost
@dataclass
class TokenUsage:
    """Tokens, cost and latency recorded for one API call."""

    model: str
    input_tokens: int = 0
    output_tokens: int = 0
    cache_write_tokens: int = 0
    cache_read_tokens: int = 0
    total_tokens: int = 0
    cost: float = 0.0
    latency_ms: float = 0.0
    timestamp: str = ""

    def __post_init__(self):
        """Fill in ``total_tokens`` and ``timestamp`` when they were left unset."""
        # A falsy total is replaced by input + output (cache tokens are not added).
        self.total_tokens = self.total_tokens or (self.input_tokens + self.output_tokens)
        # A falsy timestamp is replaced by the current UTC time in ISO format.
        self.timestamp = self.timestamp or datetime.now(timezone.utc).isoformat()
@dataclass
class Budget:
    """A named spending limit in USD together with its alert settings."""

    name: str
    limit: float  # USD
    period: str = "monthly"  # daily / weekly / monthly / total
    current_spend: float = 0.0
    alert_threshold: float = 0.8  # Alert at 80% of limit
    hard_stop: bool = False  # Block requests when exceeded

    @property
    def remaining(self) -> float:
        """USD left under the limit, clamped so it is never negative."""
        headroom = self.limit - self.current_spend
        return max(0.0, headroom)

    @property
    def pct_used(self) -> float:
        """Spend as a percentage of the limit; 0.0 when the limit is not positive."""
        if self.limit > 0:
            return self.current_spend / self.limit * 100
        return 0.0

    @property
    def exceeded(self) -> bool:
        """True once spend has reached or passed the limit."""
        return self.limit <= self.current_spend

    @property
    def should_alert(self) -> bool:
        """True once the percentage used reaches ``alert_threshold`` (as a percent)."""
        alert_pct = self.alert_threshold * 100
        return self.pct_used >= alert_pct
96# ── Default Pricing (as of 2025-07) ────────────────────────────────
# Built-in price table keyed by model name. Every entry is stored under its
# own ``model`` field, so each model name is written exactly once.
DEFAULT_PRICING: dict[str, TokenPricing] = {
    entry.model: entry
    for entry in (
        # OpenAI
        TokenPricing(ProviderPricing.OPENAI, "gpt-4o", 2.50, 10.00),
        TokenPricing(ProviderPricing.OPENAI, "gpt-4o-mini", 0.15, 0.60),
        TokenPricing(ProviderPricing.OPENAI, "gpt-4-turbo", 10.00, 30.00),
        TokenPricing(ProviderPricing.OPENAI, "gpt-3.5-turbo", 0.50, 1.50),
        TokenPricing(ProviderPricing.OPENAI, "o3-mini", 1.10, 4.40),
        # Anthropic
        TokenPricing(
            ProviderPricing.ANTHROPIC, "claude-3-5-sonnet", 3.00, 15.00,
            cache_write_price_per_1m=3.75, cache_read_price_per_1m=0.30,
        ),
        TokenPricing(ProviderPricing.ANTHROPIC, "claude-3-haiku", 0.25, 1.25),
        TokenPricing(ProviderPricing.ANTHROPIC, "claude-3-opus", 15.00, 75.00),
        # Google
        TokenPricing(ProviderPricing.GOOGLE, "gemini-2.0-flash", 0.10, 0.40),
        TokenPricing(ProviderPricing.GOOGLE, "gemini-2.0-pro", 1.25, 5.00),
        TokenPricing(ProviderPricing.GOOGLE, "gemini-1.5-pro", 1.25, 5.00),
        # DeepSeek
        TokenPricing(ProviderPricing.DEEPSEEK, "deepseek-chat", 0.27, 1.10),
        TokenPricing(ProviderPricing.DEEPSEEK, "deepseek-reasoner", 0.55, 2.19),
        # Groq
        TokenPricing(ProviderPricing.GROQ, "llama-3.3-70b", 0.59, 0.79),
        TokenPricing(ProviderPricing.GROQ, "mixtral-8x7b", 0.24, 0.24),
        TokenPricing(ProviderPricing.GROQ, "gemma2-9b-it", 0.20, 0.20),
    )
}
124# ── Token Counter (heuristic-based, provider-agnostic) ────────────
class TokenCounter:
    """Approximate, offline token counter based on word and character heuristics.

    For exact counts, use provider-specific tokenizers (tiktoken, etc.).
    This class only produces fast estimates and never calls a tokenizer.
    """

    # Rough tokens-per-word ratios, selected by the ``source`` hint.
    TOKENS_PER_WORD: dict[str, float] = {
        "en": 1.3,  # ~4 chars/token for English
        "zh": 0.5,  # ~2 chars/token for Chinese; applied per character in count()
        "ja": 0.6,
        "ko": 0.6,
        "code": 0.7,  # used when source == "code" or the text looks like code
        "default": 1.0,
    }

    @classmethod
    def count(cls, text: str, source: str = "default") -> int:
        """Estimate the number of tokens in ``text``.

        Args:
            text: The text to measure.
            source: Key into ``TOKENS_PER_WORD``; unknown keys use "default".
                Text detected as code uses the "code" ratio regardless.

        Returns:
            0 for empty text, otherwise an estimate of at least 1.
        """
        if not text:
            return 0

        total_chars = len(text)
        # Count CJK ideographs plus Japanese hiragana/katakana.
        cjk_chars = sum(
            1 for ch in text
            if "\u4e00" <= ch <= "\u9fff" or "\u3040" <= ch <= "\u30ff"
        )

        if cjk_chars / total_chars > 0.3:
            # Mostly Chinese/Japanese: estimate per character, not per word,
            # because such text is not whitespace-delimited.
            other_chars = total_chars - cjk_chars
            estimate = cjk_chars * cls.TOKENS_PER_WORD["zh"] + other_chars * 0.25
            # Floor at 1 so non-empty text never counts as zero tokens,
            # matching the word-based branch below.
            return max(1, int(estimate))

        if source == "code" or cls._is_code(text):
            ratio = cls.TOKENS_PER_WORD["code"]
        else:
            ratio = cls.TOKENS_PER_WORD.get(source, cls.TOKENS_PER_WORD["default"])

        word_count = len(text.split())
        return max(1, int(word_count * ratio))

    @staticmethod
    def _is_code(text: str) -> bool:
        """Heuristic: treat text as code when it contains 3+ distinct indicators."""
        code_indicators = ("def ", "class ", "import ", "from ", "function",
                           "const ", "let ", "var ", "{", "}", "=>", "return ")
        hits = sum(1 for indicator in code_indicators if indicator in text)
        return hits >= 3
177# ── Cost Tracker ───────────────────────────────────────────────────
class CostTracker:
    """Track token usage and costs across all provider calls.

    Usage:
        tracker = CostTracker()
        tracker.record("gpt-4o", input_tokens=500, output_tokens=200)
        tracker.record("claude-3-5-sonnet", input_tokens=1000, output_tokens=500)
        report = tracker.report()
    """

    def __init__(
        self,
        custom_pricing: dict[str, TokenPricing] | None = None,
        budgets: list[Budget] | None = None,
    ):
        """Create a tracker.

        Args:
            custom_pricing: Extra or overriding price entries keyed by model
                name; merged on top of a copy of ``DEFAULT_PRICING``.
            budgets: Budgets to track, indexed by their ``name``. The objects
                are stored as given, so ``record`` mutates their spend.
        """
        self.pricing: dict[str, TokenPricing] = {**DEFAULT_PRICING}
        if custom_pricing:
            self.pricing.update(custom_pricing)

        self.budgets: dict[str, Budget] = {}
        if budgets:
            for b in budgets:
                self.budgets[b.name] = b

        self.usage_log: list[TokenUsage] = []
        # Per-model running totals, created on first use of a model name.
        self._model_totals: dict[str, dict[str, float]] = defaultdict(
            lambda: {"input_tokens": 0, "output_tokens": 0, "cost": 0.0, "calls": 0}
        )

    def _closest_known_model(self, model: str) -> str | None:
        """Return the longest known model name that ``model`` extends, if any.

        ``model`` extends a known name when it starts with that name and the
        next character is not alphanumeric (e.g. "-", ":" or "@"), so
        "gpt-4o-mini-2024-07-18" extends "gpt-4o-mini" but "gpt-4ox" does not
        extend "gpt-4o".
        """
        candidates = [
            known
            for known in self.pricing
            if known
            and len(model) > len(known)
            and model.startswith(known)
            and not model[len(known)].isalnum()
        ]
        return max(candidates, key=len, default=None)

    def get_price(self, model: str) -> TokenPricing:
        """Get pricing for a model, falling back to estimates when unknown.

        Lookup order:
            1. Exact match in ``self.pricing``.
            2. The longest known model name that ``model`` extends with a
               suffix (dated snapshots and similar variants); the returned
               entry copies that pricing, including cache prices, under the
               requested model name.
            3. A family-level default chosen by substring of the model name.
            4. A generic 1.00/1.00 entry marked as ``CUSTOM``.
        """
        if model in self.pricing:
            return self.pricing[model]

        base_name = self._closest_known_model(model)
        if base_name is not None:
            # Local import: the module only imports ``dataclass``/``field``.
            from dataclasses import replace

            return replace(self.pricing[base_name], model=model)

        # Best-effort fallback based on model name
        if "gpt-4" in model:
            return TokenPricing(ProviderPricing.OPENAI, model, 2.50, 10.00)
        if "gpt-3" in model:
            return TokenPricing(ProviderPricing.OPENAI, model, 0.50, 1.50)
        if "claude" in model:
            return TokenPricing(ProviderPricing.ANTHROPIC, model, 3.00, 15.00)
        if "gemini" in model:
            return TokenPricing(ProviderPricing.GOOGLE, model, 0.10, 0.40)
        if "deepseek" in model:
            return TokenPricing(ProviderPricing.DEEPSEEK, model, 0.27, 1.10)
        if any(m in model for m in ["llama", "mixtral", "gemma"]):
            return TokenPricing(ProviderPricing.GROQ, model, 0.20, 0.20)

        return TokenPricing(ProviderPricing.CUSTOM, model, 1.00, 1.00)

    def record(
        self,
        model: str,
        input_tokens: int = 0,
        output_tokens: int = 0,
        cache_write_tokens: int = 0,
        cache_read_tokens: int = 0,
        latency_ms: float = 0.0,
    ) -> TokenUsage:
        """Record a token usage event and return it with its computed cost.

        The event is appended to ``usage_log``, added to the per-model totals,
        and its cost is added to the ``current_spend`` of every budget.
        """
        pricing = self.get_price(model)
        cost = pricing.cost(input_tokens, output_tokens, cache_write_tokens, cache_read_tokens)

        usage = TokenUsage(
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cache_write_tokens=cache_write_tokens,
            cache_read_tokens=cache_read_tokens,
            cost=cost,
            latency_ms=latency_ms,
        )
        self.usage_log.append(usage)

        # Update model totals
        mt = self._model_totals[model]
        mt["input_tokens"] += input_tokens
        mt["output_tokens"] += output_tokens
        mt["cost"] += cost
        mt["calls"] += 1

        # Update budgets: every budget is charged for every call.
        for budget in self.budgets.values():
            budget.current_spend += cost

        return usage

    def check_budget(self) -> list[str]:
        """Check all budgets and return a list of alert messages.

        A budget that is exceeded and has ``hard_stop`` set yields a
        "BUDGET EXCEEDED" message; otherwise a budget past its alert
        threshold yields a "Budget alert" message. This method only reports;
        it does not block anything.
        """
        alerts = []
        for budget in self.budgets.values():
            if budget.exceeded and budget.hard_stop:
                alerts.append(f"BUDGET EXCEEDED: {budget.name} (${budget.current_spend:.2f}/${budget.limit:.2f})")
            elif budget.should_alert:
                alerts.append(f"Budget alert: {budget.name} at {budget.pct_used:.0f}% (${budget.current_spend:.2f}/${budget.limit:.2f})")
        return alerts

    def report(self) -> str:
        """Generate a human-readable cost report.

        Contains overall totals, one line per model sorted by descending
        cost, and a budget section when any budgets are configured.
        """
        total_cost = sum(u.cost for u in self.usage_log)
        total_tokens = sum(u.total_tokens for u in self.usage_log)
        total_calls = len(self.usage_log)

        lines = [
            "╔══ Cost Report ══╗",
            f"║ Total calls: {total_calls}",
            f"║ Total tokens: {total_tokens:,}",
            f"║ Total cost: ${total_cost:.4f}",
            "╚════════════════╝",
            "",
            "By model:",
        ]
        for model, totals in sorted(self._model_totals.items(), key=lambda x: -x[1]["cost"]):
            lines.append(
                f" {model:<30} {totals['calls']:>4} calls "
                f"{totals['input_tokens']+totals['output_tokens']:>12,} tokens "
                f"${totals['cost']:>8.4f}"
            )

        if self.budgets:
            lines.append("\nBudgets:")
            for budget in self.budgets.values():
                status = "EXCEEDED" if budget.exceeded else "OK"
                lines.append(
                    f" {budget.name:<20} ${budget.current_spend:.2f}/${budget.limit:.2f} "
                    f"({budget.pct_used:.0f}%) [{status}]"
                )

        return "\n".join(lines)

    def report_dict(self) -> dict[str, Any]:
        """Generate a machine-readable cost report.

        Returns overall totals, a copy of the per-model totals, and the
        last 20 recorded calls under "recent".
        """
        return {
            "total_calls": len(self.usage_log),
            "total_tokens": sum(u.total_tokens for u in self.usage_log),
            "total_cost": sum(u.cost for u in self.usage_log),
            "by_model": {
                model: dict(totals)
                for model, totals in self._model_totals.items()
            },
            "recent": [
                {
                    "model": u.model,
                    "input_tokens": u.input_tokens,
                    "output_tokens": u.output_tokens,
                    "cost": u.cost,
                    "timestamp": u.timestamp,
                }
                for u in self.usage_log[-20:]  # Last 20 calls
            ],
        }

    def reset(self) -> None:
        """Reset all counters (keeps pricing and budgets).

        Clears the usage log and per-model totals and zeroes the spend of
        every budget.
        """
        self.usage_log.clear()
        self._model_totals.clear()
        for budget in self.budgets.values():
            budget.current_spend = 0.0

    def set_budget(self, name: str, limit: float, hard_stop: bool = False) -> Budget:
        """Create a budget, or update the limit and hard-stop flag of an existing one.

        Updating keeps the existing budget object, so its accumulated
        ``current_spend``, ``period`` and ``alert_threshold`` are preserved;
        use ``reset`` to zero the spend.

        Returns:
            The created or updated budget.
        """
        existing = self.budgets.get(name)
        if existing is not None:
            existing.limit = limit
            existing.hard_stop = hard_stop
            return existing

        budget = Budget(name=name, limit=limit, hard_stop=hard_stop)
        self.budgets[name] = budget
        return budget
# ── Backward Compatibility Aliases (v1.2.7-) ──────────────────────
# Legacy names kept importable; each one is bound to its current equivalent.
RunCostSession = CostTracker  # legacy name for CostTracker
ModelPricing = TokenPricing  # legacy name for TokenPricing
UsageRecord = TokenUsage  # legacy name for TokenUsage
PRICING = DEFAULT_PRICING  # legacy name for DEFAULT_PRICING