Metadata-Version: 2.4
Name: workerlib
Version: 0.4.5
Summary: Async RabbitMQ worker utilities
Author: ametist-dev
Maintainer: ametist-dev
Project-URL: Homepage, https://github.com/ametist-dev/workerlib
Project-URL: Repository, https://github.com/ametist-dev/workerlib
Project-URL: Issues, https://github.com/ametist-dev/workerlib/issues
Keywords: async,rabbitmq,aio-pika,workers,queue,messaging,background-jobs
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Intended Audience :: Developers
Classifier: Topic :: Software Development :: Libraries
Classifier: Development Status :: 5 - Production/Stable
Requires-Python: >=3.10
Description-Content-Type: text/markdown
Requires-Dist: aio-pika<10,>=9.0.0

# WorkerLib - асинхронная работа с RabbitMQ
## Быстрый старт
```python
import asyncio
from workerlib import WorkerPool

async def task_handler(data: dict) -> bool:
    print(f"Обработка: {data}")
    return True

async def main():
    async with WorkerPool() as pool:
        pool.add_worker("tasks", task_handler)
        await pool.send("tasks", {"id": 1, "cmd": "start"})
        await asyncio.sleep(2)

asyncio.run(main())
```
## Формат сообщений
**JSON сообщение**
Библиотека автоматически сериализует dict в JSON при отправке:
```python
# Отправка простого сообщения
await pool.send("queue", {
    "event": "user_created",
    "user_id": 123,
    "email": "user@example.com",
    "timestamp": "2024-01-15T10:30:00Z"
})

# Отправка вложенных структур
await pool.send("queue", {
    "type": "order",
    "data": {
        "order_id": "ORD-12345",
        "items": [
            {"id": 1, "quantity": 2},
            {"id": 2, "quantity": 1}
        ],
        "total": 299.99
    },
    "metadata": {
        "source": "api",
        "version": "1.0"
    }
})
```
## Основные примеры
1. Пул с несколькими воркерами
```python
from workerlib import WorkerPool, ErrorHandlingStrategy

async def main():
    async with WorkerPool() as pool:
        # Email воркер с DLQ
        pool.add_worker(
            "emails",
            email_handler,
            error_strategy=ErrorHandlingStrategy.DLQ,
            prefetch_count=5
        )
        
        # Обработчик платежей
        pool.add_worker(
            "payments",
            payment_handler,
            error_strategy=ErrorHandlingStrategy.REQUEUE_END
        )
        
        # Отправка задач
        await pool.send("emails", {"to": "user@test.com"})
        await pool.send("payments", {"amount": 100})
```
2. Кастомное подключение и retry
```python
from workerlib import ConnectionParams, RetryConfig

params = ConnectionParams(
    host="rabbit.local",
    username="admin",
    password="secret"
)

retry_config = RetryConfig(
    max_attempts=3,
    initial_delay=1.0,
    backoff_factor=2.0
)

async with WorkerPool(connection_params=params) as pool:
    pool.add_worker(
        "critical",
        critical_handler,
        retry_config=retry_config
    )
```
3. Обработка ошибок
```python
from workerlib import ErrorHandlingStrategy

# Варианты:
# IGNORE - проигнорировать ошибку
# REQUEUE_END - в конец очереди с задержкой
# REQUEUE_FRONT - в начало очереди
# DLQ - в Dead Letter Queue

pool.add_worker(
    "tasks",
    my_handler,
    error_strategy=ErrorHandlingStrategy.DLQ,
    dlq_enabled=True,
    requeue_delay=5.0  # задержка повторной обработки
)
```
4. Метрики
```python
async with WorkerPool() as pool:
    pool.add_worker("monitored", handler)
    
    # Отправляем задачи
    for i in range(10):
        await pool.send("monitored", {"task": i})
    
    # Получаем метрики
    metrics = pool.get_metrics("monitored")
    print(f"Обработано: {metrics['consumer']['processed']}")
    print(f"Ошибок: {metrics['consumer']['failed']}")
```
5. FastAPI интеграция
```python
from fastapi import FastAPI
from workerlib import WorkerPool

app = FastAPI()
worker_pool = WorkerPool(auto_start=False)

@app.on_event("startup")
async def startup():
    await worker_pool.start()
    worker_pool.add_worker("api_tasks", task_handler)

@app.on_event("shutdown")
async def shutdown():
    await worker_pool.stop()

@app.post("/task")
async def create_task(data: dict):
    await worker_pool.send("api_tasks", data)
    return {"status": "queued"}
```
## Конфигурация
ConnectionParams
```python
ConnectionParams(
    host="127.0.0.1",
    port=5672,
    username="guest",
    password="guest",
    heartbeat=60,
    timeout=10
)
```
QueueConfig
```python
QueueConfig(
    name="queue_name",
    durable=True,
    prefetch_count=1
)
```
RetryConfig
```python
RetryConfig(
    max_attempts=3,
    initial_delay=1.0,
    backoff_factor=2.0,
    max_delay=60.0
)
```
## Установка
```bash
pip install workerlib
```
Требования: Python 3.10+, aio_pika
