# batchit

> Batch any Python iterator by count, elapsed time, or both. Zero dependencies. Python 3.10+.

## Install

```bash
pip install batchit
```

## When to use

Use `batchit` whenever you need to consume a stream in fixed-size or time-bounded chunks:
- Kafka / queue consumers (flush every N items *or* every T seconds)
- Database cursors (bulk insert without loading everything into memory)
- API result streams (rate-limit downstream calls)
- Any `for` loop where you want batching without boilerplate

## API

### `batcher(iterable, *, size=None, timeout=None) -> Generator[list[T], None, None]`

Synchronous. Works with any `Iterable` — generators, file objects, database cursors, Kafka consumers.

- `size` — max items per batch (`int`, optional)
- `timeout` — max seconds to accumulate a batch, measured from the **first item** (`float`, optional)
- At least one of `size` / `timeout` is required; passing neither raises `ValueError`
- Timeout is checked **on item arrival** — no threads, no background tasks
- Remaining items are always yielded; nothing is silently dropped

### `async_batcher(aiterable, *, size=None, timeout=None) -> AsyncGenerator[list[T], None]`

Async. Works with any `AsyncIterable`.

- Same parameters as `batcher`
- Uses `asyncio.wait_for` internally — timeout fires **even when the source stalls** (no items need to arrive)
- Source exceptions are contained in the producer task; the consumer sees a clean end-of-stream

## Usage

```python
from batchit import batcher, async_batcher
```

### Sync — by size

```python
for batch in batcher(range(1000), size=100):
    db.bulk_insert(batch)
```

### Sync — by timeout (flush once 5 s have passed since the first buffered item)

```python
for batch in batcher(kafka_consumer, timeout=5.0):
    send_to_api(batch)
```

### Sync — by both (whichever fires first)

```python
for batch in batcher(db_cursor, size=500, timeout=10.0):
    warehouse.insert_many(batch)
```

### Async — timeout fires even when source is stalled

```python
async for batch in async_batcher(websocket_stream, size=100, timeout=2.0):
    await db.bulk_insert(batch)
```

## Behaviour notes

| Scenario | sync `batcher` | async `async_batcher` |
|---|---|---|
| Timeout fires when source stalls | No — only on item arrival | Yes — uses `asyncio.wait_for` |
| Source exception propagates | Yes | No — consumer sees clean end |
| Thread safety | Single-threaded only | asyncio event loop only |
| Remaining items yielded | Always | Always |

## Real-world patterns

### Kafka consumer

```python
from kafka import KafkaConsumer
from batchit import batcher

consumer = KafkaConsumer("events")
for batch in batcher(consumer, size=500, timeout=10.0):
    db.bulk_insert([msg.value for msg in batch])
    consumer.commit()
```

### Database cursor

```python
cursor.execute("SELECT * FROM events")
for batch in batcher(cursor, size=1000):
    warehouse.insert_many(batch)
```

### Async HTTP / WebSocket stream

```python
async for batch in async_batcher(response.content, size=64, timeout=2.0):
    await storage.write(batch)
```

## Source

- `src/batchit/_sync.py` — sync implementation
- `src/batchit/_async.py` — async implementation (queue + `asyncio.wait_for`)
- `tests/test_sync.py`, `tests/test_async.py` — full test suite
