Coverage for agentos/cost/tracker.py: 45%

159 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2v1.10.0: Cost Tracker — token counting & pricing across all providers. 

3 

4Tracks token usage and cost for: OpenAI, Anthropic, Google, DeepSeek, Groq. 

5Features: per-request tracking, budget management, usage reporting. 

6""" 

7 

8from __future__ import annotations 

9 

10import time 

11from collections import defaultdict 

12from dataclasses import dataclass, field 

13from datetime import datetime, timezone 

14from enum import Enum 

15from typing import Any 

16 

17 

18# ── Data Classes ────────────────────────────────────────────────── 

19 

class ProviderPricing(str, Enum):
    """Identifiers for the providers that a pricing entry can belong to.

    Members also subclass ``str``, so each one compares equal to its
    lowercase string value (``ProviderPricing.OPENAI == "openai"``).
    """

    # Providers that have entries in the default pricing table.
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    GOOGLE = "google"
    DEEPSEEK = "deepseek"
    GROQ = "groq"

    # Used for models that match no known provider.
    CUSTOM = "custom"

27 

28 

@dataclass
class TokenPricing:
    """Price list for one model, expressed in USD per one million tokens."""

    provider: ProviderPricing
    model: str
    input_price_per_1m: float   # USD per 1M input tokens
    output_price_per_1m: float  # USD per 1M output tokens
    cache_write_price_per_1m: float = 0.0  # USD per 1M cache-write tokens
    cache_read_price_per_1m: float = 0.0   # USD per 1M cache-read tokens

    def cost(self, input_tokens: int, output_tokens: int,
             cache_write: int = 0, cache_read: int = 0) -> float:
        """Return the USD cost of a call with the given token counts.

        Each token count is scaled to millions and multiplied by the
        matching per-1M price; the four partial costs are then added.
        """
        per_million = 1_000_000
        input_cost = (input_tokens / per_million) * self.input_price_per_1m
        output_cost = (output_tokens / per_million) * self.output_price_per_1m
        write_cost = (cache_write / per_million) * self.cache_write_price_per_1m
        read_cost = (cache_read / per_million) * self.cache_read_price_per_1m
        # Keep this left-to-right order so float rounding stays unchanged.
        return input_cost + output_cost + write_cost + read_cost

47 

48 

@dataclass
class TokenUsage:
    """Record of the tokens, cost and latency of one API call."""

    model: str
    input_tokens: int = 0
    output_tokens: int = 0
    cache_write_tokens: int = 0
    cache_read_tokens: int = 0
    total_tokens: int = 0   # filled in from input + output when left falsy
    cost: float = 0.0
    latency_ms: float = 0.0
    timestamp: str = ""     # filled in with the current UTC time when left empty

    def __post_init__(self) -> None:
        """Fill in ``total_tokens`` and ``timestamp`` when they were not supplied."""
        # The derived total counts input and output tokens only; the cache
        # token fields are not added in.
        self.total_tokens = self.total_tokens or (
            self.input_tokens + self.output_tokens
        )
        # ISO-8601 string for the current moment in UTC.
        self.timestamp = self.timestamp or datetime.now(timezone.utc).isoformat()

67 

68 

@dataclass
class Budget:
    """A named spending limit together with the amount spent against it."""

    name: str
    limit: float                  # USD
    period: str = "monthly"       # daily / weekly / monthly / total
    current_spend: float = 0.0
    alert_threshold: float = 0.8  # fraction of the limit at which to alert
    hard_stop: bool = False       # Block requests when exceeded

    @property
    def remaining(self) -> float:
        """USD left before the limit is reached, never below zero."""
        left = self.limit - self.current_spend
        return left if left > 0.0 else 0.0

    @property
    def pct_used(self) -> float:
        """Spend as a percentage of the limit; 0.0 when the limit is not positive."""
        if self.limit > 0:
            return self.current_spend / self.limit * 100
        return 0.0

    @property
    def exceeded(self) -> bool:
        """True once spend has reached or passed the limit."""
        return self.limit <= self.current_spend

    @property
    def should_alert(self) -> bool:
        """True once the used percentage reaches the alert threshold."""
        # alert_threshold is a fraction, pct_used is a percentage.
        threshold_pct = self.alert_threshold * 100
        return self.pct_used >= threshold_pct

94 

95 

96# ── Default Pricing (as of 2025-07) ──────────────────────────────── 

97 

# Built-in price table. Every entry is keyed by its own ``model`` name, so the
# mapping is derived from the entries instead of repeating each name as a key.
DEFAULT_PRICING: dict[str, TokenPricing] = {
    entry.model: entry
    for entry in (
        # OpenAI
        TokenPricing(ProviderPricing.OPENAI, "gpt-4o", 2.50, 10.00),
        TokenPricing(ProviderPricing.OPENAI, "gpt-4o-mini", 0.15, 0.60),
        TokenPricing(ProviderPricing.OPENAI, "gpt-4-turbo", 10.00, 30.00),
        TokenPricing(ProviderPricing.OPENAI, "gpt-3.5-turbo", 0.50, 1.50),
        TokenPricing(ProviderPricing.OPENAI, "o3-mini", 1.10, 4.40),
        # Anthropic (the only default entry with cache prices set)
        TokenPricing(
            ProviderPricing.ANTHROPIC, "claude-3-5-sonnet", 3.00, 15.00,
            cache_write_price_per_1m=3.75, cache_read_price_per_1m=0.30,
        ),
        TokenPricing(ProviderPricing.ANTHROPIC, "claude-3-haiku", 0.25, 1.25),
        TokenPricing(ProviderPricing.ANTHROPIC, "claude-3-opus", 15.00, 75.00),
        # Google
        TokenPricing(ProviderPricing.GOOGLE, "gemini-2.0-flash", 0.10, 0.40),
        TokenPricing(ProviderPricing.GOOGLE, "gemini-2.0-pro", 1.25, 5.00),
        TokenPricing(ProviderPricing.GOOGLE, "gemini-1.5-pro", 1.25, 5.00),
        # DeepSeek
        TokenPricing(ProviderPricing.DEEPSEEK, "deepseek-chat", 0.27, 1.10),
        TokenPricing(ProviderPricing.DEEPSEEK, "deepseek-reasoner", 0.55, 2.19),
        # Groq
        TokenPricing(ProviderPricing.GROQ, "llama-3.3-70b", 0.59, 0.79),
        TokenPricing(ProviderPricing.GROQ, "mixtral-8x7b", 0.24, 0.24),
        TokenPricing(ProviderPricing.GROQ, "gemma2-9b-it", 0.20, 0.20),
    )
}

122 

123 

124# ── Token Counter (heuristic-based, provider-agnostic) ──────────── 

125 

class TokenCounter:
    """Estimate token counts offline from simple text heuristics.

    The estimate comes from the whitespace-separated word count, or from
    character counts when the text is mostly CJK. It is an approximation;
    for exact counts use a provider-specific tokenizer (tiktoken, etc.).
    """

    # Rough tokens-per-word ratios, selected by the ``source`` hint.
    TOKENS_PER_WORD: dict[str, float] = {
        "en": 1.3,   # ~4 chars/token for English
        "zh": 0.5,   # applied per character (not per word) to CJK-heavy text
        "ja": 0.6,
        "ko": 0.6,
        # NOTE(review): lower than "default", although whitespace-split code
        # "words" usually break into several tokens -- confirm this value.
        "code": 0.7,
        "default": 1.0,
    }

    @classmethod
    def count(cls, text: str, source: str = "default") -> int:
        """Estimate how many tokens ``text`` contains.

        Args:
            text: The text to measure.
            source: Key into ``TOKENS_PER_WORD``; unknown keys fall back to
                the ``"default"`` ratio. ``"code"`` forces the code ratio.

        Returns:
            0 for empty text, otherwise an estimate of at least 1.
        """
        if not text:
            return 0

        chars = len(text)

        # CJK ideographs plus hiragana/katakana; Hangul is not included.
        cjk_chars = sum(
            1 for c in text
            if "\u4e00" <= c <= "\u9fff" or "\u3040" <= c <= "\u30ff"
        )

        if cjk_chars / chars > 0.3:
            # Mostly Chinese/Japanese: estimate per character, not per word.
            non_cjk = chars - cjk_chars
            estimate = cjk_chars * cls.TOKENS_PER_WORD["zh"] + non_cjk * 0.25
            # Floor at 1 so short non-empty text (e.g. a single CJK
            # character) is never reported as zero tokens, matching the
            # word-based branch below.
            return max(1, int(estimate))

        if source == "code" or cls._is_code(text):
            ratio = cls.TOKENS_PER_WORD["code"]
        else:
            ratio = cls.TOKENS_PER_WORD.get(source, cls.TOKENS_PER_WORD["default"])

        words = len(text.split())
        return max(1, int(words * ratio))

    @staticmethod
    def _is_code(text: str) -> bool:
        """Heuristic: treat text as code when 3+ distinct indicators occur."""
        code_indicators = (
            "def ", "class ", "import ", "from ", "function",
            "const ", "let ", "var ", "{", "}", "=>", "return ",
        )
        hits = sum(1 for indicator in code_indicators if indicator in text)
        return hits >= 3

175 

176 

177# ── Cost Tracker ─────────────────────────────────────────────────── 

178 

class CostTracker:
    """Track token usage and costs across all provider calls.

    Each ``record`` call appends to ``usage_log``, updates the per-model
    running totals and adds the call's cost to every registered budget.

    Usage:
        tracker = CostTracker()
        tracker.record("gpt-4o", input_tokens=500, output_tokens=200)
        tracker.record("claude-3-5-sonnet", input_tokens=1000, output_tokens=500)
        report = tracker.report()
    """

    def __init__(
        self,
        custom_pricing: dict[str, TokenPricing] | None = None,
        budgets: list[Budget] | None = None,
    ):
        """Create a tracker.

        Args:
            custom_pricing: Extra or overriding pricing entries keyed by
                model name; merged on top of a copy of ``DEFAULT_PRICING``.
            budgets: Budgets to track, stored by ``Budget.name``. The objects
                are kept as given, so ``record`` and ``reset`` mutate them.
        """
        # Shallow copy so per-tracker overrides never touch the shared table.
        self.pricing: dict[str, TokenPricing] = {**DEFAULT_PRICING}
        if custom_pricing:
            self.pricing.update(custom_pricing)

        self.budgets: dict[str, Budget] = {}
        if budgets:
            for b in budgets:
                self.budgets[b.name] = b

        self.usage_log: list[TokenUsage] = []
        # Running per-model aggregates, created lazily on first use of a model.
        self._model_totals: dict[str, dict[str, float]] = defaultdict(
            lambda: {"input_tokens": 0, "output_tokens": 0, "cost": 0.0, "calls": 0}
        )

    def get_price(self, model: str) -> TokenPricing:
        """Return pricing for ``model``, guessing from its name when unknown.

        An exact key in ``self.pricing`` wins. Otherwise the first matching
        substring rule below supplies a generic price, and models matching
        no rule are priced at 1.00/1.00 under ``ProviderPricing.CUSTOM``.
        """
        if model in self.pricing:
            return self.pricing[model]

        # Best-effort fallback based on model name; rules are tried in order.
        if "gpt-4" in model:
            return TokenPricing(ProviderPricing.OPENAI, model, 2.50, 10.00)
        if "gpt-3" in model:
            return TokenPricing(ProviderPricing.OPENAI, model, 0.50, 1.50)
        if "claude" in model:
            return TokenPricing(ProviderPricing.ANTHROPIC, model, 3.00, 15.00)
        if "gemini" in model:
            return TokenPricing(ProviderPricing.GOOGLE, model, 0.10, 0.40)
        if "deepseek" in model:
            return TokenPricing(ProviderPricing.DEEPSEEK, model, 0.27, 1.10)
        if any(m in model for m in ["llama", "mixtral", "gemma"]):
            return TokenPricing(ProviderPricing.GROQ, model, 0.20, 0.20)

        return TokenPricing(ProviderPricing.CUSTOM, model, 1.00, 1.00)

    def record(
        self,
        model: str,
        input_tokens: int = 0,
        output_tokens: int = 0,
        cache_write_tokens: int = 0,
        cache_read_tokens: int = 0,
        latency_ms: float = 0.0,
    ) -> TokenUsage:
        """Record a token usage event and return it with its cost filled in.

        The cost is added to every budget unconditionally; this method does
        not itself block calls when a budget is exceeded.
        """
        pricing = self.get_price(model)
        cost = pricing.cost(input_tokens, output_tokens, cache_write_tokens, cache_read_tokens)

        usage = TokenUsage(
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cache_write_tokens=cache_write_tokens,
            cache_read_tokens=cache_read_tokens,
            cost=cost,
            latency_ms=latency_ms,
        )
        self.usage_log.append(usage)

        # Update model totals
        mt = self._model_totals[model]
        mt["input_tokens"] += input_tokens
        mt["output_tokens"] += output_tokens
        mt["cost"] += cost
        mt["calls"] += 1

        # Update budgets
        for budget in self.budgets.values():
            budget.current_spend += cost

        return usage

    def check_budget(self) -> list[str]:
        """Check all budgets and return a list of alert messages.

        A budget that is exceeded with ``hard_stop`` set yields a
        "BUDGET EXCEEDED" message; any other budget at or above its alert
        threshold yields a "Budget alert" message.
        """
        alerts = []
        for budget in self.budgets.values():
            if budget.exceeded and budget.hard_stop:
                alerts.append(f"BUDGET EXCEEDED: {budget.name} (${budget.current_spend:.2f}/${budget.limit:.2f})")
            elif budget.should_alert:
                alerts.append(f"Budget alert: {budget.name} at {budget.pct_used:.0f}% (${budget.current_spend:.2f}/${budget.limit:.2f})")
        return alerts

    def report(self) -> str:
        """Generate a human-readable cost report.

        Contains overall totals, one line per model ordered by descending
        cost, and a budgets section when any budget is registered.
        """
        total_cost = sum(u.cost for u in self.usage_log)
        total_tokens = sum(u.total_tokens for u in self.usage_log)
        total_calls = len(self.usage_log)

        lines = [
            "╔══ Cost Report ══╗",
            f"║ Total calls: {total_calls}",
            f"║ Total tokens: {total_tokens:,}",
            f"║ Total cost: ${total_cost:.4f}",
            "╚════════════════╝",
            "",
            "By model:",
        ]
        # Most expensive model first.
        for model, totals in sorted(self._model_totals.items(), key=lambda x: -x[1]["cost"]):
            lines.append(
                f" {model:<30} {totals['calls']:>4} calls "
                f"{totals['input_tokens']+totals['output_tokens']:>12,} tokens "
                f"${totals['cost']:>8.4f}"
            )

        if self.budgets:
            lines.append("\nBudgets:")
            for budget in self.budgets.values():
                status = "EXCEEDED" if budget.exceeded else "OK"
                lines.append(
                    f" {budget.name:<20} ${budget.current_spend:.2f}/${budget.limit:.2f} "
                    f"({budget.pct_used:.0f}%) [{status}]"
                )

        return "\n".join(lines)

    def report_dict(self) -> dict[str, Any]:
        """Generate a machine-readable cost report.

        Returns overall totals, a copy of the per-model totals, and the
        last 20 usage entries under ``"recent"``.
        """
        return {
            "total_calls": len(self.usage_log),
            "total_tokens": sum(u.total_tokens for u in self.usage_log),
            "total_cost": sum(u.cost for u in self.usage_log),
            "by_model": {
                # Copy so callers cannot mutate the tracker's running totals.
                model: dict(totals)
                for model, totals in self._model_totals.items()
            },
            "recent": [
                {
                    "model": u.model,
                    "input_tokens": u.input_tokens,
                    "output_tokens": u.output_tokens,
                    "cost": u.cost,
                    "timestamp": u.timestamp,
                }
                for u in self.usage_log[-20:]  # Last 20 calls
            ],
        }

    def reset(self) -> None:
        """Reset all counters (keeps pricing and budgets).

        Budgets stay registered but their ``current_spend`` is zeroed.
        """
        self.usage_log.clear()
        self._model_totals.clear()
        for budget in self.budgets.values():
            budget.current_spend = 0.0

    def set_budget(self, name: str, limit: float, hard_stop: bool = False) -> Budget:
        """Create a budget, or update the limit and hard-stop flag of an existing one.

        Updating keeps the existing budget's accumulated ``current_spend``,
        ``period`` and ``alert_threshold``, so changing a limit does not
        silently wipe the spend recorded so far.

        Returns:
            The budget now stored under ``name``.
        """
        existing = self.budgets.get(name)
        if existing is not None:
            existing.limit = limit
            existing.hard_stop = hard_stop
            return existing

        budget = Budget(name=name, limit=limit, hard_stop=hard_stop)
        self.budgets[name] = budget
        return budget

343 

344 

# ── Backward Compatibility Aliases (v1.2.7-) ──────────────────────
# Legacy names stay importable; each is bound to its current equivalent.
PRICING = DEFAULT_PRICING      # PRICING → DEFAULT_PRICING
UsageRecord = TokenUsage       # UsageRecord → TokenUsage
ModelPricing = TokenPricing    # ModelPricing → TokenPricing
RunCostSession = CostTracker   # RunCostSession → CostTracker