Metadata-Version: 2.4
Name: kafkars
Version: 0.0.6
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Requires-Dist: pyarrow>=14
Requires-Dist: mkdocs-material>=9 ; extra == 'docs'
Requires-Dist: mkdocs-mermaid2-plugin>=1 ; extra == 'docs'
Requires-Dist: mkdocstrings[python]>=0.24 ; extra == 'docs'
Provides-Extra: docs
License-File: LICENSE
Summary: Rust based, Arrow powered, python kafka client
License: Apache-2.0
Requires-Python: >=3.10
Description-Content-Type: text/markdown; charset=UTF-8; variant=GFM

# kafkars

[![PyPI](https://img.shields.io/pypi/v/kafkars.svg)](https://pypi.org/project/kafkars/)
[![Downloads](https://img.shields.io/pypi/dm/kafkars.svg)](https://pypi.org/project/kafkars/)
[![Documentation](https://readthedocs.org/projects/kafkars/badge/?version=latest)](https://kafkars.readthedocs.io/en/latest/?badge=latest)
[![Python](https://img.shields.io/pypi/pyversions/kafkars.svg)](https://pypi.org/project/kafkars/)
[![License](https://img.shields.io/pypi/l/kafkars.svg)](https://github.com/0x26res/kafkars/blob/main/LICENSE)
[![GitHub stars](https://img.shields.io/github/stars/0x26res/kafkars?style=social)](https://github.com/0x26res/kafkars)
[![GitHub repo size](https://img.shields.io/github/repo-size/0x26res/kafkars)](https://github.com/0x26res/kafkars)
[![GitHub issues](https://img.shields.io/github/issues/0x26res/kafkars)](https://github.com/0x26res/kafkars/issues)
[![PRs Welcome](https://img.shields.io/badge/PRs-welcome-brightgreen.svg)](https://github.com/0x26res/kafkars/pulls)
[![Rust](https://img.shields.io/badge/rust-%23000000.svg?logo=rust&logoColor=white)](https://www.rust-lang.org/)
[![Apache Arrow](https://img.shields.io/badge/Apache%20Arrow-powered-orange.svg)](https://arrow.apache.org/)
[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)
[![prek](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/j178/prek/master/docs/assets/badge-v0.json)](https://github.com/j178/prek)

Rust-based, Arrow-powered Python Kafka client for high-throughput data pipelines.

## Motivation

Python's Global Interpreter Lock (GIL) and memory management create bottlenecks when consuming high-volume Kafka streams.
Traditional Python Kafka clients process messages one at a time,
requiring serialization/deserialization overhead for each message and limiting throughput.

**kafkars** solves this by:

- **Rust core**: All Kafka operations (polling, buffering, ordering) happen in Rust, bypassing the GIL
- **Batch processing**: Messages are accumulated and returned as Apache Arrow RecordBatches, not individual Python objects
- **Zero-copy where possible**: Arrow's columnar format enables efficient data transfer between Rust and Python
- **Vectorized operations**: Process thousands of messages at once with pandas, polars, or any Arrow-compatible library

This architecture is ideal for:

- Real-time analytics pipelines
- ML feature stores consuming from Kafka
- High-volume event processing
- Data lake ingestion

## Important: Analytics-Focused Design

**kafkars does not commit offsets.** It is designed for analytics and high-throughput batch processing, not transactional workloads.

- **No exactly-once semantics**: Messages may be reprocessed if your application restarts
- **No offset tracking**: You control where to start reading via offset policies
- **Stateless consumers**: Each consumer instance starts fresh based on the configured policy

If you need exactly-once processing, transactional guarantees, or automatic offset management, use a traditional Kafka client like `confluent-kafka-python`.

## Features

- **Ordered delivery**: Messages released in timestamp order across all partitions
- **Flexible offset policies**: Start from earliest, latest, or any timestamp
- **Backpressure management**: Automatically pauses fast partitions to prevent memory overflow
- **Arrow-native output**: Returns PyArrow RecordBatch for efficient downstream processing

## Installation

```bash
pip install kafkars
```

## Quick Start

```python
from kafkars import ConsumerManager, SourceTopic

# Define source topics with offset policies
topics = [
    SourceTopic.from_earliest("events"),
    SourceTopic.from_relative_time("metrics", 3600_000),  # 1 hour ago
]

# Create consumer
manager = ConsumerManager(
    config={"bootstrap.servers": "localhost:9092"},
    topics=topics,
)

# Poll returns PyArrow RecordBatch
while True:
    batch = manager.poll(timeout_ms=1000)
    if batch.num_rows > 0:
        # Convert to pandas/polars for processing
        df = batch.to_pandas()
        print(f"Received {len(df)} messages")

    if manager.is_live():
        print("Caught up to real-time")
        break
```

## Message Schema

Each poll returns a RecordBatch with the following schema:

| Column      | Type                 | Description                |
|-------------|----------------------|----------------------------|
| `key`       | `binary`             | Message key (nullable)     |
| `value`     | `binary`             | Message payload (nullable) |
| `topic`     | `utf8`               | Source topic name          |
| `partition` | `int32`              | Partition number           |
| `offset`    | `int64`              | Message offset             |
| `timestamp` | `timestamp[ms, UTC]` | Message timestamp          |

## Offset Policies

| Policy                          | Description                            |
|---------------------------------|----------------------------------------|
| `from_earliest(topic)`          | Start from the beginning               |
| `from_latest(topic)`            | Start from the end (new messages only) |
| `from_relative_time(topic, ms)` | Start from N milliseconds ago          |
| `from_absolute_time(topic, ms)` | Start from specific Unix timestamp     |

## Documentation

Full documentation is available at [kafkars.readthedocs.io](https://kafkars.readthedocs.io).

## License

Apache 2.0

