Metadata-Version: 2.4
Name: workerlib
Version: 0.3.0
Summary: Async RabbitMQ worker utilities
Author-email: YOUR_NAME <your.email@example.com>
License: MIT
Project-URL: Homepage, https://github.com/ametist-dev/workerlib
Project-URL: Repository, https://github.com/ametist-dev/workerlib
Project-URL: Documentation, https://github.com/ametist-dev/workerlib#readme
Project-URL: Issues, https://github.com/ametist-dev/workerlib/issues
Project-URL: Changelog, https://github.com/ametist-dev/workerlib/releases
Keywords: rabbitmq,async,worker,amqp,aio-pika,message-queue
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Distributed Computing
Classifier: Framework :: AsyncIO
Classifier: Typing :: Typed
Requires-Python: >=3.8
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: aio-pika>=9.0
Provides-Extra: dev
Requires-Dist: pytest>=7.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.21.0; extra == "dev"
Requires-Dist: pytest-cov>=4.0; extra == "dev"
Requires-Dist: pytest-xdist>=3.0; extra == "dev"
Requires-Dist: black>=23.0; extra == "dev"
Requires-Dist: ruff>=0.1.0; extra == "dev"
Requires-Dist: mypy>=1.0; extra == "dev"
Requires-Dist: pre-commit>=3.0; extra == "dev"
Provides-Extra: test
Requires-Dist: pytest>=7.0; extra == "test"
Requires-Dist: pytest-asyncio>=0.21.0; extra == "test"
Requires-Dist: pytest-cov>=4.0; extra == "test"
Provides-Extra: docs
Requires-Dist: mkdocs>=1.5; extra == "docs"
Requires-Dist: mkdocs-material>=9.0; extra == "docs"
Provides-Extra: all
Requires-Dist: workerlib[dev,docs,test]; extra == "all"
Dynamic: license-file

# WorkerLib - async RabbitMQ worker utilities
## Quick Start
```python
import asyncio
from workerlib import WorkerPool

async def task_handler(data: dict) -> bool:
    print(f"Processing: {data}")
    return True

async def main():
    async with WorkerPool() as pool:
        pool.add_worker("tasks", task_handler)
        await pool.send("tasks", {"id": 1, "cmd": "start"})
        await asyncio.sleep(2)

asyncio.run(main())
```
## Message Format
**JSON messages**
The library automatically serializes a dict to JSON on send:
```python
# Send a simple message
await pool.send("queue", {
    "event": "user_created",
    "user_id": 123,
    "email": "user@example.com",
    "timestamp": "2024-01-15T10:30:00Z"
})

# Send nested structures
await pool.send("queue", {
    "type": "order",
    "data": {
        "order_id": "ORD-12345",
        "items": [
            {"id": 1, "quantity": 2},
            {"id": 2, "quantity": 1}
        ],
        "total": 299.99
    },
    "metadata": {
        "source": "api",
        "version": "1.0"
    }
})
```
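The exact wire format is not documented here, but if serialization amounts to a plain `json.dumps` (a reasonable assumption for dict payloads, not confirmed by workerlib itself), the bytes that end up on the broker can be previewed like this:

```python
import json

# Hypothetical preview of the payload workerlib would publish,
# assuming plain json.dumps serialization (an assumption, not workerlib API).
message = {
    "event": "user_created",
    "user_id": 123,
    "email": "user@example.com",
}

body = json.dumps(message).encode("utf-8")
print(body)  # b'{"event": "user_created", "user_id": 123, "email": "user@example.com"}'

# Round trip: a consumer can restore the original dict with json.loads.
assert json.loads(body) == message
```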
## Core Examples
1. Pool with multiple workers
```python
from workerlib import WorkerPool, ErrorHandlingStrategy

async def main():
    async with WorkerPool() as pool:
        # Email worker with a DLQ
        pool.add_worker(
            queue_name="emails",
            handler=email_handler,
            error_strategy=ErrorHandlingStrategy.RETRY_THEN_DLQ,
            prefetch_count=5
        )
        
        # Payment handler
        pool.add_worker(
            queue_name="payments",
            handler=payment_handler,
            error_strategy=ErrorHandlingStrategy.REQUEUE_END
        )
        
        # Send tasks
        await pool.send("emails", {"to": "user@test.com"})
        await pool.send("payments", {"amount": 100})
```
2. Custom connection and retries
```python
from workerlib import ConnectionParams, RetryConfig, WorkerPool

params = ConnectionParams(
    host="rabbit.local",
    username="admin",
    password="secret"
)

retry_config = RetryConfig(
    max_attempts=3,
    initial_delay=1.0,
    backoff_factor=2.0
)

async with WorkerPool(connection_params=params) as pool:
    pool.add_worker(
        queue_name="critical",
        handler=critical_handler,
        retry_config=retry_config
    )
```
3. Error handling
```python
from workerlib import ErrorHandlingStrategy

# Options:
# IGNORE - ignore the error
# REQUEUE_END - requeue at the back of the queue, with a delay
# REQUEUE_FRONT - requeue at the front of the queue
# DLQ - route to the Dead Letter Queue
# RETRY_THEN_DLQ - retry, then route to the DLQ

pool.add_worker(
    queue_name="tasks",
    handler=my_handler,
    error_strategy=ErrorHandlingStrategy.RETRY_THEN_DLQ,
    dlq_enabled=True,
    requeue_delay=5.0  # delay before reprocessing
)
```
4. Individual components
```python
from workerlib import (
    RabbitMQConnection,
    RabbitMQQueue,
    RabbitMQConsumer,
    RabbitMQProducer,
    QueueConfig
)

# Manual wiring
connection = RabbitMQConnection()
await connection.connect()

queue = RabbitMQQueue(connection, QueueConfig(name="my_queue"))

producer = RabbitMQProducer(connection, queue)
await producer.send({"test": "data"})

consumer = RabbitMQConsumer(queue, my_handler)
await consumer.consume()
```
5. Batch sending
```python
async with WorkerPool() as pool:
    messages = [
        {"id": i, "data": f"item_{i}"}
        for i in range(100)
    ]
    
    tasks = [
        pool.send("batch_queue", msg)
        for msg in messages
    ]
    
    await asyncio.gather(*tasks)
```
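The fan-out pattern above can be exercised without a broker; below, a stub `send` coroutine stands in for `pool.send` (the stub and its names are illustrative, not part of workerlib's API):

```python
import asyncio

sent = []

async def send(queue: str, message: dict) -> None:
    # Stand-in for pool.send: records the message instead of publishing it.
    sent.append((queue, message))

async def main():
    messages = [{"id": i, "data": f"item_{i}"} for i in range(100)]
    # Schedule all sends concurrently and wait for them all to finish.
    await asyncio.gather(*(send("batch_queue", msg) for msg in messages))

asyncio.run(main())
print(len(sent))  # 100
```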
6. Metrics
```python
async with WorkerPool() as pool:
    pool.add_worker("monitored", handler)
    
    # Send some tasks
    for i in range(10):
        await pool.send("monitored", {"task": i})
    
    # Fetch the metrics
    metrics = pool.get_metrics("monitored")
    print(f"Processed: {metrics['consumer']['processed']}")
    print(f"Failed: {metrics['consumer']['failed']}")
```
7. FastAPI integration
```python
from fastapi import FastAPI
from workerlib import WorkerPool

app = FastAPI()
worker_pool = WorkerPool(auto_start=False)

@app.on_event("startup")
async def startup():
    await worker_pool.start()
    worker_pool.add_worker("api_tasks", task_handler)

@app.on_event("shutdown")
async def shutdown():
    await worker_pool.stop()

@app.post("/task")
async def create_task(data: dict):
    await worker_pool.send("api_tasks", data)
    return {"status": "queued"}
```
## Configuration
ConnectionParams
```python
ConnectionParams(
    host="127.0.0.1",
    port=5672,
    username="guest",
    password="guest",
    heartbeat=60,
    timeout=10
)
```
QueueConfig
```python
QueueConfig(
    name="queue_name",
    durable=True,
    prefetch_count=1
)
```
RetryConfig
```python
RetryConfig(
    max_attempts=3,
    initial_delay=1.0,
    backoff_factor=2.0,
    max_delay=60.0
)
```
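Assuming `RetryConfig` implements standard exponential backoff — delay for attempt *n* is `initial_delay * backoff_factor**n`, capped at `max_delay` (an assumption; workerlib's internals are not shown here) — the schedule for the config above works out as follows:

```python
def backoff_delays(max_attempts, initial_delay, backoff_factor, max_delay):
    # Hypothetical reconstruction of the retry schedule: exponential
    # growth from initial_delay, capped at max_delay per attempt.
    return [min(initial_delay * backoff_factor ** n, max_delay)
            for n in range(max_attempts)]

print(backoff_delays(3, 1.0, 2.0, 60.0))  # [1.0, 2.0, 4.0]
```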
## Installation
```bash
pip install workerlib
```
Requirements: Python 3.8+, aio-pika>=9.0. Optional extras declared in the package metadata: `dev`, `test`, `docs`, and `all` (e.g. `pip install "workerlib[dev]"`).
