Metadata-Version: 2.4
Name: apipool-ng
Version: 1.0.7
Summary: Multiple API Key Manager (Next Generation, sqlalchemy_mate free)
Home-page: https://github.com/apipool-ng/apipool-project
Author: apipool-ng Contributors
Author-email: 
Maintainer: apipool-ng Contributors
Maintainer-email: 
License: MIT
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Natural Language :: English
Classifier: Operating System :: Microsoft :: Windows
Classifier: Operating System :: MacOS
Classifier: Operating System :: Unix
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Requires-Python: >=3.8
Description-Content-Type: text/markdown
License-File: LICENSE.txt
Requires-Dist: sqlalchemy>=1.0.0
Requires-Dist: httpx>=0.24.0
Dynamic: author
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: home-page
Dynamic: license
Dynamic: license-file
Dynamic: maintainer
Dynamic: requires-dist
Dynamic: requires-python
Dynamic: summary

# apipool-ng

`apipool-ng` is a next-generation **Multiple API Key Manager**.

It lets developers manage multiple API keys at once. For example, if a single
API key has a 1000/day quota, you can register 10 API keys and let
`apipool-ng` rotate them automatically.

## Features

- Automatically rotate API keys across multiple credentials.
- Built-in usage statistics, searchable by time, status, and API key.
  The stats collector can be deployed on any relational database (SQLite by default).
- Clean API, minimal code required to implement complex features.
- **Async support** — `adummyclient` provides full async chain-proxy with `await`.
- **Server SDK mode** — connect to an `apipool-server` instance and proxy all API
  calls through it, with transparent key rotation and zero local key material.
- **Dynamic scaling** — `DynamicKeyManager` auto-refreshes keys from server,
  expanding or shrinking the pool in real time.
- Lightweight dependencies — `sqlalchemy` + `httpx`.

## Installation

```bash
pip install apipool-ng
```

## Quick Start

### Library Mode (Local Keys)

Implement an `ApiKey` subclass, then create an `ApiKeyManager`:

```python
from apipool import ApiKey, ApiKeyManager

class MyApiKey(ApiKey):
    def __init__(self, key):
        self.key = key

    def get_primary_key(self):
        return self.key

    def create_client(self):
        return MyApiClient(api_key=self.key)

    def test_usability(self, client):
        return client.test_connection()

apikeys = [MyApiKey(k) for k in ["key1", "key2", "key3"]]
manager = ApiKeyManager(apikey_list=apikeys)
manager.check_usable()

# Synchronous calls — keys auto-rotate
result = manager.dummyclient.some_api_method(arg)
```
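
`MyApiClient` in the example above is not part of `apipool`; it stands for whatever SDK client you are wrapping. For local experimentation, a hypothetical stand-in (the class and its method bodies are assumptions made for illustration) could look like this:

```python
class MyApiClient:
    """Hypothetical stand-in for a third-party SDK client (not part of apipool)."""

    def __init__(self, api_key):
        self.api_key = api_key

    def test_connection(self):
        # A real client would make a cheap authenticated request here.
        return bool(self.api_key)

    def some_api_method(self, arg):
        # A real client would call the remote API; this one only echoes its input.
        return {"key": self.api_key, "arg": arg}
```

With a stub like this in scope, `MyApiKey.create_client()` and `MyApiKey.test_usability()` have something concrete to call.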

### Server SDK Mode (Remote Proxy)

Connect to an `apipool-server` instance — no API keys stored locally:

```python
from apipool import connect, login

# 1. Authenticate
tokens = login(
    service_url="http://localhost:8000",
    username="alice",
    password="password",
)

# 2. Connect to a pool
manager = connect(
    service_url="http://localhost:8000",
    pool_identifier="google-geocoding",
    auth_token=tokens["access_token"],
)

# 3. Use exactly like library mode
result = manager.dummyclient.geocode("1600 Amphitheatre Parkway")
```

### Fetch Raw Keys from Server

Retrieve decrypted keys and build a local `ApiKeyManager`:

```python
from apipool import login, get_keys, ApiKeyManager

tokens = login("http://localhost:8000", "alice", "password")
raw_keys = get_keys(
    service_url="http://localhost:8000",
    client_type="coingecko",
    auth_token=tokens["access_token"],
)
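# raw_keys = ["CG-xxx", "CG-yyy", ...]

# Wrap each raw key in your ApiKey subclass (MyApiKey from Library Mode above)
apikeys = [MyApiKey(k) for k in raw_keys]
manager = ApiKeyManager(apikey_list=apikeys)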
```

## DummyClient & AsyncDummyClient

Use `manager.dummyclient` (sync) or `manager.adummyclient` (async) just like
your original API client. Under the hood, they automatically select a usable
key, record usage events, and rotate keys on rate-limit errors.

```python
# Sync — use dummyclient
result = manager.dummyclient.some_method()

# Async — use adummyclient
result = await manager.adummyclient.some_method()

# Multi-level attribute chains are supported natively
result = await manager.adummyclient.coins.simple.price.get(ids="bitcoin")
```
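
To make the multi-level attribute chain less magical, here is a minimal sketch of the general chain-proxy idea. It is not the library's implementation: `ChainProxySketch`, `make_client`, and the fake clients are hypothetical names, and key selection is reduced to `random.choice`.

```python
import random
from types import SimpleNamespace


class ChainProxySketch:
    """Records an attribute path, then resolves it on a client chosen at call time."""

    def __init__(self, pick_client, path=()):
        self._pick_client = pick_client  # callable returning a real client
        self._path = path                # attribute names collected so far

    def __getattr__(self, name):
        # Each attribute access extends the path and returns a new proxy.
        return ChainProxySketch(self._pick_client, self._path + (name,))

    def __call__(self, *args, **kwargs):
        # Only now is a client picked and the recorded path walked on it.
        target = self._pick_client()
        for name in self._path:
            target = getattr(target, name)
        return target(*args, **kwargs)


def make_client(key):
    # Fake nested client exposing client.coins.simple.price.get(ids=...)
    price = SimpleNamespace(get=lambda ids: f"{ids} via {key}")
    return SimpleNamespace(coins=SimpleNamespace(simple=SimpleNamespace(price=price)))


clients = [make_client(k) for k in ("key1", "key2")]
proxy = ChainProxySketch(lambda: random.choice(clients))
print(proxy.coins.simple.price.get(ids="bitcoin"))
```

Deferring client selection until the final call is what allows a different key to serve each request, however deep the attribute chain is.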

### Rate Limit Handling

Specify the exception type that indicates rate-limit exhaustion:

```python
from apipool import ApiKeyManager

manager = ApiKeyManager(
    apikey_list=apikeys,
    reach_limit_exc=RateLimitError,  # auto-rotate on this exception
)
```

When a call raises `reach_limit_exc`, the key is automatically removed from
the pool and the exception is re-raised. When all keys are exhausted,
`PoolExhaustedError` is raised on the next call.

```python
from apipool import PoolExhaustedError

try:
    result = manager.dummyclient.some_method()
except PoolExhaustedError:
    print("All API keys exhausted")
```
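
Because the rate-limit exception is re-raised to the caller, application code may want to retry the call so that another key gets a chance. The helper below is a sketch of that pattern, not something `apipool` provides: `retry_on_limit`, `RateLimitError`, `FakePoolExhausted`, and `fake_call` are all hypothetical names used to keep the example self-contained.

```python
def retry_on_limit(call, limit_exc, max_attempts=5):
    """Call `call()` again while it raises `limit_exc`, up to `max_attempts` times."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except limit_exc:
            # Out of attempts: let the rate-limit error reach the caller.
            if attempt == max_attempts:
                raise


# Fakes that mimic the behaviour described above.
class RateLimitError(Exception):
    pass


class FakePoolExhausted(Exception):
    pass


keys = ["key1", "key2", "key3"]
limited = {"key1", "key2"}


def fake_call():
    if not keys:
        raise FakePoolExhausted()
    key = keys[0]
    if key in limited:
        keys.pop(0)  # mimics removal of the rate-limited key from the pool
        raise RateLimitError(key)
    return f"ok via {key}"


result = retry_on_limit(fake_call, RateLimitError)
print(result)
```

With a real manager the call would be something like `retry_on_limit(lambda: manager.dummyclient.some_method(), RateLimitError)`. Any other exception, including `PoolExhaustedError`, propagates unchanged.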

## Server SDK Mode (Full API)

| Function | Sync | Async |
|---|---|---|
| Authenticate | `login(service_url, username, password)` | `await alogin(...)` |
| Connect to pool | `connect(service_url, pool_identifier, auth_token)` | `await async_connect(...)` |
| Fetch raw keys | `get_keys(service_url, client_type, auth_token)` | `await aget_keys(...)` |

Async example:

```python
from apipool import async_connect, alogin

tokens = await alogin("http://localhost:8000", "alice", "password")
manager = await async_connect(
    service_url="http://localhost:8000",
    pool_identifier="coingecko",
    auth_token=tokens["access_token"],
)
result = await manager.adummyclient.coins.simple.price.get(ids="bitcoin")
```

## Dynamic Key Manager (Auto-Refresh)

`DynamicKeyManager` extends `ApiKeyManager` with a background thread that
periodically fetches the latest key list from `apipool-server` and reconciles
the local pool — **adding new keys** and **removing deleted ones** automatically.

### Sync: DynamicKeyManager

```python
from apipool import DynamicKeyManager, get_keys, login

tokens = login("http://localhost:8000", "alice", "password")

manager = DynamicKeyManager(
    key_fetcher=lambda: get_keys(
        service_url="http://localhost:8000",
        client_type="coingecko",
        auth_token=tokens["access_token"],
    ),
    api_key_factory=lambda raw_key: CoinGeckoApiKey(raw_key),
    refresh_interval=120,  # seconds
    on_keys_added=lambda keys: print(f"Added: {keys}"),
    on_keys_removed=lambda keys: print(f"Removed: {keys}"),
)

# Use like a normal ApiKeyManager
result = manager.dummyclient.ping()
print(f"Pool size: {manager.pool_size}")

# Graceful shutdown
manager.shutdown()
```

### Async: AsyncDynamicKeyManager

```python
from apipool import AsyncDynamicKeyManager, aget_keys, alogin

tokens = await alogin("http://localhost:8000", "alice", "password")

manager = AsyncDynamicKeyManager(
    key_fetcher=lambda: aget_keys(
        service_url="http://localhost:8000",
        client_type="coingecko",
        auth_token=tokens["access_token"],
    ),
    api_key_factory=lambda raw_key: AsyncCoinGeckoKey(raw_key),
    refresh_interval=120,
)

await manager.astart()  # initial fetch + auto-refresh task

result = await manager.adummyclient.coins.simple.price.get(ids="bitcoin")
print(f"Pool size: {manager.pool_size}")

await manager.ashutdown()
```

### How It Works

| Step | Description |
|---|---|
| 1. Fetch | Call `key_fetcher()` to get the latest `list[str]` from the server |
| 2. Diff | Compare server keys vs. local active + archived keys |
| 3. Add | New keys → create via `api_key_factory` → add to pool |
| 4. Restore | Archived keys that reappear on server → restore to active pool |
| 5. Remove | Keys gone from server → move from active pool to archive |
| 6. Callback | `on_keys_added` / `on_keys_removed` fired if keys changed |
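
The diff, add, restore, and remove steps in the table can be sketched with plain set arithmetic. This is an illustration under simplifying assumptions, not the library's code: `reconcile` is a hypothetical function, and keys are raw strings rather than `ApiKey` objects.

```python
def reconcile(server_keys, active, archived):
    """Return the new active/archived sets plus the three change sets."""
    server = set(server_keys)
    to_add = server - active - archived   # never seen locally
    to_restore = server & archived        # archived, but back on the server
    to_remove = active - server           # active, but gone from the server
    new_active = (active | to_add | to_restore) - to_remove
    new_archived = (archived - to_restore) | to_remove
    return new_active, new_archived, to_add, to_restore, to_remove


new_active, new_archived, added, restored, removed = reconcile(
    server_keys=["a", "b", "d"],
    active={"a", "c"},
    archived={"b"},
)
print(sorted(new_active), sorted(new_archived))
```

In this example `d` is added, `b` is restored from the archive, and `c` is moved to the archive.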

### Parameters

| Parameter | Type | Description |
|---|---|---|
| `key_fetcher` | `Callable[[], list[str]]` | Returns current raw keys from server |
| `api_key_factory` | `Callable[[str], ApiKey]` | Converts raw key to `ApiKey` instance |
| `refresh_interval` | `float` | Seconds between refreshes (default 60) |
| `on_keys_added` | `Callable[[list[str]], None]` | Callback after keys are added |
| `on_keys_removed` | `Callable[[list[str]], None]` | Callback after keys are removed |
| `config_fetcher` | `Callable[[], PoolConfig]` | Returns pool config from server (optional) |

## Configuration Sync

`DynamicKeyManager` can also sync pool configuration from the server.
Pass a `config_fetcher` (typically `get_config` bound with your credentials)
and the configuration is fetched again on each refresh cycle.

### PoolConfig

Server-side pool configuration is stored as `pool_config` JSON and synced to
the client as a `PoolConfig` dataclass:

| Field | Type | Default | Description |
|---|---|---|---|
| `concurrency` | `int` | `0` | Max concurrent calls (0 = unlimited) |
| `timeout` | `float` | `30.0` | Per-request timeout in seconds |
| `rate_limit` | `int` | `0` | Max requests per key per interval |
| `rate_limit_interval` | `int` | `60` | Interval for rate limit counting |
| `retry_on_failure` | `bool` | `False` | Retry failed calls on another key |
| `max_retries` | `int` | `0` | Maximum retry attempts |
| `custom` | `dict` | `{}` | Arbitrary key-value settings |
| `batch_retry_on_failure` | `bool \| None` | `None` | Batch retry (falls back to `retry_on_failure`) |
| `batch_max_retries` | `int \| None` | `None` | Batch retries (falls back to `max_retries`) |
| `ban_threshold` | `int` | `3` | Consecutive failures before key ban |
| `ban_duration` | `float` | `300.0` | Key ban duration in seconds |
| `reach_limit_exception` | `str \| None` | `None` | Dotted path to exception class |
| `rotation_strategy` | `str` | `"random"` | Key rotation strategy |

### Sync Example

```python
from apipool import DynamicKeyManager, get_keys, get_config, login

tokens = login("http://localhost:8000", "alice", "password")

manager = DynamicKeyManager(
    key_fetcher=lambda: get_keys(
        service_url="http://localhost:8000",
        client_type="coingecko",
        auth_token=tokens["access_token"],
    ),
    api_key_factory=lambda raw_key: CoinGeckoApiKey(raw_key),
    config_fetcher=lambda: get_config(
        service_url="http://localhost:8000",
        pool_identifier="my-pool",
        auth_token=tokens["access_token"],
    ),
    refresh_interval=120,
)

# Config is auto-synced. Access it anytime:
print(f"Concurrency: {manager.config.concurrency}")
print(f"Timeout: {manager.config.timeout}")
```

### Manual Config Fetch

```python
from apipool import get_config, aget_config

# Sync
config = get_config("http://localhost:8000", "my-pool", tokens["access_token"])
manager.apply_config(config)

# Async
config = await aget_config("http://localhost:8000", "my-pool", tokens["access_token"])
```

## Concurrent Execution

Execute the same method across multiple argument sets with bounded concurrency:

### Sync: `call_concurrent`

```python
# One call per argument set, with at most 10 running concurrently
results = manager.call_concurrent(
    method_name="some_api_method",
    args_list=[(arg1,), (arg2,), (arg3,)],
    kwargs_list=[{"key": "a"}, {"key": "b"}, {"key": "c"}],
    max_concurrency=10,   # overrides config.concurrency
    timeout=15.0,         # overrides config.timeout
)
```

### Async: `acall_concurrent`

```python
results = await manager.acall_concurrent(
    method_name="coins.simple.price.get",
    args_list=[(), (), ()],
    kwargs_list=[
        {"ids": "bitcoin"},
        {"ids": "ethereum"},
        {"ids": "solana"},
    ],
    max_concurrency=5,
    timeout=10.0,
)
```

Concurrency and timeout default to `manager.config` values when not specified.
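
For readers unfamiliar with bounded concurrency, the sketch below shows the general technique using only the standard library. It is an assumption-laden illustration rather than how `call_concurrent` is implemented: `run_bounded` and `fake_api` are hypothetical, results are returned in input order, and `timeout` is applied while waiting for each result.

```python
from concurrent.futures import ThreadPoolExecutor


def run_bounded(func, args_list, kwargs_list=None, max_concurrency=10, timeout=None):
    """Run func(*args, **kwargs) for each pair, at most max_concurrency at a time."""
    if kwargs_list is None:
        kwargs_list = [{} for _ in args_list]
    if len(kwargs_list) != len(args_list):
        raise ValueError("args_list and kwargs_list must have the same length")
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        futures = [
            pool.submit(func, *args, **kwargs)
            for args, kwargs in zip(args_list, kwargs_list)
        ]
        # Collect in submission order; a failed call re-raises here.
        return [future.result(timeout=timeout) for future in futures]


def fake_api(x, key=""):
    return f"{x}:{key}"


results = run_bounded(
    fake_api,
    args_list=[(1,), (2,), (3,)],
    kwargs_list=[{"key": "a"}, {"key": "b"}, {"key": "c"}],
    max_concurrency=2,
)
print(results)
```

The i-th entry of `args_list` is paired with the i-th entry of `kwargs_list`, which is why the two lists in the examples above have the same length.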

## Batch Execution

`batch_exec` and `abatch_exec` are designed for **high-volume workloads**
such as fetching 10 000 token prices from CoinGecko. Key features:

- **Deduplication** — each `item_id` is guaranteed to execute at most once.
- **Retry with rotation** — when an API call fails, the item is retried on a
  *different* key, honouring the key rotation strategy.
- **Temporary banning** — keys that accumulate `ban_threshold` consecutive
  failures are temporarily excluded from the batch group for `ban_duration`
  seconds, then automatically re-admitted.
- **Server-configurable** — all tuning parameters can be set centrally in
  `pool_config` on the server and synced to clients via `PoolConfig`.

### Sync: `batch_exec`

```python
from apipool import ApiKeyManager, BatchResult

manager = ApiKeyManager(apikey_list=apikeys)

# Each item is (item_id, args_tuple, kwargs_dict)
items = [
    ("bitcoin",  (), {"ids": "bitcoin",  "vs_currencies": "usd"}),
    ("ethereum", (), {"ids": "ethereum", "vs_currencies": "usd"}),
    # ... up to 10 000 items
]

result: BatchResult = manager.batch_exec(
    method_name="coins.simple.price.get",
    items=items,
    max_concurrency=20,    # 20 parallel calls
    timeout=10.0,          # per-call timeout
    retry_on_failure=True, # retry on another key
    max_retries=3,         # up to 3 retries per item
    ban_threshold=3,       # ban key after 3 consecutive failures
    ban_duration=300.0,    # ban for 5 minutes
)

print(f"Success: {result.succeeded}/{result.total} ({result.success_rate:.1%})")
print(f"Failed items: {list(result.errors.keys())}")
print(f"Banned keys: {list(result.banned_keys.keys())}")

# Access individual results
btc_price = result.results["bitcoin"]
```

### Async: `abatch_exec`

```python
result: BatchResult = await manager.abatch_exec(
    method_name="coins.simple.price.get",
    items=items,
    max_concurrency=50,
    retry_on_failure=True,
    max_retries=3,
)
```

### BatchResult

| Field | Type | Description |
|---|---|---|
| `total` | `int` | Total items submitted |
| `succeeded` | `int` | Items that completed successfully |
| `failed` | `int` | Items that failed after all retries |
| `results` | `dict` | `item_id → result` for successful items |
| `errors` | `dict` | `item_id → Exception` for failed items |
| `banned_keys` | `dict` | `primary_key → ban_expiry_timestamp` |
| `elapsed` | `float` | Wall-clock seconds for the batch |
| `success_rate` | `float` | Property: `succeeded / total` |
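
As a reading aid, the fields in the table map naturally onto a dataclass. The class below is a hypothetical mirror named `BatchResultSketch`, not the library's `BatchResult`; in particular, returning `0.0` for an empty batch is an assumption.

```python
from dataclasses import dataclass, field


@dataclass
class BatchResultSketch:
    total: int = 0
    succeeded: int = 0
    failed: int = 0
    results: dict = field(default_factory=dict)      # item_id -> result
    errors: dict = field(default_factory=dict)       # item_id -> Exception
    banned_keys: dict = field(default_factory=dict)  # primary_key -> ban expiry
    elapsed: float = 0.0

    @property
    def success_rate(self) -> float:
        # Guard against division by zero for an empty batch (assumed behaviour).
        return self.succeeded / self.total if self.total else 0.0


summary = BatchResultSketch(total=4, succeeded=3, failed=1)
print(f"{summary.success_rate:.1%}")
```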

### How It Works

| Step | Description |
|---|---|
| 1. Deduplicate | Each `item_id` is unique — no item is executed twice |
| 2. Dispatch | Items are dispatched to `ThreadPoolExecutor` (sync) or `asyncio.Semaphore` (async) |
| 3. Execute | Each call goes through the normal key selection → ChainProxy → stats pipeline |
| 4. Fail → Retry | On failure, the item is retried on a *different* key (up to `max_retries`) |
| 5. Ban | Keys hitting `ban_threshold` consecutive failures are banned for `ban_duration` |
| 6. Collect | Successful results and final errors are collected into `BatchResult` |
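
The retry and ban steps above can be illustrated with a small sequential simulation. It is a sketch under stated assumptions, not the library's algorithm: `batch_sketch` and `fake_call` are hypothetical, keys are tried in list order instead of following a rotation strategy, duplicate `item_id`s are skipped after their first occurrence, and nothing runs concurrently.

```python
import time


def batch_sketch(items, keys, call, max_retries=3, ban_threshold=3,
                 ban_duration=300.0, clock=time.monotonic):
    results, errors = {}, {}
    failures = {k: 0 for k in keys}  # consecutive failures per key
    banned_until = {}                # key -> time at which the ban expires
    seen = set()
    for item_id, args, kwargs in items:
        if item_id in seen:          # deduplicate by item_id
            continue
        seen.add(item_id)
        tried, last_exc = set(), None
        for _ in range(max_retries + 1):
            now = clock()
            candidates = [
                k for k in keys
                if k not in tried
                and (k not in banned_until or banned_until[k] <= now)
            ]
            if not candidates:
                break
            key = candidates[0]
            tried.add(key)           # a retry always uses a different key
            try:
                results[item_id] = call(key, *args, **kwargs)
                failures[key] = 0
                break
            except Exception as exc:
                last_exc = exc
                failures[key] += 1
                if failures[key] >= ban_threshold:
                    banned_until[key] = now + ban_duration
                    failures[key] = 0
        if item_id not in results:
            errors[item_id] = last_exc if last_exc is not None else RuntimeError("no usable key")
    return results, errors, banned_until


def fake_call(key, ids):
    if key == "bad":
        raise RuntimeError("boom")
    return f"{ids}@{key}"


items = [(name, (), {"ids": name}) for name in ("bitcoin", "ethereum", "solana", "bitcoin")]
results, errors, banned = batch_sketch(items, ["bad", "good"], fake_call, ban_threshold=2)
print(results, errors, list(banned))
```

Here the key `bad` fails on the first two items, is banned after its second consecutive failure, and is then skipped for the third item.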

### Batch Config Fields

These `PoolConfig` fields control batch behaviour from the server:

| Field | Type | Default | Description |
|---|---|---|---|
| `batch_retry_on_failure` | `bool \| None` | `None` (→ `retry_on_failure`) | Retry failed items on another key |
| `batch_max_retries` | `int \| None` | `None` (→ `max_retries`) | Max retries per item in batch |
| `ban_threshold` | `int` | `3` | Consecutive failures before banning a key |
| `ban_duration` | `float` | `300.0` | Seconds a banned key is excluded |

## ApiKey Abstract Class

Subclass `ApiKey` and implement three methods:

| Method | Description |
|---|---|
| `get_primary_key()` | Return a unique identifier for this key |
| `create_client()` | Create and return the SDK client instance |
| `test_usability(client)` | Test if the key is usable; return `bool` |

Optional async methods on `ApiKey`:

| Method | Description |
|---|---|
| `aconnect_client()` | Async version of `connect_client()` |
| `ais_usable()` | Async version of `is_usable()` |

## StatsCollector

Query usage statistics through `manager.stats`:

```python
from apipool import StatusCollection

# Usage count per key in last hour
manager.stats.usage_count_stats_in_recent_n_seconds(3600)
# {"key1": 3, "key2": 5, "key3": 2}

# Count specific events
count = manager.stats.usage_count_in_recent_n_seconds(
    n_seconds=3600,
    status_id=StatusCollection.c9_ReachLimit.id,
)
```
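
Conceptually, the per-key count is a filter on event time followed by a group-by on key. The in-memory sketch below shows that idea only; the real collector queries a relational database, and `usage_counts` and the `(timestamp, key)` event tuples are hypothetical.

```python
import time
from collections import Counter


def usage_counts(events, n_seconds, now=None):
    """Count events per key whose timestamp falls within the last n_seconds."""
    now = time.time() if now is None else now
    cutoff = now - n_seconds
    return dict(Counter(key for ts, key in events if ts >= cutoff))


events = [(990, "key1"), (995, "key2"), (100, "key1"), (999, "key1")]
counts = usage_counts(events, n_seconds=60, now=1000)
print(counts)
```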

## API Reference

### Exported Symbols

```python
from apipool import (
    # Core
    ApiKey,
    ApiKeyManager,
    PoolExhaustedError,
    BatchResult,

    # Async chain proxy
    AsyncDummyClient,
    AsyncChainProxy,
    AsyncApiCaller,

    # Dynamic scaling
    DynamicKeyManager,
    AsyncDynamicKeyManager,

    # Stats
    StatusCollection,
    StatsCollector,

    # Configuration
    PoolConfig,

    # Server SDK mode — sync
    connect,
    login,
    get_keys,
    get_config,

    # Server SDK mode — async
    async_connect,
    alogin,
    aget_keys,
    aget_config,
)
```

## License

MIT License
