Metadata-Version: 2.4
Name: bullmq-py
Version: 1.2.6
Summary: BullMQ manager logic
Home-page: https://github.com/tredops-packages/bullmq-py
Author: Kin
Author-email: Kin <your-email@example.com>
License-Expression: MIT
Project-URL: Homepage, https://github.com/tredops-packages/bullmq-py
Project-URL: Repository, https://github.com/tredops-packages/bullmq-py
Project-URL: Issues, https://github.com/tredops-packages/bullmq-py/issues
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Requires-Python: >=3.9
Description-Content-Type: text/markdown
Requires-Dist: bullmq==2.18.3
Provides-Extra: test
Requires-Dist: pytest; extra == "test"
Requires-Dist: python-dotenv==1.0.1; extra == "test"
Dynamic: author
Dynamic: home-page
Dynamic: requires-python

# TREDOPS BULLMQ UTILS

A complete utility library for BullMQ management and administration, designed to simplify message queue operations in enterprise-level Python applications.

## 🎯 Objectives

This library provides a comprehensive set of tools to:

- **Simplify BullMQ management** with intuitive Python interfaces
- **Monitor and administer queues** in real-time
- **Implement backup/restore systems** for operational continuity
- **Handle priorities, delays, and job cancellations** efficiently
- **Integrate with MongoDB** for persistence and auditing

## 📦 Main Components

### 🔧 **BullMQ Consumer (`consumer_bullmq.py`)**
- **Objective**: Simplified and robust consumer for processing messages
- **Features**:
  - Safe signal handling (SIGINT/SIGTERM)
  - Automatic queue configuration
  - Integrated logging and error handling
  - Reduced from 200+ lines to 78 lines of clean code

### 📤 **BullMQ Sender (`bullmq_sender.py`)**
- **Objective**: Message sending with advanced features
- **Features**:
  - **Priorities**: 1-10 system (default 5)
  - **Delays**: Seconds delay before processing
  - **Custom Job IDs**: For tracking and relationships
  - **Async/sync versions**: Compatibility with different contexts

### ❌ **Job Cancellation (`bullmq_cancel_execution.py`)**
- **Objective**: Cancel jobs before they are processed
- **Features**:
  - Individual and batch cancellation
  - Intelligent search by job_id across multiple states
  - Synchronous and asynchronous versions
  - Handles jobs in states: waiting, delayed, prioritized

### 👁️ **Monitoring and Visibility (`bullmq_visibility.py`)**
- **Objective**: Real-time monitoring of queue status
- **Features**:
  - **Per-queue statistics**: Job count by state
  - **System health**: Detection of saturated or problematic queues
  - **Global summary**: Consolidated view of all queues
  - **Automatic recommendations**: Alerts and optimization suggestions

### 💾 **Backup and Restore (`bullmaq_backup.py`)**
- **Objective**: Complete backup/restore system with MongoDB
- **Features**:
  - **Scalable structure**: One MongoDB document per job
  - **Complete backup**: Captures all job states (waiting, delayed, active, completed, failed)
  - **Selective restore**: By queue, state, or specific backup
  - **Backup management**: List, delete, and recreate backups
  - **MongoDB integration**: `bullmqJobBackup` collection for persistence

## 📋 Backup Data Structure

Each job is stored as an individual document in MongoDB:

```json
{
  "backup_id": "backup_20260105_120000",
  "createAt": "2026-01-05T12:00:00.000Z",
  "updateAt": "2026-01-05T12:00:00.000Z",
  "worker": "processing_queue",
  "job_id": "job_123",
  "job_name": "process_order",
  "job_state": "waiting",
  "message": {
    "order_id": 12345,
    "customer": "John Doe"
  },
  "delay": 0,
  "priority": 5,
  "attempts": 0
}
```

## 🚀 Use Cases

### **1. Message Processing**
```python
from bullmq_sender import bullmq_sender
from consumer_bullmq import consumer_bullmq

# Send message with high priority and delay
job_id = bullmq_sender(
    worker="urgent_tasks",
    priority=9,
    delay_sec=10,
    message={"task": "process_payment", "amount": 1000},
    job_id="payment_12345"
)

# Process messages
consumer_bullmq(worker="urgent_tasks", function_process=payment_processor)
```

### **2. Backup and Restore**
```python
from bullmaq_backup import bullmq_backup, bullmq_restore, bullmq_delete_backup

# Create complete backup
result = bullmq_backup(
    db=mongodb_connection,
    queue_names=["orders", "payments", "notifications"],
    backup_name="daily_backup_20260105"
)

# Restore from backup
restore_result = bullmq_restore(
    db=mongodb_connection,
    backup_id="daily_backup_20260105",
    restore_states=["waiting", "delayed"]  # Only pending jobs
)
```

### **3. Monitoring and Cancellation**
```python
from bullmq_visibility import bullmq_visibility_summary
from bullmq_cancel_execution import bullmq_cancel_execution

# System monitoring
summary = bullmq_visibility_summary(["orders", "payments"])
print(f"Total jobs: {summary['total_jobs']}")

# Cancel specific job
cancel_result = bullmq_cancel_execution(
    job_id="payment_12345",
    queue_name="payments"
)
```

## 🔧 Configuration

### **Environment Variables**
- `REDIS_HOST`: Redis URL (default: `redis://192.168.1.163:6379`)
- MongoDB configuration through `tredops_eval.connect`

### **Dependencies**
- `bullmq`: BullMQ client for Python
- `pymongo`: MongoDB driver
- `asyncio`: For asynchronous operations

## 🧪 Testing

The project includes comprehensive tests that verify:
- ✅ Functionality with real BullMQ
- ✅ Integration with real MongoDB
- ✅ Correct backup document structure
- ✅ Restore and cancellation operations
- ✅ Error handling and edge cases

```bash
# Run tests
pytest tredops_strategy_worker/bullmaq_backup_test.py -v
```

## 📊 Advantages

- **🚀 Scalability**: Individual documents per job, supports thousands of messages
- **🔒 Reliability**: Backup/restore system for operational continuity
- **📈 Observability**: Real-time monitoring and detailed metrics
- **⚡ Performance**: Optimized operations with aggregation pipelines
- **🔧 Flexibility**: Configurable APIs for different use cases
- **🛡️ Robustness**: Complete error handling and automatic recovery

---

**Developed for TREDOPS** - Enterprise BullMQ management system

