Metadata-Version: 2.4
Name: token-limit
Version: 0.1.9
Summary: Usage metering and cost enforcement per tenant for LLM applications.
License: BUSL-1.1
License-File: LICENSE
Keywords: llm,openai,anthropic,google-ai,token-metering,cost-tracking
Author: Ali Ezatyar Ahmadyar
Author-email: aliezatyar@gmail.com
Requires-Python: >=3.10,<4.0
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: Other/Proprietary License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Classifier: Topic :: Internet
Classifier: Topic :: Software Development :: Libraries
Provides-Extra: all
Provides-Extra: anthropic
Provides-Extra: google
Provides-Extra: openai
Requires-Dist: anthropic (>=0.111.0,<0.112.0) ; extra == "anthropic" or extra == "all"
Requires-Dist: google-genai (>=2.9.0,<3.0.0) ; extra == "google" or extra == "all"
Requires-Dist: openai (>=2.43.0,<3.0.0) ; extra == "openai" or extra == "all"
Project-URL: Homepage, https://github.com/AliEzatyar/token-limit
Project-URL: Repository, https://github.com/AliEzatyar/token-limit
Description-Content-Type: text/markdown

# token-limit

Usage metering and cost enforcement for LLM calls, built for multi-tenant B2B applications.  
One call instruments OpenAI, Anthropic, Google AI, and DeepSeek requests with no changes to your LLM call sites; OpenRouter clients are registered individually.

---

## Installation

Install the base package, then add extras only for the providers you use:

```bash
# Core package (no provider SDKs included)
pip install token-limit

# With specific provider support
pip install token-limit[openai]      # OpenAI + DeepSeek + OpenRouter
pip install token-limit[anthropic]   # Anthropic Claude
pip install token-limit[google]      # Google AI (Gemini)

# Everything at once
pip install token-limit[all]
```

> **Note:** Provider extras install the corresponding official SDK as a dependency (`openai`, `anthropic`, `google-genai`), constrained to the version ranges declared in the package metadata. If your project already pins one of these SDKs outside that range, the installer may change the installed version or report a conflict, so check your pins before adding an extra.

---

## How it works

`token-limit` monkey-patches the official provider SDKs at startup. Every LLM call your application makes is intercepted, token usage is extracted from the response, and a lightweight event is queued and sent in batches to your backend ingest endpoint by a background thread. Event delivery does not block your LLM calls; the only work done inline is the cached limit check described under Cost enforcement.

```
Your code  →  [patched SDK]  →  LLM provider
                    ↓
              LLMEvent captured
                    ↓
              EventQueue (in-memory, daemon thread)
                    ↓  (every 5s or 50 events)
              POST /v1/ingest  →  Your backend  →  Dashboard
```
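
The interception step can be illustrated with a minimal sketch. It is not the library's implementation: `FakeCompletions`, `make_wrapper`, and `captured_events` are hypothetical stand-ins for an SDK resource class, the patch wrapper, and the event queue.

```python
import functools
import time
from types import SimpleNamespace

captured_events = []  # hypothetical stand-in for the in-memory event queue


class FakeCompletions:
    """Hypothetical stand-in for an SDK resource class."""

    def create(self, model, messages):
        usage = SimpleNamespace(prompt_tokens=3, completion_tokens=5)
        return SimpleNamespace(model=model, usage=usage)


def make_wrapper(original):
    @functools.wraps(original)
    def wrapper(self, *args, **kwargs):
        start = time.perf_counter()
        response = original(self, *args, **kwargs)
        # Record usage from the response, then hand the response back untouched.
        captured_events.append({
            "model": response.model,
            "input_tokens": response.usage.prompt_tokens,
            "output_tokens": response.usage.completion_tokens,
            "duration_ms": (time.perf_counter() - start) * 1000,
        })
        return response
    return wrapper


# Patching the class (not an instance) covers every client object.
FakeCompletions.create = make_wrapper(FakeCompletions.create)

response = FakeCompletions().create(model="demo-model", messages=[])
```

Because the wrapper replaces the attribute on the class, instances created before the patch pick it up as well.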

---

## Quick start

### 1. Initialize once at application startup

```python
from token_limit import Meter, MeterConfig

meter = Meter(MeterConfig(
    api_key="sk-...",   # your TokenLimit API key
    url="https://...",  # ingest endpoint (listed as required in the configuration reference)
))

meter.patch_all()  # patches OpenAI, Anthropic, Google, DeepSeek — all at once
```

### 2. Tag requests per tenant

Use the context manager to scope a block of LLM calls to a tenant. The tenant is stored in `contextvars`, which makes it thread-safe and async-safe: concurrent requests with different tenants are fully isolated.

```python
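from openai import OpenAI
from token_limit.exceptions import LimitExceededException

client = OpenAI()  # any client is covered once patch_all() has run

with meter.for_tenant("user5"):
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": "Hello"}],
        )
    except LimitExceededException:
        show_upgrade_message()  # your own handler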
```

For middleware or request handlers where a context manager isn't convenient:

```python
meter.set_tenant(request.tenant_id)  # sets for current thread/async task
```

---

## Supported providers

| Provider | What gets patched | Tokens captured |
|---|---|---|
| **OpenAI** | `chat.completions.create`, `responses.create`, `completions.create` (legacy), `embeddings.create`, `audio.transcriptions.create`, `audio.translations.create`, `audio.speech.create`, `images.generate`, `images.edit` — all sync + async | input, output, cached, plus endpoint-specific fields (character count, image dimensions, audio duration) |
| **Anthropic** | `messages.Messages.create` (sync + async) | input, output, cached (cache read), cache_creation |
| **Google AI** | `Models.generate_content`, `Models.generate_content_stream`, `AsyncModels.generate_content`, `AsyncModels.generate_content_stream` — all via `google.genai` | input, output, total, cached |
| **DeepSeek** | `chat.completions.create`, `fim.completions.create`, `beta.chat.completions.create` — sync + async; covers both first-party `deepseek` SDK and `openai` client pointed at `api.deepseek.com` | input, output, cached (cache hit), cache_miss, reasoning (deepseek-reasoner) |
| **OpenRouter** | `chat.completions.create` on registered client instances — sync + async, streaming and non-streaming | input, output, cached, cost_usd (when billing header enabled), upstream_provider |

All patches are installed/uninstalled cleanly — original methods are always restored on `unpatch_all()` or process exit.
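
Clean restoration usually comes down to remembering each original attribute before replacing it. The sketch below illustrates that idea only; `PatchRegistry` and `FakeClient` are hypothetical and do not reflect how `BasePatch` is written.

```python
class PatchRegistry:
    """Remembers original attributes so they can be restored later."""

    def __init__(self):
        self._originals = []  # (owner, attribute name, original value)

    def swap(self, owner, name, replacement):
        self._originals.append((owner, name, getattr(owner, name)))
        setattr(owner, name, replacement)

    def restore_all(self):
        # Undo in reverse order so stacked patches unwind correctly.
        while self._originals:
            owner, name, original = self._originals.pop()
            setattr(owner, name, original)


class FakeClient:
    """Hypothetical stand-in for an SDK class."""

    def chat(self):
        return "original"


registry = PatchRegistry()
original_chat = FakeClient.chat

registry.swap(FakeClient, "chat", lambda self: "patched")
patched_result = FakeClient().chat()

registry.restore_all()
restored_result = FakeClient().chat()
```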

---

## Cost enforcement

Spend limits are configured per tenant in USD and enforced on every intercepted LLM call.

```python
# Per-month limit (default)
meter.set_limit("tenant-id-456", limit_usd=50.00)

# Per-day limit
meter.set_limit("tenant-id-456", limit_usd=5.00, frequency="per_day")
```

When a tenant reaches its configured limit, the next intercepted LLM call raises `LimitExceededException` before any API traffic is sent. Handle it and show an upgrade prompt:

```python
with meter.for_tenant("acme-corp"):
    try:
        response = client.chat.completions.create(...)
    except LimitExceededException:
        return {"detail": "upgrade_plan"}
```

`set_limit()` immediately invalidates the local cache for that tenant so the new threshold is honored on the very next call, without waiting for the TTL to expire.

**Limit check caching.** `check_limit()` and `async_check_limit()` are called on every patched SDK call. Results are cached per tenant for `limit_check_cache_ttl` seconds (default 5 s) to avoid a network round-trip on every LLM call. A local token trip-wire also catches runaway bursts within the TTL window without waiting for the next backend sync.
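
A per-tenant TTL cache of this kind can be sketched as follows. This is an illustration under assumptions, not the library's code: `LimitCheckCache`, `fake_backend`, and the injectable `clock` are hypothetical, and the token trip-wire is left out.

```python
import time


class LimitCheckCache:
    def __init__(self, fetch, ttl=5.0, clock=time.monotonic):
        self._fetch = fetch    # callable(tenant_id) -> bool, True if over the limit
        self._ttl = ttl
        self._clock = clock
        self._entries = {}     # tenant_id -> (expires_at, result)

    def check(self, tenant_id):
        now = self._clock()
        entry = self._entries.get(tenant_id)
        if entry is not None and entry[0] > now:
            return entry[1]    # still fresh: no backend round-trip
        result = self._fetch(tenant_id)
        self._entries[tenant_id] = (now + self._ttl, result)
        return result

    def invalidate(self, tenant_id):
        # Forget the cached answer so the next check goes to the backend.
        self._entries.pop(tenant_id, None)


backend_calls = []


def fake_backend(tenant_id):
    backend_calls.append(tenant_id)
    return False


fake_now = [0.0]  # controllable clock for the demo
cache = LimitCheckCache(fake_backend, ttl=5.0, clock=lambda: fake_now[0])

cache.check("acme")       # miss: asks the backend
cache.check("acme")       # hit: served from cache
fake_now[0] = 6.0
cache.check("acme")       # expired: asks the backend again
cache.invalidate("acme")
cache.check("acme")       # invalidated: asks the backend again
```

The `invalidate` step mirrors what the text says `set_limit()` does for a tenant.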

---

## OpenAI patch details

Patches SDK methods at the **class level**, so every `openai.OpenAI` / `openai.AsyncOpenAI` client created before or after patching is automatically covered.

### Patched surfaces

**`chat.completions.create`** (sync + async)  
ChatCompletions for all models. Handles `stream=True` transparently: forces `stream_options={"include_usage": True}` so the final chunk carries a usage summary, then proxies the iterator to the caller while capturing that summary in a `finally` block.  
Fields: `input_tokens`, `output_tokens`, `total_tokens`, `cached_tokens`, `request_id`, `model`, `stream`, `duration_ms`.
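
The streaming behaviour described above, proxying chunks while capturing the usage summary in a `finally` block, might look roughly like this. The chunk objects and the `captured` list are hypothetical stand-ins; the real patch works on SDK stream objects.

```python
from types import SimpleNamespace

captured = []  # hypothetical stand-in for the event queue


def proxy_stream(chunks):
    """Yield chunks unchanged and record usage once iteration ends."""
    usage = None
    try:
        for chunk in chunks:
            if getattr(chunk, "usage", None) is not None:
                usage = chunk.usage  # normally only the final chunk carries it
            yield chunk
    finally:
        # Runs on normal exhaustion, on error, and when the caller closes the generator.
        captured.append(usage)


fake_chunks = [
    SimpleNamespace(content="Hel", usage=None),
    SimpleNamespace(content="lo", usage=None),
    SimpleNamespace(content="", usage=SimpleNamespace(prompt_tokens=4, completion_tokens=2)),
]

text = "".join(chunk.content for chunk in proxy_stream(fake_chunks))
```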

**`responses.create`** (sync + async)  
Responses API. Also captures five image-billing dimensions when the `image_generation` tool is active, and `image_count` for audit.  
Fields: `input_tokens`, `output_tokens`, `total_tokens`, `cached_tokens`, image billing dimensions, `image_count`.

**`completions.create`** (legacy `/v1/completions`, sync + async)  
Legacy text-completion endpoint for models such as `gpt-3.5-turbo-instruct`. Streaming handled identically to chat completions.  
Fields: `input_tokens`, `output_tokens`, `total_tokens`, `model`, `stream`, `duration_ms`.

**`embeddings.create`** (sync + async)  
Text-embedding endpoint (`text-embedding-3-*`, `ada-002`, etc.). `output_tokens` is always 0.  
Fields: `input_tokens`, `output_tokens` (0), `total_tokens`, `model`, `duration_ms`.

**`audio.transcriptions.create`** (sync + async)  
Whisper STT. Two billing modes handled automatically:
- Per-minute models (`whisper-1`): reads `response.duration` (requires `response_format="verbose_json"`; emits a warning if omitted).
- Per-token models (`gpt-4o-transcribe`, `gpt-4o-mini-transcribe`): reads `usage.input_tokens` / `usage.output_tokens`.

Fields: `input_tokens`, `output_tokens`, `audio_input_tokens`, `audio_output_tokens`, `duration_seconds`, `duration_unavailable`, `model`, `duration_ms`.

**`audio.translations.create`** (sync + async)  
Whisper translation. Identical billing logic to `audio.transcriptions`; endpoint tag differs.

**`audio.speech.create`** (TTS, sync + async)  
Two billing modes:
- Per-character models (`tts-1`, `tts-1-hd`): no `usage` object; `character_count` derived from the caller's `input` kwarg.
- Per-token models (`gpt-4o-mini-tts`): reads `usage.input_tokens` / `usage.output_tokens`; sets `character_count=0` to prevent double-billing.

Fields: `input_tokens`, `output_tokens`, `character_count`, `model`, `duration_ms`.
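
The two speech billing modes reduce to a small branch. The helper below is a hypothetical sketch of that rule, not the patch's actual extractor; `speech_billing_fields` and the fake `usage` object are assumptions made for illustration.

```python
from types import SimpleNamespace


def speech_billing_fields(input_text, usage=None):
    """Pick exactly one billing dimension for a text-to-speech call."""
    if usage is not None:
        # Per-token model: bill on tokens and zero the character count.
        return {
            "input_tokens": usage.input_tokens,
            "output_tokens": usage.output_tokens,
            "character_count": 0,
        }
    # Per-character model: no usage object, so count the caller's input text.
    return {"input_tokens": 0, "output_tokens": 0, "character_count": len(input_text)}


per_character = speech_billing_fields("Hello, world")
per_token = speech_billing_fields(
    "Hello, world", usage=SimpleNamespace(input_tokens=4, output_tokens=120)
)
```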

**`images.generate`** and **`images.edit`** (sync + async)  
Image generation and editing for `gpt-image-*` models. Captures five token billing dimensions from `usage.input_tokens_details` and `usage.output_tokens`, plus `image_count`.  
Fields: `input_text_tokens`, `cached_input_text_tokens`, `input_image_tokens`, `cached_input_image_tokens`, `output_image_tokens`, `total_tokens`, `image_count`.

### Not patched (OpenAI)

- `moderations.create` — free endpoint, no per-token cost.
- `fine_tuning.jobs.*` — billed on a separate training rate; not real-time.
- `beta.assistants.*` / `beta.threads.*` / `beta.runs.*` — usage only available after async run completion; not yet supported.
- `uploads.*` / `beta.vector_stores` / `files` — storage-billed, not token-billed.
- `realtime.*` — persistent WebSocket; no discrete `.create()` to wrap.
- `audio.transcriptions.create` with `stream=True` — streaming transcription path not yet captured.

---

## Anthropic patch details

Patches `Messages.create` and `AsyncMessages.create` at the **class level**, so all `anthropic.Anthropic` / `anthropic.AsyncAnthropic` clients created before or after patching are automatically covered.

### Patched surfaces

**`messages.Messages.create`** (sync) and **`messages.AsyncMessages.create`** (async)  
Claude chat/completion for all `claude-*` models. Both `stream=False` (default) and `stream=True` are handled. For streaming, a helper proxies the iterator unchanged while accumulating usage across events (`message_start` → input tokens; `message_delta` → output + cache tokens).  
Fields: `input_tokens`, `output_tokens`, `total_tokens`, `cached_tokens` (cache read hits), `cache_creation_tokens` (cache write), `request_id`, `model`, `stream`, `duration_ms`, `tenant_id`, `error`, `input_tokens_details` (SDK >= 0.26, model-dependent).
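
As an illustration of accumulating usage across stream events, the sketch below follows the mapping described above (`message_start` for input tokens, `message_delta` for output and cache tokens). The event objects are simplified fakes and `accumulate_usage` is a hypothetical helper, so treat the attribute layout as an assumption.

```python
from types import SimpleNamespace as NS


def accumulate_usage(events):
    totals = {
        "input_tokens": 0,
        "output_tokens": 0,
        "cached_tokens": 0,
        "cache_creation_tokens": 0,
    }
    for event in events:
        if event.type == "message_start":
            totals["input_tokens"] = event.message.usage.input_tokens
        elif event.type == "message_delta":
            usage = event.usage
            # Treated as a running total here, so assign rather than add.
            totals["output_tokens"] = usage.output_tokens
            totals["cached_tokens"] = getattr(usage, "cache_read_input_tokens", None) or 0
            totals["cache_creation_tokens"] = (
                getattr(usage, "cache_creation_input_tokens", None) or 0
            )
    totals["total_tokens"] = totals["input_tokens"] + totals["output_tokens"]
    return totals


fake_events = [
    NS(type="message_start", message=NS(usage=NS(input_tokens=10))),
    NS(type="content_block_delta"),
    NS(type="message_delta", usage=NS(output_tokens=7, cache_read_input_tokens=3)),
]

totals = accumulate_usage(fake_events)
```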

### Not patched (Anthropic)

- `beta.messages.batches.*` — asynchronous batch completion; results fetched separately from submission. Not yet supported.
- Embeddings — Anthropic does not offer a text-embedding API.
- Audio/TTS — Anthropic does not offer speech endpoints.
- Image generation — Claude is vision-input only; image tokens are already counted inside `input_tokens`.

---

## Google AI patch details

Patches four methods at the **class level** on `google.genai.models.Models` and `google.genai.models.AsyncModels`. Unlike OpenAI, the `google.genai` SDK exposes streaming as a **separate method** rather than a `stream=True` flag.

### Patched surfaces

**`Models.generate_content`** (sync, non-streaming)  
Usage read from `response.usage_metadata` directly after the call returns.

**`Models.generate_content_stream`** (sync, streaming)  
Returns a synchronous iterator of `GenerateContentResponse` chunks. Usage is only present on the last chunk; the helper tracks `last_chunk` across the full iteration and reads its `usage_metadata` in a `finally` block.

**`AsyncModels.generate_content`** (async, non-streaming)  
Awaits `meter.async_check_limit()` to avoid blocking the event loop.

**`AsyncModels.generate_content_stream`** (async, streaming)  
Handles both coroutine-returning and direct async-iterator forms via `inspect.isawaitable`.

Fields (all four surfaces): `input_tokens` (`prompt_token_count`), `output_tokens` (`candidates_token_count`, includes thinking tokens on the direct Gemini API), `total_tokens` (read from response, not derived), `cached_tokens` (`cached_content_token_count`), `stream`, `request_id`, `duration_ms`, `tenant_id`, `error`.
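
The async streaming case combines two ideas from above: accept either a coroutine or a direct async iterator, and read usage from the last chunk in a `finally` block. A minimal sketch, with `proxy_async_stream`, `usage_log`, and the fake streams as hypothetical stand-ins:

```python
import asyncio
import inspect
from types import SimpleNamespace

usage_log = []  # hypothetical stand-in for the event queue


async def proxy_async_stream(call_result):
    # The call result may be a coroutine that resolves to the stream,
    # or the async iterator itself.
    stream = await call_result if inspect.isawaitable(call_result) else call_result
    last_chunk = None
    try:
        async for chunk in stream:
            last_chunk = chunk
            yield chunk
    finally:
        usage_log.append(getattr(last_chunk, "usage_metadata", None))


async def fake_stream():
    yield SimpleNamespace(text="a", usage_metadata=None)
    yield SimpleNamespace(text="b", usage_metadata=SimpleNamespace(total_token_count=7))


async def fake_stream_coroutine():
    return fake_stream()


async def collect(call_result):
    return [chunk.text async for chunk in proxy_async_stream(call_result)]


direct = asyncio.run(collect(fake_stream()))
awaited = asyncio.run(collect(fake_stream_coroutine()))
```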

### Not patched (Google AI)

- Vertex AI SDK (`google.cloud.aiplatform`) — separate SDK, not yet supported.
- `models.embed_content` / `models.embed_content_batch` — not yet supported.
- `models.generate_images` / `models.upscale_image` — billed per image, not per token.
- `models.generate_videos` — billed per second of output, not yet supported.
- `live.*` — WebSocket-based session; no discrete call to wrap.

---

## DeepSeek patch details

Covers **both** integration paths: the first-party `deepseek` package and an `openai` client pointed at `api.deepseek.com`. Both paths are attempted independently — a failure in one does not prevent the other from being installed.

### Patched surfaces

**`chat.completions.create`** (sync + async)  
Standard chat completions. Streaming handled identically to OpenAI: forces `stream_options={"include_usage": True}` and captures usage from the final chunk.

**`fim.completions.create`** (sync + async)  
DeepSeek-specific fill-in-middle (FIM) endpoint. Records `fim_prefix` (from `kwargs["prompt"]` or `kwargs["prefix"]`) and `fim_suffix` alongside standard token counts.

**`beta.chat.completions.create`** (sync + async)  
Beta chat namespace alias present in SDK >= 1.x; uses the same extractor as the main chat surface.

### DeepSeek-specific fields

| Event field | Source |
|---|---|
| `cached_tokens` | `usage.prompt_cache_hit_tokens` |
| `cache_miss_tokens` | `usage.prompt_cache_miss_tokens` |
| `reasoning_tokens` | `usage.completion_tokens_details.reasoning_tokens` (deepseek-reasoner only) |
| `fim_prefix` | `kwargs["prompt"]` or `kwargs["prefix"]` |
| `fim_suffix` | `kwargs["suffix"]` |

### Not patched (DeepSeek)

- `models.list` — metadata endpoint, no token cost.
- `files.*` — file upload/management, not billed per token.

---

## OpenRouter patch details

OpenRouter exposes an OpenAI-compatible REST API, so developers typically point a standard `openai.OpenAI` (or `AsyncOpenAI`) client at `https://openrouter.ai/api/v1`. Unlike the other providers, **OpenRouter is patched at the instance level** rather than the class level — only the specific client instances you register are instrumented, leaving any other OpenAI clients untouched.

### Registration

```python
# Pattern 1 — sync factory (recommended)
client = meter.openrouter_client(api_key="sk-or-v1-...")

# Pattern 2 — async factory
client = meter.async_openrouter_client(api_key="sk-or-v1-...")

# Pattern 3 — register an existing client
meter.register_openrouter_client(existing_client)

# Pattern 4 — fully manual
meter.track_manually(provider="openrouter", model="...", input_tokens=..., output_tokens=...)
```

### Patched surfaces

**`chat.completions.create`** (sync + async, on registered instances only)  
Streaming handled identically to OpenAI: `stream_options={"include_usage": True}` is injected automatically so the final chunk carries usage. The wrapper is installed directly on `client.chat.completions.create` and is guarded against double-patching.

### OpenRouter-specific fields

| Event field | Source |
|---|---|
| `upstream_provider` | Portion before `/` in the model string, e.g. `"anthropic"` from `"anthropic/claude-3-5-sonnet"` |
| `cost_usd` | `usage.cost` — actual USD cost when the caller passes `X-Or-Billing: true` |

Fields (all calls): `input_tokens` (`usage.prompt_tokens`), `output_tokens` (`usage.completion_tokens`), `total_tokens`, `cached_tokens` (`usage.prompt_tokens_details.cached_tokens`), `cost_usd`, `upstream_provider`, `model`, `stream`, `request_id`, `duration_ms`, `tenant_id`.
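
Deriving `upstream_provider` from the model string is a one-line split. The helper below is a hypothetical sketch; returning `None` when the model has no `/` is an assumption, since the behaviour for that case is not stated here.

```python
def upstream_provider(model):
    """Return the portion before the first "/" in an OpenRouter model string."""
    provider, separator, _ = model.partition("/")
    # Assumption: a model string without "/" has no upstream provider.
    return provider if separator else None


with_prefix = upstream_provider("anthropic/claude-3-5-sonnet")
without_prefix = upstream_provider("gpt-4o")
```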

---

## Configuration reference

All configuration lives in `MeterConfig`, passed once at startup:

```python
from token_limit import Meter, MeterConfig

meter = Meter(MeterConfig(
    # Required
    api_key="your-api-key",       # authenticates event ingest and limit checks
    url="https://...",             # POST endpoint that receives event batches

    # Batching — tune for your traffic volume
    flush_interval=5.0,           # seconds between background flushes
    max_batch_size=50,            # flush early when queue reaches this size
    max_queue_size=1000,          # drop oldest events if queue overflows

    # Limit checks
    limit_check_cache_ttl=5.0,   # seconds a check_limit() result is cached per tenant

    # Behaviour
    debug=False,                  # log every captured event to stdout
    raise_on_error=False,         # re-raise exceptions from within patches

    # Hooks
    on_event=None,                # Callable[[LLMEvent], None] — called after every capture
    on_flush_error=None,          # Callable[[Exception], None] — called on send failure

    # Which providers to patch (default = all four class-level built-ins)
    patches=["openai", "anthropic", "google", "deepseek"],
))
```
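
To make the batching knobs concrete, here is a rough sketch of a bounded buffer that drops the oldest events on overflow and signals an early flush at the batch size. `BoundedEventBuffer` is hypothetical and omits the locking and background thread a real queue would need.

```python
from collections import deque


class BoundedEventBuffer:
    def __init__(self, max_queue_size=1000, max_batch_size=50):
        # A deque with maxlen silently discards the oldest item when full.
        self._events = deque(maxlen=max_queue_size)
        self._max_batch_size = max_batch_size

    def put(self, event):
        self._events.append(event)
        # True tells the caller to flush now instead of waiting for the timer.
        return len(self._events) >= self._max_batch_size

    def drain(self):
        batch = []
        while self._events and len(batch) < self._max_batch_size:
            batch.append(self._events.popleft())
        return batch


buffer = BoundedEventBuffer(max_queue_size=3, max_batch_size=2)
flush_signals = [buffer.put(n) for n in range(1, 6)]  # events 1 and 2 get dropped

first_batch = buffer.drain()
second_batch = buffer.drain()
```

With small limits like these the overflow is easy to see: only the three newest events survive, and they leave in batches of at most two.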

---

## Event shape

Every captured call produces an `LLMEvent`. Fields are sourced directly from `types.py`:

```python
@dataclass
class LLMEvent:
    # identity
    event_id: str                        # UUID, auto-generated
    tenant_id: str                       # set via for_tenant() or set_tenant()
    session_id: Optional[str]

    # provider / model
    provider: str                        # "openai" | "anthropic" | "google" | "deepseek" | "openrouter"
    model: str                           # e.g. "gpt-4o", "claude-3-5-sonnet-20241022"
    endpoint: str                        # e.g. "chat.completions", "messages", "fim.completions"

    # text token usage
    input_tokens: int
    output_tokens: int
    total_tokens: int
    cached_tokens: int                   # OpenAI: sub-field of input_tokens
                                         # Anthropic/Google: separate pool, not in input_tokens

    # latency
    duration_ms: float                   # wall-clock time of the LLM call
    timestamp: float                     # unix timestamp of capture

    # request metadata
    request_id: Optional[str]            # x-request-id from provider response headers
    stream: bool
    error: Optional[str]                 # set if the LLM call raised an exception

    # audio (transcription / translation)
    duration_seconds: Optional[float]    # per-minute path (whisper-1, verbose_json only)
    audio_input_tokens: Optional[int]    # per-token path (gpt-4o-transcribe etc.)
    audio_output_tokens: Optional[int]

    # speech / TTS
    character_count: Optional[int]       # per-character path (tts-1, tts-1-hd)

    # images (gpt-image-* models)
    input_text_tokens: Optional[int]           # text prompt tokens
    cached_input_text_tokens: Optional[int]
    input_image_tokens: Optional[int]          # reference-image tokens (edit only)
    cached_input_image_tokens: Optional[int]
    output_image_tokens: Optional[int]         # generated-image tokens
    image_count: Optional[int]                 # number of images returned (audit only)

    # extras
    tags: dict                           # arbitrary metadata you can attach
```

All `Optional` fields are omitted from `to_dict()` when `None`, keeping ingest payloads lean.
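
Omitting `None` fields from a dataclass payload can be done with a filtered `asdict`. The `MiniEvent` class below is a trimmed, hypothetical stand-in for `LLMEvent`, shown only to illustrate the idea.

```python
from dataclasses import asdict, dataclass, field
from typing import Optional


@dataclass
class MiniEvent:
    provider: str
    model: str
    input_tokens: int = 0
    output_tokens: int = 0
    character_count: Optional[int] = None
    duration_seconds: Optional[float] = None
    tags: dict = field(default_factory=dict)

    def to_dict(self):
        # Keep zeros and empty dicts; drop only fields that are None.
        return {key: value for key, value in asdict(self).items() if value is not None}


payload = MiniEvent(provider="openai", model="tts-1", character_count=42).to_dict()
```

Filtering on `is not None` rather than truthiness matters here, because a count of `0` is meaningful and should still be sent.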

---

## Advanced usage

### Selective patching

```python
# Specify providers in MeterConfig
meter = Meter(MeterConfig(
    api_key="...",
    url="...",
    patches=["openai", "anthropic"],  # skip google and deepseek
))
meter.patch_all()

# Or patch / unpatch one provider at a time
meter.patch("deepseek")
meter.unpatch("deepseek")

# Unpatch everything and restore original SDK methods
meter.unpatch_all()
```

### Use as a context manager

`Meter` supports the context manager protocol — `__exit__` calls `unpatch_all()` and shuts down the background flush queue automatically:

```python
with Meter(MeterConfig(api_key="...", url="...")).patch_all() as meter:
    with meter.for_tenant("acme-corp"):
        client.chat.completions.create(...)
# all patches restored, queue flushed on exit
```

### Manual tracking

For providers not yet patched, or custom logic:

```python
meter.track_manually(
    provider="cohere",
    model="command-r-plus",
    input_tokens=512,
    output_tokens=128,
    tenant_id="acme-corp",
)
```

Any extra keyword arguments are passed through as event fields (unknown fields are dropped with a debug log when `debug=True`).

### Event hook — real-time logging or custom logic

```python
from token_limit import LLMEvent, Meter, MeterConfig

def my_hook(event: LLMEvent) -> None:
    print(f"[{event.tenant_id}] {event.model}: {event.total_tokens} tokens")

meter = Meter(MeterConfig(
    api_key="...",
    url="...",
    on_event=my_hook,
))
```

### Flush error handling

```python
import sentry_sdk  # assumes sentry_sdk.init() has been called elsewhere

def on_flush_error(exc: Exception) -> None:
    sentry_sdk.capture_exception(exc)

meter = Meter(MeterConfig(
    api_key="...",
    url="...",
    on_flush_error=on_flush_error,
))
```

### Force a flush

```python
# Useful at the end of a batch job or CLI script
meter._queue.flush_now()
```

---

## Adding a custom provider patch

All provider patches inherit from `BasePatch`. Implement `_install` and an extractor function, then register in `PATCH_REGISTRY`:

```python
from token_limit.patches._base import BasePatch
from token_limit.patches import PATCH_REGISTRY

def _extract(response, args, kwargs, error):
    # `response` may be None when the call raised, so fall back to empty dicts.
    meta = getattr(response, "meta", None) or {}
    billed = meta.get("billed_units") or {}
    return {
        "provider": "cohere",
        "endpoint": "chat",
        "model": kwargs.get("model", ""),
        "input_tokens": billed.get("input_tokens", 0),
        "output_tokens": billed.get("output_tokens", 0),
    }

class CoherePatch(BasePatch):
    name = "cohere"

    def _install(self):
        import cohere
        self._swap(
            cohere.Client, "chat",
            self._make_sync_wrapper(cohere.Client.chat, _extract),
        )

PATCH_REGISTRY["cohere"] = CoherePatch

meter.patch("cohere")
```

---

## Project structure

```
token_limit/
├── __init__.py              ← public API: Meter, MeterConfig, LLMEvent
├── meter.py                 ← Meter class (patch_all, for_tenant, set_limit)
├── config.py                ← MeterConfig dataclass
├── types.py                 ← LLMEvent dataclass
├── exceptions.py            ← LimitExceededException exception class
├── patches/
│   ├── _base.py             ← BasePatch ABC + sync/async wrapper factories
│   ├── openai_patch.py
│   ├── anthropic_patch.py
│   ├── google_patch.py
│   ├── deepseek_patch.py
│   └── openrouter_patch.py
└── transport/
    ├── queue.py             ← thread-safe EventQueue with background flush
    └── http_client.py       ← gzip POST, auto-selects httpx/requests/urllib
```

---

## License

Business Source License 1.1
