heaven_base.utils.token_counter
Token counting utilities for precise context management.
Provides accurate token counting for OpenAI models using tiktoken, with fallback approximations for other models.
"""
Token counting utilities for precise context management.

Provides accurate token counting for OpenAI models using tiktoken,
with fallback approximations for other models.
"""

import tiktoken
from typing import Optional, List, Union
from langchain_core.messages import BaseMessage


def _resolve_encoding(model: str):
    """Pick the tiktoken encoding that best matches ``model``."""
    try:
        # Let tiktoken resolve the exact model name first; it knows that
        # e.g. gpt-4o models use a different encoding than gpt-4.
        return tiktoken.encoding_for_model(model)
    except KeyError:
        pass  # Unknown to tiktoken (prefixed name, non-OpenAI model, ...)

    name = model.lower()
    parts = name.replace("/", "-").split("-")
    is_o_series = any(part.startswith(("o1", "o3", "o4")) for part in parts)
    if "gpt-4o" in name or is_o_series:
        try:
            return tiktoken.get_encoding("o200k_base")
        except ValueError:
            pass  # Older tiktoken releases do not ship o200k_base

    # Same encoding the gpt-4 / gpt-3.5 families use; also serves as the
    # generic approximation for other models (Claude, Gemini, etc.).
    return tiktoken.get_encoding("cl100k_base")


def count_tokens(
    text: str,
    model: str = "gpt-4o-mini"
) -> int:
    """
    Count tokens in text using tiktoken for OpenAI models.

    Args:
        text: Text to count tokens for
        model: Model name for encoding selection

    Returns:
        int: Number of tokens in the text (0 for empty text)
    """
    if not text:
        # Nothing to encode; keeps the fallback path consistent with
        # tiktoken, which also yields zero tokens for an empty string.
        return 0

    try:
        encoding = _resolve_encoding(model)
        # disallowed_special=() makes special-token markers appearing in
        # ordinary text count as plain text instead of raising.
        return len(encoding.encode(text, disallowed_special=()))
    except Exception:
        # Deliberate best-effort: fall back to a rough approximation if
        # tiktoken fails. OpenAI's rule of thumb: ~4 characters per token
        return max(1, len(text) // 4)


def count_tokens_in_messages(
    messages: List[Union[BaseMessage, dict]],
    model: str = "gpt-4o-mini"
) -> int:
    """
    Count tokens in a list of messages.

    Only the message content is counted.

    Args:
        messages: List of LangChain messages or dict messages
        model: Model name for encoding selection

    Returns:
        int: Total number of tokens in all messages
    """
    total_tokens = 0

    for message in messages:
        if isinstance(message, BaseMessage):
            # LangChain message
            text = str(message.content)
        elif isinstance(message, dict):
            # Dict message (uni-api format)
            content = message.get("content")
            if content is None:
                # Missing or explicitly null content contributes nothing
                text = ""
            elif isinstance(content, str):
                text = content
            else:
                # Structured content (e.g. a list of parts): count its
                # string form rather than passing a non-string along
                text = str(content)
        else:
            # Fallback: convert to string
            text = str(message)

        total_tokens += count_tokens(text, model)

    return total_tokens


def estimate_tokens_for_model(text: str, model: str) -> int:
    """
    Estimate tokens for different model families with family-specific approximations.

    Args:
        text: Text to estimate tokens for
        model: Model name

    Returns:
        int: Estimated number of tokens (0 for empty text)
    """
    if not text:
        return 0

    model_lower = model.lower()

    if "gpt" in model_lower or "openai" in model_lower:
        # Use tiktoken for OpenAI models
        return count_tokens(text, model)

    if "claude" in model_lower:
        # Rough approximation: 3.8 chars per token
        chars_per_token = 3.8
    elif "gemini" in model_lower:
        # Rough approximation: 4.2 chars per token
        chars_per_token = 4.2
    else:
        # Generic fallback: ~4 chars per token
        chars_per_token = 4.0

    return max(1, int(len(text) / chars_per_token))


class TokenBudget:
    """
    Utility class for managing token budgets across conversation sections.
    """

    def __init__(self, total_budget: int, model: str = "gpt-4o-mini"):
        self.total_budget = total_budget
        self.model = model
        # section name -> tokens allocated to that section
        self.allocations = {}
        # section name -> tokens consumed so far in that section
        self.used_tokens = {}

    def allocate(self, section: str, tokens: int) -> None:
        """Allocate tokens to a section, keeping any usage already recorded."""
        self.allocations[section] = tokens
        self.used_tokens.setdefault(section, 0)

    def use_tokens(self, section: str, text: str) -> bool:
        """
        Use tokens for a section. Returns True if within budget.

        Usage is recorded only when the text fits.

        Args:
            section: Section name
            text: Text to count tokens for

        Returns:
            bool: True if within section budget, False otherwise
        """
        if section not in self.allocations:
            # Unknown section: reject before paying for tokenization
            return False

        tokens_needed = count_tokens(text, self.model)
        projected = self.used_tokens.get(section, 0) + tokens_needed

        if projected > self.allocations[section]:
            return False

        self.used_tokens[section] = projected
        return True

    def get_remaining(self, section: str) -> int:
        """Get remaining tokens for a section."""
        allocated = self.allocations.get(section, 0)
        used = self.used_tokens.get(section, 0)
        return max(0, allocated - used)

    def get_total_used(self) -> int:
        """Get total tokens used across all sections."""
        return sum(self.used_tokens.values())

    def get_total_remaining(self) -> int:
        """Get total remaining budget."""
        return max(0, self.total_budget - self.get_total_used())

    def reset_section(self, section: str) -> None:
        """Reset usage for a section."""
        self.used_tokens[section] = 0

    def get_budget_summary(self) -> dict:
        """Get a summary of budget usage, with sections in insertion order."""
        # dict.fromkeys de-duplicates while preserving insertion order, so
        # the summary is stable across runs.
        section_names = dict.fromkeys([*self.allocations, *self.used_tokens])
        return {
            "total_budget": self.total_budget,
            "total_used": self.get_total_used(),
            "total_remaining": self.get_total_remaining(),
            "sections": {
                section: {
                    "allocated": self.allocations.get(section, 0),
                    "used": self.used_tokens.get(section, 0),
                    "remaining": self.get_remaining(section)
                }
                for section in section_names
            }
        }


# Convenience functions for common use cases
def count_conversation_tokens(messages: List[Union[BaseMessage, dict]], model: str = "gpt-4o-mini") -> int:
    """Count tokens in a full conversation."""
    return count_tokens_in_messages(messages, model)


def check_token_limit(text: str, limit: int, model: str = "gpt-4o-mini") -> bool:
    """Check if text is within token limit."""
    return count_tokens(text, model) <= limit


def truncate_to_token_limit(text: str, limit: int, model: str = "gpt-4o-mini") -> str:
    """
    Truncate text to fit within token limit.

    The cut point is estimated from the text's average characters per
    token and then shrunk until the result fits.

    Args:
        text: Text to truncate
        limit: Maximum number of tokens allowed
        model: Model name for encoding selection

    Returns:
        str: ``text`` unchanged if it already fits, otherwise a shortened
        prefix followed by "..."; an empty string if nothing fits
    """
    if limit <= 0 or not text:
        # No budget (a negative limit would otherwise become a negative
        # slice index) or nothing to truncate
        return ""

    current_tokens = count_tokens(text, model)

    if current_tokens <= limit:
        return text

    suffix = "..."
    # Rough truncation based on character ratio
    char_ratio = len(text) / current_tokens
    target_chars = int(limit * char_ratio * 0.9)  # 10% safety margin

    while target_chars > 0:
        candidate = text[:target_chars] + suffix
        if count_tokens(candidate, model) <= limit:
            return candidate
        # The estimate was too generous; shrink and retry
        target_chars = int(target_chars * 0.9)

    return suffix if count_tokens(suffix, model) <= limit else ""


if __name__ == "__main__":
    # Example usage
    test_text = "Hello world! This is a test of the token counting system."

    print(f"Text: {test_text}")
    print(f"Tokens (GPT-4): {count_tokens(test_text, 'gpt-4o-mini')}")
    print(f"Tokens (Claude): {estimate_tokens_for_model(test_text, 'claude-3-5-sonnet')}")

    # Test budget system
    budget = TokenBudget(1000, "gpt-4o-mini")
    budget.allocate("summary", 300)
    budget.allocate("history", 600)
    budget.allocate("prompt", 100)

    print(f"\nBudget summary: {budget.get_budget_summary()}")
def _resolve_encoding(model: str):
    """Pick the tiktoken encoding that best matches ``model``."""
    try:
        # Let tiktoken resolve the exact model name first; it knows that
        # e.g. gpt-4o models use a different encoding than gpt-4.
        return tiktoken.encoding_for_model(model)
    except KeyError:
        pass  # Unknown to tiktoken (prefixed name, non-OpenAI model, ...)

    name = model.lower()
    parts = name.replace("/", "-").split("-")
    is_o_series = any(part.startswith(("o1", "o3", "o4")) for part in parts)
    if "gpt-4o" in name or is_o_series:
        try:
            return tiktoken.get_encoding("o200k_base")
        except ValueError:
            pass  # Older tiktoken releases do not ship o200k_base

    # Same encoding the gpt-4 / gpt-3.5 families use; also serves as the
    # generic approximation for other models (Claude, Gemini, etc.).
    return tiktoken.get_encoding("cl100k_base")


def count_tokens(
    text: str,
    model: str = "gpt-4o-mini"
) -> int:
    """
    Count tokens in text using tiktoken for OpenAI models.

    Args:
        text: Text to count tokens for
        model: Model name for encoding selection

    Returns:
        int: Number of tokens in the text (0 for empty text)
    """
    if not text:
        # Nothing to encode; keeps the fallback path consistent with
        # tiktoken, which also yields zero tokens for an empty string.
        return 0

    try:
        encoding = _resolve_encoding(model)
        # disallowed_special=() makes special-token markers appearing in
        # ordinary text count as plain text instead of raising.
        return len(encoding.encode(text, disallowed_special=()))
    except Exception:
        # Deliberate best-effort: fall back to a rough approximation if
        # tiktoken fails. OpenAI's rule of thumb: ~4 characters per token
        return max(1, len(text) // 4)
Count tokens in text using tiktoken for OpenAI models.
Args: text: Text to count tokens for model: Model name for encoding selection
Returns: int: Number of tokens in the text
def count_tokens_in_messages(
    messages: List[Union[BaseMessage, dict]],
    model: str = "gpt-4o-mini"
) -> int:
    """
    Count tokens in a list of messages.

    Only the message content is counted.

    Args:
        messages: List of LangChain messages or dict messages
        model: Model name for encoding selection

    Returns:
        int: Total number of tokens in all messages
    """
    total_tokens = 0

    for message in messages:
        if isinstance(message, BaseMessage):
            # LangChain message
            text = str(message.content)
        elif isinstance(message, dict):
            # Dict message (uni-api format)
            content = message.get("content")
            if content is None:
                # Missing or explicitly null content contributes nothing
                text = ""
            elif isinstance(content, str):
                text = content
            else:
                # Structured content (e.g. a list of parts): count its
                # string form rather than passing a non-string along
                text = str(content)
        else:
            # Fallback: convert to string
            text = str(message)

        total_tokens += count_tokens(text, model)

    return total_tokens
Count tokens in a list of messages.
Args: messages: List of LangChain messages or dict messages model: Model name for encoding selection
Returns: int: Total number of tokens in all messages
def estimate_tokens_for_model(text: str, model: str) -> int:
    """
    Estimate tokens for different model families with family-specific approximations.

    Args:
        text: Text to estimate tokens for
        model: Model name

    Returns:
        int: Estimated number of tokens (0 for empty text)
    """
    if not text:
        return 0

    model_lower = model.lower()

    if "gpt" in model_lower or "openai" in model_lower:
        # Use tiktoken for OpenAI models
        return count_tokens(text, model)

    if "claude" in model_lower:
        # Rough approximation: 3.8 chars per token
        chars_per_token = 3.8
    elif "gemini" in model_lower:
        # Rough approximation: 4.2 chars per token
        chars_per_token = 4.2
    else:
        # Generic fallback: ~4 chars per token
        chars_per_token = 4.0

    return max(1, int(len(text) / chars_per_token))
Estimate tokens for different model families with family-specific approximations.
Args: text: Text to estimate tokens for model: Model name
Returns: int: Estimated number of tokens
class TokenBudget:
    """
    Utility class for managing token budgets across conversation sections.
    """

    def __init__(self, total_budget: int, model: str = "gpt-4o-mini"):
        self.total_budget = total_budget
        self.model = model
        # section name -> tokens allocated to that section
        self.allocations = {}
        # section name -> tokens consumed so far in that section
        self.used_tokens = {}

    def allocate(self, section: str, tokens: int) -> None:
        """Allocate tokens to a section, keeping any usage already recorded."""
        self.allocations[section] = tokens
        self.used_tokens.setdefault(section, 0)

    def use_tokens(self, section: str, text: str) -> bool:
        """
        Use tokens for a section. Returns True if within budget.

        Usage is recorded only when the text fits.

        Args:
            section: Section name
            text: Text to count tokens for

        Returns:
            bool: True if within section budget, False otherwise
        """
        if section not in self.allocations:
            # Unknown section: reject before paying for tokenization
            return False

        tokens_needed = count_tokens(text, self.model)
        projected = self.used_tokens.get(section, 0) + tokens_needed

        if projected > self.allocations[section]:
            return False

        self.used_tokens[section] = projected
        return True

    def get_remaining(self, section: str) -> int:
        """Get remaining tokens for a section."""
        allocated = self.allocations.get(section, 0)
        used = self.used_tokens.get(section, 0)
        return max(0, allocated - used)

    def get_total_used(self) -> int:
        """Get total tokens used across all sections."""
        return sum(self.used_tokens.values())

    def get_total_remaining(self) -> int:
        """Get total remaining budget."""
        return max(0, self.total_budget - self.get_total_used())

    def reset_section(self, section: str) -> None:
        """Reset usage for a section."""
        self.used_tokens[section] = 0

    def get_budget_summary(self) -> dict:
        """Get a summary of budget usage, with sections in insertion order."""
        # dict.fromkeys de-duplicates while preserving insertion order, so
        # the summary is stable across runs.
        section_names = dict.fromkeys([*self.allocations, *self.used_tokens])
        return {
            "total_budget": self.total_budget,
            "total_used": self.get_total_used(),
            "total_remaining": self.get_total_remaining(),
            "sections": {
                section: {
                    "allocated": self.allocations.get(section, 0),
                    "used": self.used_tokens.get(section, 0),
                    "remaining": self.get_remaining(section)
                }
                for section in section_names
            }
        }
Utility class for managing token budgets across conversation sections.
def allocate(self, section: str, tokens: int) -> None:
    """Set the token allocation for ``section``.

    Usage already recorded for the section is kept; a section seen for
    the first time starts with zero tokens used.
    """
    self.allocations[section] = tokens
    self.used_tokens.setdefault(section, 0)
Allocate tokens to a section.
def use_tokens(self, section: str, text: str) -> bool:
    """
    Use tokens for a section. Returns True if within budget.

    Usage is recorded only when the text fits.

    Args:
        section: Section name
        text: Text to count tokens for

    Returns:
        bool: True if within section budget, False otherwise
    """
    if section not in self.allocations:
        # Unknown section: reject before paying for tokenization
        return False

    tokens_needed = count_tokens(text, self.model)
    projected = self.used_tokens.get(section, 0) + tokens_needed

    if projected > self.allocations[section]:
        return False

    self.used_tokens[section] = projected
    return True
Use tokens for a section. Returns True if within budget.
Args: section: Section name text: Text to count tokens for
Returns: bool: True if within section budget, False otherwise
def get_remaining(self, section: str) -> int:
    """Return the tokens still available to ``section``, never below zero.

    Sections with no allocation or no recorded usage count as 0.
    """
    budgeted = self.allocations.get(section, 0)
    spent = self.used_tokens.get(section, 0)
    leftover = budgeted - spent
    return leftover if leftover > 0 else 0
Get remaining tokens for a section.
def get_total_used(self) -> int:
    """Return the tokens used, summed over every section."""
    running_total = 0
    for spent in self.used_tokens.values():
        running_total += spent
    return running_total
Get total tokens used across all sections.
def get_total_remaining(self) -> int:
    """Return the unused part of the total budget, never below zero."""
    leftover = self.total_budget - self.get_total_used()
    if leftover > 0:
        return leftover
    return 0
Get total remaining budget.
def reset_section(self, section: str) -> None:
    """Set the recorded usage of ``section`` back to zero.

    A section with no usage entry yet gets one with value 0.
    """
    self.used_tokens.update({section: 0})
Reset usage for a section.
def get_budget_summary(self) -> dict:
    """Get a summary of budget usage.

    Returns:
        dict: "total_budget", "total_used" and "total_remaining", plus a
        "sections" mapping of section name to its "allocated", "used"
        and "remaining" token counts. Sections appear in insertion
        order: allocated sections first, then any that only have usage.
    """
    # dict.fromkeys de-duplicates while preserving insertion order, so
    # the summary is stable across runs (iterating a set is not).
    section_names = dict.fromkeys([*self.allocations, *self.used_tokens])
    return {
        "total_budget": self.total_budget,
        "total_used": self.get_total_used(),
        "total_remaining": self.get_total_remaining(),
        "sections": {
            section: {
                "allocated": self.allocations.get(section, 0),
                "used": self.used_tokens.get(section, 0),
                "remaining": self.get_remaining(section)
            }
            for section in section_names
        }
    }
Get a summary of budget usage.
def count_conversation_tokens(messages: List[Union[BaseMessage, dict]], model: str = "gpt-4o-mini") -> int:
    """Return the token total for a whole conversation.

    Thin alias that forwards both arguments to count_tokens_in_messages.
    """
    conversation_total = count_tokens_in_messages(messages, model)
    return conversation_total
Count tokens in a full conversation.
def check_token_limit(text: str, limit: int, model: str = "gpt-4o-mini") -> bool:
    """Return True when ``text`` needs at most ``limit`` tokens for ``model``."""
    token_total = count_tokens(text, model)
    return token_total <= limit
Check if text is within token limit.
def truncate_to_token_limit(text: str, limit: int, model: str = "gpt-4o-mini") -> str:
    """
    Truncate text to fit within token limit.

    The cut point is estimated from the text's average characters per
    token and then shrunk until the result fits.

    Args:
        text: Text to truncate
        limit: Maximum number of tokens allowed
        model: Model name for encoding selection

    Returns:
        str: ``text`` unchanged if it already fits, otherwise a shortened
        prefix followed by "..."; an empty string if nothing fits
    """
    if limit <= 0 or not text:
        # No budget (a negative limit would otherwise become a negative
        # slice index) or nothing to truncate
        return ""

    current_tokens = count_tokens(text, model)

    if current_tokens <= limit:
        return text

    suffix = "..."
    # Rough truncation based on character ratio
    char_ratio = len(text) / current_tokens
    target_chars = int(limit * char_ratio * 0.9)  # 10% safety margin

    while target_chars > 0:
        candidate = text[:target_chars] + suffix
        if count_tokens(candidate, model) <= limit:
            return candidate
        # The estimate was too generous; shrink and retry
        target_chars = int(target_chars * 0.9)

    return suffix if count_tokens(suffix, model) <= limit else ""
Truncate text to fit within token limit.
This is a rough approximation - for precise truncation, you'd need to decode tokens back to text.