Metadata-Version: 2.4
Name: qode-task-worker
Version: 1.0.3
Summary: A Python library for task management with RabbitMQ and polling support
Author-email: Bao Ninh <bao.ninh@qode.world>
Maintainer-email: Bao Ninh <bao.ninh@qode.world>
License-Expression: MIT
Keywords: task,worker,rabbitmq,polling,queue
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Python: >=3.8
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: requests>=2.25.0
Requires-Dist: pika>=1.2.0
Dynamic: license-file

# Task Worker Library

A Python library for task management with RabbitMQ and polling support. This library provides a unified interface for consuming tasks from a task management API with automatic fallback between RabbitMQ and polling modes.

## Features

- **Dual Mode Support**: RabbitMQ with manual acknowledgment/nack and polling fallback
- **Automatic Fallback**: Seamlessly switches to polling if RabbitMQ is unavailable
- **Manual Acknowledgment**: Proper message handling with ack/nack for RabbitMQ
- **Configurable**: Flexible configuration for different environments
- **Error Handling**: Robust error handling with retry mechanisms
- **Logging**: Comprehensive logging for debugging and monitoring

## Installation

```bash
pip install task-worker
```

For development:
```bash
pip install task-worker[dev]
```

## Quick Start

```python
import logging
from task_worker import TaskWorker, TaskWorkerConfig

# Configure logging
logging.basicConfig(level=logging.INFO)

# Define your task processing function
def process_my_task(task):
    """Your custom task processing logic"""
    task_id = task.get('id')
    metadata = task.get('metadata', {})
    
    print(f"Processing task {task_id}: {metadata}")
    
    # Your business logic here
    
    # The library handles task registration and result saving
    # You just need to process the task data
    return result;

# Configure the worker
config = TaskWorkerConfig(
    task_management_endpoint="https://your-task-api.com",
    rabbitmq_consumer_host="amqps://user:pass@your-rabbitmq.com/vhost",
    task_type="your-task-type",
    worker_name="my-worker",
    polling_interval=5
)

# Create and start the worker
worker = TaskWorker(config, process_my_task)
worker.start()  # This will run indefinitely
```

## Configuration

### TaskWorkerConfig Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `task_management_endpoint` | str | ENV or default URL | Task management API endpoint |
| `rabbitmq_consumer_host` | str | ENV or None | RabbitMQ connection URL |
| `task_type` | str | "default" | Type of tasks to process |
| `worker_name` | str | hostname | Worker identifier |
| `polling_interval` | int | 5 | Seconds between polls in polling mode |
| `rabbitmq_queue_prefix` | str | "task.available" | RabbitMQ queue prefix |
| `prefetch_count` | int | 1 | RabbitMQ prefetch count |
| `max_retry_attempts` | int | 3 | Max retry attempts for API calls |

### Environment Variables

The library automatically reads from these environment variables:

- `TASK_MANAGEMENT_ENDPOINT`: Task management API URL
- `RABBITMQ_CONSUMER_HOST`: RabbitMQ connection string

## Advanced Usage

### Status Monitoring

```python
worker = TaskWorker(config, process_task)

# Get worker status
status = worker.get_status()
print(f"Mode: {status['mode']}")
print(f"Task Type: {status['task_type']}")
print(f"RabbitMQ Connected: {status['rabbitmq_connected']}")

# Check if running in RabbitMQ mode
if worker.is_running_rabbitmq_mode():
    print("Running with RabbitMQ")
else:
    print("Running in polling mode")
```

## Error Handling

The library provides robust error handling:

### RabbitMQ Mode
- **Message Parsing Errors**: Messages are nacked without requeue to prevent infinite loops
- **Task Processing Errors**: Messages are nacked with requeue for retry by other workers
- **Connection Failures**: Automatic fallback to polling mode

### Polling Mode
- **API Failures**: Logged and retried after polling interval
- **Processing Errors**: Logged (no requeue mechanism in polling)

## Message Format

Expected RabbitMQ message format:

```json
{
  "taskType": "your-task-type",
  "metadata": {
    "additional": "data"
  }
}
```

Task API response format:

```json
{
  "data": {
    "id": "task-id-123",
    "taskType": "your-task-type", 
    "metadata": {
        "metadata": "metadata"
    },
    "webhookApi": "https://callback-url.com"
  }
}
```

## License

MIT License

## Contributing

1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Add tests
5. Submit a pull request 
