heaven_base.utils.token_counter

Token counting utilities for precise context management.

Provides accurate token counting for OpenAI models using tiktoken, with fallback approximations for other models.

  1"""
  2Token counting utilities for precise context management.
  3
  4Provides accurate token counting for OpenAI models using tiktoken,
  5with fallback approximations for other models.
  6"""
  7
  8import tiktoken
  9from typing import Optional, List, Union
 10from langchain_core.messages import BaseMessage
 11
 12
 13def count_tokens(
 14    text: str, 
 15    model: str = "gpt-4o-mini"
 16) -> int:
 17    """
 18    Count tokens in text using tiktoken for OpenAI models.
 19    
 20    Args:
 21        text: Text to count tokens for
 22        model: Model name for encoding selection
 23        
 24    Returns:
 25        int: Number of tokens in the text
 26    """
 27    try:
 28        # Map model names to tiktoken encodings
 29        if "gpt-4" in model.lower():
 30            encoding = tiktoken.encoding_for_model("gpt-4")
 31        elif "gpt-3.5" in model.lower():
 32            encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")
 33        elif "o1" in model.lower() or "o3" in model.lower() or "o4" in model.lower():
 34            # Use gpt-4 encoding for o-series models
 35            encoding = tiktoken.encoding_for_model("gpt-4")
 36        else:
 37            # Default to gpt-4 encoding for other models (Claude, Gemini, etc.)
 38            encoding = tiktoken.encoding_for_model("gpt-4")
 39        
 40        return len(encoding.encode(text))
 41        
 42    except Exception as e:
 43        # Fallback to rough approximation if tiktoken fails
 44        # OpenAI's rule of thumb: ~4 characters per token
 45        return max(1, len(text) // 4)
 46
 47
 48def count_tokens_in_messages(
 49    messages: List[Union[BaseMessage, dict]], 
 50    model: str = "gpt-4o-mini"
 51) -> int:
 52    """
 53    Count tokens in a list of messages.
 54    
 55    Args:
 56        messages: List of LangChain messages or dict messages
 57        model: Model name for encoding selection
 58        
 59    Returns:
 60        int: Total number of tokens in all messages
 61    """
 62    total_tokens = 0
 63    
 64    for message in messages:
 65        if isinstance(message, BaseMessage):
 66            # LangChain message
 67            text = str(message.content)
 68        elif isinstance(message, dict):
 69            # Dict message (uni-api format)
 70            text = message.get("content", "")
 71        else:
 72            # Fallback: convert to string
 73            text = str(message)
 74        
 75        total_tokens += count_tokens(text, model)
 76    
 77    return total_tokens
 78
 79
 80def estimate_tokens_for_model(text: str, model: str) -> int:
 81    """
 82    Estimate tokens for different model families with family-specific approximations.
 83    
 84    Args:
 85        text: Text to estimate tokens for
 86        model: Model name
 87        
 88    Returns:
 89        int: Estimated number of tokens
 90    """
 91    model_lower = model.lower()
 92    
 93    if "gpt" in model_lower or "openai" in model_lower:
 94        # Use tiktoken for OpenAI models
 95        return count_tokens(text, model)
 96    elif "claude" in model_lower:
 97        # Claude tokenization is similar to GPT-4 but slightly different
 98        # Rough approximation: 3.8 chars per token
 99        return max(1, len(text) // 4)
100    elif "gemini" in model_lower:
101        # Gemini has different tokenization
102        # Rough approximation: 4.2 chars per token  
103        return max(1, len(text) // 4)
104    else:
105        # Generic fallback
106        return max(1, len(text) // 4)
107
108
class TokenBudget:
    """
    Utility class for managing token budgets across conversation sections.

    Each section receives an allocation through ``allocate`` and consumes it
    through ``use_tokens``. ``total_budget`` is used only for reporting;
    allocations are not checked against it.
    """

    def __init__(self, total_budget: int, model: str = "gpt-4o-mini"):
        self.total_budget = total_budget
        self.model = model
        # section name -> tokens allocated to that section
        self.allocations = {}
        # section name -> tokens consumed so far
        self.used_tokens = {}

    def allocate(self, section: str, tokens: int) -> None:
        """Allocate tokens to a section, keeping any usage already recorded."""
        self.allocations[section] = tokens
        self.used_tokens.setdefault(section, 0)

    def use_tokens(self, section: str, text: str) -> bool:
        """
        Use tokens for a section. Returns True if within budget.

        Usage is recorded only when the text fits; a rejected call leaves
        the section's usage unchanged.

        Args:
            section: Section name
            text: Text to count tokens for

        Returns:
            bool: True if within section budget, False otherwise
                (including when the section has no allocation)
        """
        # Check the allocation first so unknown sections cost no tokenization.
        if section not in self.allocations:
            return False

        tokens_needed = count_tokens(text, self.model)
        used_after = self.used_tokens.get(section, 0) + tokens_needed

        if used_after > self.allocations[section]:
            return False

        self.used_tokens[section] = used_after
        return True

    def get_remaining(self, section: str) -> int:
        """Get remaining tokens for a section (never negative)."""
        allocated = self.allocations.get(section, 0)
        used = self.used_tokens.get(section, 0)
        return max(0, allocated - used)

    def get_total_used(self) -> int:
        """Get total tokens used across all sections."""
        return sum(self.used_tokens.values())

    def get_total_remaining(self) -> int:
        """Get total remaining budget (never negative)."""
        return max(0, self.total_budget - self.get_total_used())

    def reset_section(self, section: str) -> None:
        """Reset usage for a section; unknown sections are left untouched."""
        # Guarded so that resetting an unknown name does not create a
        # phantom section in the summary.
        if section in self.used_tokens:
            self.used_tokens[section] = 0

    def get_budget_summary(self) -> dict:
        """
        Get a summary of budget usage.

        Returns:
            dict: ``total_budget``, ``total_used``, ``total_remaining`` and a
                ``sections`` mapping of section name to its ``allocated``,
                ``used`` and ``remaining`` tokens, in first-seen order.
        """
        # dict.fromkeys de-duplicates while keeping insertion order, so the
        # summary is stable between runs (a set is not).
        section_names = list(dict.fromkeys([*self.allocations, *self.used_tokens]))

        return {
            "total_budget": self.total_budget,
            "total_used": self.get_total_used(),
            "total_remaining": self.get_total_remaining(),
            "sections": {
                section: {
                    "allocated": self.allocations.get(section, 0),
                    "used": self.used_tokens.get(section, 0),
                    "remaining": self.get_remaining(section)
                }
                for section in section_names
            }
        }
181
182
183# Convenience functions for common use cases
def count_conversation_tokens(messages: List[Union[BaseMessage, dict]], model: str = "gpt-4o-mini") -> int:
    """Return the total token count of every message in a conversation."""
    conversation_total = count_tokens_in_messages(messages, model)
    return conversation_total
187
188
def check_token_limit(text: str, limit: int, model: str = "gpt-4o-mini") -> bool:
    """Return True when the text's token count does not exceed ``limit``."""
    token_count = count_tokens(text, model)
    return token_count <= limit
192
193
def truncate_to_token_limit(text: str, limit: int, model: str = "gpt-4o-mini") -> str:
    """
    Truncate text to fit within token limit.

    Text already within the limit is returned unchanged. Otherwise a prefix
    is cut using the text's character-per-token ratio, "..." is appended,
    and the prefix is shrunk until the result measures within the limit.

    Args:
        text: Text to truncate
        limit: Maximum number of tokens allowed
        model: Model name for encoding selection

    Returns:
        str: The original text, a shortened prefix ending in "...", or an
            empty string when the limit is not positive or nothing fits
    """
    if limit <= 0:
        # No room at all. This also avoids a negative slice index, which
        # would keep most of the text instead of removing it.
        return ""

    current_tokens = count_tokens(text, model)

    if current_tokens <= limit:
        return text

    # First guess from the character ratio, with a 10% safety margin.
    target_chars = int(len(text) * limit / current_tokens * 0.9)

    while target_chars > 0:
        candidate = text[:target_chars] + "..."
        if count_tokens(candidate, model) <= limit:
            return candidate
        # int(n * 0.9) is strictly smaller than n for n >= 1, so this ends.
        target_chars = int(target_chars * 0.9)

    return ""
211
212
if __name__ == "__main__":
    # Demo: count one sample sentence, then print an untouched budget.
    sample = "Hello world! This is a test of the token counting system."

    print(f"Text: {sample}")
    gpt_count = count_tokens(sample, 'gpt-4o-mini')
    print(f"Tokens (GPT-4): {gpt_count}")
    claude_count = estimate_tokens_for_model(sample, 'claude-3-5-sonnet')
    print(f"Tokens (Claude): {claude_count}")

    # Three sections that together use the whole 1000-token budget.
    demo_budget = TokenBudget(1000, "gpt-4o-mini")
    for section_name, share in (("summary", 300), ("history", 600), ("prompt", 100)):
        demo_budget.allocate(section_name, share)

    print(f"\nBudget summary: {demo_budget.get_budget_summary()}")
def count_tokens(text: str, model: str = 'gpt-4o-mini') -> int:
def count_tokens(
    text: str,
    model: str = "gpt-4o-mini"
) -> int:
    """
    Count tokens in text using tiktoken, approximating if tiktoken fails.

    The encoding is resolved by tiktoken from the model name itself rather
    than being forced onto the gpt-4 encoding for every model. Names that
    tiktoken cannot map (Claude, Gemini, ...) use ``cl100k_base``, the
    gpt-4 encoding, as an estimate.

    Args:
        text: Text to count tokens for
        model: Model name for encoding selection; matched case-insensitively
            and with any ``provider/`` prefix ignored

    Returns:
        int: Number of tokens in the text (0 for empty text)
    """
    if not text:
        # Nothing to encode; also stops the fallback from reporting 1 token.
        return 0

    try:
        # Drop a "provider/" prefix so "openai/gpt-4o" resolves like "gpt-4o".
        model_name = model.lower().rsplit("/", 1)[-1]
        try:
            encoding = tiktoken.encoding_for_model(model_name)
        except KeyError:
            # tiktoken raises KeyError for model names it cannot map.
            encoding = tiktoken.get_encoding("cl100k_base")

        # disallowed_special=() encodes strings such as "<|endoftext|>" as
        # ordinary text instead of raising ValueError.
        return len(encoding.encode(text, disallowed_special=()))

    except Exception:
        # Deliberate best-effort: any tiktoken failure falls back to
        # OpenAI's rule of thumb of ~4 characters per token.
        return max(1, len(text) // 4)

Count tokens in text using tiktoken for OpenAI models.

Args: text: Text to count tokens for; model: Model name for encoding selection

Returns: int: Number of tokens in the text

def count_tokens_in_messages( messages: List[Union[langchain_core.messages.base.BaseMessage, dict]], model: str = 'gpt-4o-mini') -> int:
def count_tokens_in_messages(
    messages: List[Union[BaseMessage, dict]],
    model: str = "gpt-4o-mini"
) -> int:
    """
    Count tokens in a list of messages.

    Only each message's content is counted. A missing or ``None`` content
    counts as empty text, and non-string content (for example a list of
    content parts) is stringified, so dict messages are treated the same
    way as LangChain messages.

    Args:
        messages: List of LangChain messages or dict messages
        model: Model name for encoding selection

    Returns:
        int: Total number of tokens in all messages
    """
    def _content_text(message) -> str:
        # Normalise one message to the string that gets token-counted.
        if isinstance(message, BaseMessage):
            # LangChain message
            content = message.content
        elif isinstance(message, dict):
            # Dict message (uni-api format)
            content = message.get("content")
        else:
            # Fallback: convert to string
            return str(message)

        if content is None:
            return ""
        return content if isinstance(content, str) else str(content)

    return sum(count_tokens(_content_text(message), model) for message in messages)

Count tokens in a list of messages.

Args: messages: List of LangChain messages or dict messages; model: Model name for encoding selection

Returns: int: Total number of tokens in all messages

def estimate_tokens_for_model(text: str, model: str) -> int:
 81def estimate_tokens_for_model(text: str, model: str) -> int:
 82    """
 83    Estimate tokens for different model families with family-specific approximations.
 84    
 85    Args:
 86        text: Text to estimate tokens for
 87        model: Model name
 88        
 89    Returns:
 90        int: Estimated number of tokens
 91    """
 92    model_lower = model.lower()
 93    
 94    if "gpt" in model_lower or "openai" in model_lower:
 95        # Use tiktoken for OpenAI models
 96        return count_tokens(text, model)
 97    elif "claude" in model_lower:
 98        # Claude tokenization is similar to GPT-4 but slightly different
 99        # Rough approximation: 3.8 chars per token
100        return max(1, len(text) // 4)
101    elif "gemini" in model_lower:
102        # Gemini has different tokenization
103        # Rough approximation: 4.2 chars per token  
104        return max(1, len(text) // 4)
105    else:
106        # Generic fallback
107        return max(1, len(text) // 4)

Estimate tokens for different model families with family-specific approximations.

Args: text: Text to estimate tokens for; model: Model name

Returns: int: Estimated number of tokens

class TokenBudget:
class TokenBudget:
    """
    Utility class for managing token budgets across conversation sections.

    Each section receives an allocation through ``allocate`` and consumes it
    through ``use_tokens``. ``total_budget`` is used only for reporting;
    allocations are not checked against it.
    """

    def __init__(self, total_budget: int, model: str = "gpt-4o-mini"):
        self.total_budget = total_budget
        self.model = model
        # section name -> tokens allocated to that section
        self.allocations = {}
        # section name -> tokens consumed so far
        self.used_tokens = {}

    def allocate(self, section: str, tokens: int) -> None:
        """Allocate tokens to a section, keeping any usage already recorded."""
        self.allocations[section] = tokens
        self.used_tokens.setdefault(section, 0)

    def use_tokens(self, section: str, text: str) -> bool:
        """
        Use tokens for a section. Returns True if within budget.

        Usage is recorded only when the text fits; a rejected call leaves
        the section's usage unchanged.

        Args:
            section: Section name
            text: Text to count tokens for

        Returns:
            bool: True if within section budget, False otherwise
                (including when the section has no allocation)
        """
        # Check the allocation first so unknown sections cost no tokenization.
        if section not in self.allocations:
            return False

        tokens_needed = count_tokens(text, self.model)
        used_after = self.used_tokens.get(section, 0) + tokens_needed

        if used_after > self.allocations[section]:
            return False

        self.used_tokens[section] = used_after
        return True

    def get_remaining(self, section: str) -> int:
        """Get remaining tokens for a section (never negative)."""
        allocated = self.allocations.get(section, 0)
        used = self.used_tokens.get(section, 0)
        return max(0, allocated - used)

    def get_total_used(self) -> int:
        """Get total tokens used across all sections."""
        return sum(self.used_tokens.values())

    def get_total_remaining(self) -> int:
        """Get total remaining budget (never negative)."""
        return max(0, self.total_budget - self.get_total_used())

    def reset_section(self, section: str) -> None:
        """Reset usage for a section; unknown sections are left untouched."""
        # Guarded so that resetting an unknown name does not create a
        # phantom section in the summary.
        if section in self.used_tokens:
            self.used_tokens[section] = 0

    def get_budget_summary(self) -> dict:
        """
        Get a summary of budget usage.

        Returns:
            dict: ``total_budget``, ``total_used``, ``total_remaining`` and a
                ``sections`` mapping of section name to its ``allocated``,
                ``used`` and ``remaining`` tokens, in first-seen order.
        """
        # dict.fromkeys de-duplicates while keeping insertion order, so the
        # summary is stable between runs (a set is not).
        section_names = list(dict.fromkeys([*self.allocations, *self.used_tokens]))

        return {
            "total_budget": self.total_budget,
            "total_used": self.get_total_used(),
            "total_remaining": self.get_total_remaining(),
            "sections": {
                section: {
                    "allocated": self.allocations.get(section, 0),
                    "used": self.used_tokens.get(section, 0),
                    "remaining": self.get_remaining(section)
                }
                for section in section_names
            }
        }

Utility class for managing token budgets across conversation sections.

TokenBudget(total_budget: int, model: str = 'gpt-4o-mini')
115    def __init__(self, total_budget: int, model: str = "gpt-4o-mini"):
116        self.total_budget = total_budget
117        self.model = model
118        self.allocations = {}
119        self.used_tokens = {}
total_budget
model
allocations
used_tokens
def allocate(self, section: str, tokens: int) -> None:
121    def allocate(self, section: str, tokens: int) -> None:
122        """Allocate tokens to a section."""
123        self.allocations[section] = tokens
124        if section not in self.used_tokens:
125            self.used_tokens[section] = 0

Allocate tokens to a section.

def use_tokens(self, section: str, text: str) -> bool:
127    def use_tokens(self, section: str, text: str) -> bool:
128        """
129        Use tokens for a section. Returns True if within budget.
130        
131        Args:
132            section: Section name
133            text: Text to count tokens for
134            
135        Returns:
136            bool: True if within section budget, False otherwise
137        """
138        tokens_needed = count_tokens(text, self.model)
139        
140        if section not in self.allocations:
141            return False
142        
143        if self.used_tokens.get(section, 0) + tokens_needed <= self.allocations[section]:
144            self.used_tokens[section] = self.used_tokens.get(section, 0) + tokens_needed
145            return True
146        
147        return False

Use tokens for a section. Returns True if within budget.

Args: section: Section name; text: Text to count tokens for

Returns: bool: True if within section budget, False otherwise

def get_remaining(self, section: str) -> int:
149    def get_remaining(self, section: str) -> int:
150        """Get remaining tokens for a section."""
151        allocated = self.allocations.get(section, 0)
152        used = self.used_tokens.get(section, 0)
153        return max(0, allocated - used)

Get remaining tokens for a section.

def get_total_used(self) -> int:
155    def get_total_used(self) -> int:
156        """Get total tokens used across all sections."""
157        return sum(self.used_tokens.values())

Get total tokens used across all sections.

def get_total_remaining(self) -> int:
159    def get_total_remaining(self) -> int:
160        """Get total remaining budget."""
161        return max(0, self.total_budget - self.get_total_used())

Get total remaining budget.

def reset_section(self, section: str) -> None:
163    def reset_section(self, section: str) -> None:
164        """Reset usage for a section."""
165        self.used_tokens[section] = 0

Reset usage for a section.

def get_budget_summary(self) -> dict:
167    def get_budget_summary(self) -> dict:
168        """Get a summary of budget usage."""
169        return {
170            "total_budget": self.total_budget,
171            "total_used": self.get_total_used(),
172            "total_remaining": self.get_total_remaining(),
173            "sections": {
174                section: {
175                    "allocated": self.allocations.get(section, 0),
176                    "used": self.used_tokens.get(section, 0),
177                    "remaining": self.get_remaining(section)
178                }
179                for section in set(list(self.allocations.keys()) + list(self.used_tokens.keys()))
180            }
181        }

Get a summary of budget usage.

def count_conversation_tokens( messages: List[Union[langchain_core.messages.base.BaseMessage, dict]], model: str = 'gpt-4o-mini') -> int:
def count_conversation_tokens(messages: List[Union[BaseMessage, dict]], model: str = "gpt-4o-mini") -> int:
    """Return the total token count of every message in a conversation."""
    conversation_total = count_tokens_in_messages(messages, model)
    return conversation_total

Count tokens in a full conversation.

def check_token_limit(text: str, limit: int, model: str = 'gpt-4o-mini') -> bool:
def check_token_limit(text: str, limit: int, model: str = "gpt-4o-mini") -> bool:
    """Return True when the text's token count does not exceed ``limit``."""
    token_count = count_tokens(text, model)
    return token_count <= limit

Check if text is within token limit.

def truncate_to_token_limit(text: str, limit: int, model: str = 'gpt-4o-mini') -> str:
def truncate_to_token_limit(text: str, limit: int, model: str = "gpt-4o-mini") -> str:
    """
    Truncate text to fit within token limit.

    Text already within the limit is returned unchanged. Otherwise a prefix
    is cut using the text's character-per-token ratio, "..." is appended,
    and the prefix is shrunk until the result measures within the limit.

    Args:
        text: Text to truncate
        limit: Maximum number of tokens allowed
        model: Model name for encoding selection

    Returns:
        str: The original text, a shortened prefix ending in "...", or an
            empty string when the limit is not positive or nothing fits
    """
    if limit <= 0:
        # No room at all. This also avoids a negative slice index, which
        # would keep most of the text instead of removing it.
        return ""

    current_tokens = count_tokens(text, model)

    if current_tokens <= limit:
        return text

    # First guess from the character ratio, with a 10% safety margin.
    target_chars = int(len(text) * limit / current_tokens * 0.9)

    while target_chars > 0:
        candidate = text[:target_chars] + "..."
        if count_tokens(candidate, model) <= limit:
            return candidate
        # int(n * 0.9) is strictly smaller than n for n >= 1, so this ends.
        target_chars = int(target_chars * 0.9)

    return ""

Truncate text to fit within token limit.

This is a rough approximation - for precise truncation, you'd need to decode tokens back to text.