Metadata-Version: 2.4
Name: async-task-kit
Version: 0.1.1
Summary: A powerful async task processing kit based on RabbitMQ with Coroutine, Thread, and Process support.
Author-email: realwrtoff <realwrtoff@gmail.com>
Classifier: Programming Language :: Python :: 3
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Requires-Python: >=3.12
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: aio-pika>=9.4.0
Requires-Dist: python-dotenv>=1.0.0
Dynamic: license-file

# async-task-kit

An async task processing kit built on RabbitMQ, with coroutine, thread, and process consumers.

## Installation

```bash
pip install async-task-kit
```

## Features

- **RabbitMQ Client**: Robust connection pooling and delay/dead-letter queue support.
- **Multiple Consumer Models**: Support for Coroutine (asyncio), Thread, and Process consumers depending on your workload (I/O-bound vs CPU-bound).
- **Extensible Processor**: Easily define your task logic by inheriting `TaskProcessor`.
- **Built-in Logger & EnvLoader**: Useful utilities for production-ready applications.
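
To make the I/O-bound vs CPU-bound distinction concrete, here is a minimal standard-library sketch. It does not use `async-task-kit` at all; `io_task` and `blocking_task` are hypothetical stand-ins for your own workload. Non-blocking I/O fits the coroutine model, while blocking calls are better pushed onto threads. CPU-heavy work would typically go to separate processes instead.

```python
import asyncio
import time


async def io_task(n: int) -> int:
    # Non-blocking wait: many of these can share one event loop (coroutine model).
    await asyncio.sleep(0.01)
    return n * 2


def blocking_task(n: int) -> int:
    # Blocking call: would stall the event loop, so it is run in a worker thread.
    time.sleep(0.01)
    return n * 2


async def demo() -> tuple[list[int], list[int]]:
    coroutine_results = await asyncio.gather(*(io_task(n) for n in range(3)))
    thread_results = await asyncio.gather(
        *(asyncio.to_thread(blocking_task, n) for n in range(3))
    )
    return coroutine_results, thread_results


coroutine_results, thread_results = asyncio.run(demo())
```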

## Quick Start

### 1. Configuration (`.env`)

Use the built-in `EnvLoader` to manage your environment variables. Create a `.env` file:

```env
RABBITMQ_URL=amqp://guest:guest@localhost/
TASK_IDS=demo_task

# You can also configure specific task settings using the {TASK_ID}_ prefix
DEMO_TASK_QUEUE_NAME=my_demo_queue
DEMO_TASK_CONCURRENCY=3
```

### 2. Define your Task Processor (`demo_processor.py`)

```python
import logging
from typing import Any

from async_task_kit import TaskProcessor

logger = logging.getLogger(__name__)

class DemoProcessor(TaskProcessor):
    async def process(self, task: dict):
        logger.info(f"Processing task: {task}")
        # Return any truthy value (e.g., dict, object, True) to signal success;
        # it is passed to callback(). Return None or False to trigger a retry.
        return {"status": "ok", "processed_data": task}

    async def callback(self, task: dict, result: Any):
        logger.info(f"Task completed with result: {result}")

### 3. Main Consumer Application (`main.py`)

A production-ready setup with signal handling for graceful shutdown. Note that `loop.add_signal_handler` is available only on Unix.

```python
import asyncio
import logging
import signal
from typing import List, Type

from demo_processor import DemoProcessor

# ------- Change only this line to switch the concurrency model -------
from async_task_kit import CoroutineConsumer as Consumer
# from async_task_kit import ThreadConsumer as Consumer
# from async_task_kit import ProcessConsumer as Consumer
# --------------------------------------------------------

from async_task_kit import TaskProcessor, EnvLoader, setup_logger

# Initialize logger
setup_logger()
logger = logging.getLogger(__name__)

# Register your processors
TASK_REGISTRY: dict[str, Type[TaskProcessor]] = {
    "demo_task": DemoProcessor,
}

consumers: List[Consumer] = []

async def run_all_consumers(amqp_url: str, task_ids: List[str]):
    tasks = []
    for task_id in task_ids:
        if task_id not in TASK_REGISTRY:
            continue

        processor_cls = TASK_REGISTRY[task_id]
        processor = processor_cls(task_id=task_id)

        consumer = Consumer(
            amqp_url=amqp_url,
            queue_name=processor.queue_name,
            processor=processor,
            concurrency=processor.concurrency,
        )
        consumers.append(consumer)
        tasks.append(consumer.start())

        logger.info(f"🚀 Starting task [{task_id}] | queue={processor.queue_name} | concurrency={processor.concurrency}")

    await asyncio.gather(*tasks)

async def shutdown_all():
    logger.info("🛑 Gracefully shutting down all consumers...")
    for consumer in consumers:
        await consumer.stop()
    logger.info("✅ All consumers stopped")

def handle_exit_signal(*args, **kwargs):
    asyncio.create_task(shutdown_all())

async def main():
    env = EnvLoader()
    amqp_url = env.get("RABBITMQ_URL")
    task_ids_str = env.get("TASK_IDS", "").strip()

    if not task_ids_str:
        logger.warning("⚠️ TASK_IDS is not configured")
        return

    task_ids = [t.strip() for t in task_ids_str.split(",") if t.strip()]
    valid_tasks = [t for t in task_ids if t in TASK_REGISTRY]

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, handle_exit_signal)

    await run_all_consumers(amqp_url, valid_tasks)

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("👋 Service exited cleanly")
```
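
In `main()` above, task IDs missing from `TASK_REGISTRY` are dropped silently. A small variation that also reports them might look like the sketch below. `split_task_ids` is a hypothetical helper, not part of the library, and the registry here is a stand-in dict.

```python
def split_task_ids(raw: str, registry: dict) -> tuple[list[str], list[str]]:
    """Parse a comma-separated TASK_IDS string into (known, unknown) IDs."""
    ids = [t.strip() for t in raw.split(",") if t.strip()]
    known = [t for t in ids if t in registry]
    unknown = [t for t in ids if t not in registry]
    return known, unknown


# Stand-in registry; in main.py this would be TASK_REGISTRY.
known, unknown = split_task_ids("demo_task, , typo_task", {"demo_task": object})
```

Logging `unknown` at warning level before starting the consumers makes a misspelled task ID in `.env` easier to spot.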

### 4. Publishing Tasks (`publisher.py`)

```python
import asyncio
import logging
from async_task_kit import RabbitMQ, setup_logger

setup_logger()
logger = logging.getLogger(__name__)

async def publish():
    rmq = RabbitMQ("amqp://guest:guest@localhost/")
    await rmq.init()
    try:
        await rmq.push("my_demo_queue", {"message": "Hello from async-task-kit!"})
        logger.info("Task published successfully.")
    finally:
        # Close the connection even if publishing fails.
        await rmq.close()

if __name__ == "__main__":
    asyncio.run(publish())
```

## License

MIT

## Contact & Support

If you have any questions, suggestions, or need help with this library, feel free to reach out!

**WeChat (微信)**: `realwrtoff`  
**Email**: `realwrtoff@gmail.com`
