Metadata-Version: 2.3
Name: AioKafkaEngine
Version: 0.1.0
Summary: Wrapper around aio-kafka to use and receive via a asyncio queue
Project-URL: Homepage, https://github.com/Bot-detector/AioKafkaEngine
Project-URL: Bug Tracker, https://github.com/Bot-detector/AioKafkaEngine/issues
Author-email: Extreme4all <author@example.com>
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Requires-Python: >=3.9
Requires-Dist: aiokafka>=0.12.0
Requires-Dist: asyncio>=3.4.3
Description-Content-Type: text/markdown


# AsyncKafkaEngine
AsyncKafkaEngine is a wrapper around the aiokafka package.
 
its build with the idea of decoupling the reading & batching task from your main application task, this way the queue of batches is always full and reading from kafka is not being blocked by the work of your application logic.

# Installation
Installing the package using uv
```bash
uv add AioKafkaEngine
```
Install the package using pip:
```bash
pip install AioKafkaEngine
```

# Usage
## ConsumerEngine

The ConsumerEngine class manages the consumption of messages from Kafka topics asynchronously and places them into an queue. Your application can consume the queue.
Example

```python
import asyncio
from AioKafkaEngine import ConsumerEngine
from aiokafka import AIOKafkaConsumer
import json

async def work(queue):
    message = await queue.get()
    print(message)

async def main():
    """
    Test that the consumer fetches and processes a single batch of messages.
    """
    test_queue = asyncio.Queue()

    # Using the mock setup, getmany should return two messages
    engine = ConsumerEngine(
        consumer=AIOKafkaConsumer(
            *["test_topic"],
            bootstrap_servers="localhost:9092",
            group_id="my_group",
            value_deserializer=lambda x: json.loads(x.decode("utf-8")),
            auto_offset_reset="earliest",
        ),
        queue=test_queue,
        batch_size=10,
        timeout=1,
    )
    await engine.start()
    consume_task = asyncio.create_task(engine.consume())
    
    # create workers
    workers = [asyncio.create_task(work(queue)) for _ in range(10)]

    # will never exit
    await asyncio.gather(consume_task, *workers)

asyncio.run(main())
```
## ProducerEngine

The ProducerEngine class manages the production of messages to a Kafka topic asynchronously by retrieving messages from an internal queue. It also logs production statistics periodically.
Example

```python
import asyncio
from AioKafkaEngine import ProducerEngine
from aiokafka import AIOKafkaProducer
import json

async def work(queue):
    await queue.put(item={"key": 1})

async def main():
    """
    Test that the consumer fetches and processes a single batch of messages.
    """
    queue = asyncio.Queue()
    await queue.put(item={"key": "k", "key2": 2})

    # Using the mock setup, getmany should return two messages
    engine = ProducerEngine(
        producer=AIOKafkaProducer(
            bootstrap_servers="localhost:9092",
            value_serializer=lambda v: json.dumps(v).encode(),
            acks="all",
        ),
        queue=queue,
        topic="produce_topic",
    )
    await engine.start()

    produce_task = asyncio.create_task(engine.produce())
    
    # create workers
    workers = [asyncio.create_task(work(queue)) for _ in range(10)]

    # will never exit
    await asyncio.gather(produce_task, *workers)
asyncio.run(main())
```

# Contributing
Contributions are welcome! Please submit a pull request or open an issue on GitHub.

# License

This project is licensed under the BSD 2-Clause License.