Metadata-Version: 2.4
Name: fastpluggy-tasks-worker
Version: 0.3.282
Summary: Task Runner plugin for Fastpluggy
Author: FastPluggy Team
License-Expression: MIT
Requires-Python: >=3.10
Description-Content-Type: text/markdown
Requires-Dist: FastPluggy>=0.4.22
Requires-Dist: fastpluggy-crud-tools>=0.2.12
Requires-Dist: croniter
Requires-Dist: psutil
Requires-Dist: python-dateutil>=2.9.0
Provides-Extra: rabbitmq
Requires-Dist: pika>=1.3.0; extra == "rabbitmq"
Provides-Extra: postgres
Requires-Dist: SQLAlchemy>=2.0; extra == "postgres"
Requires-Dist: psycopg[binary]>=3.1; extra == "postgres"
Provides-Extra: dev
Requires-Dist: pytest>=7.0; extra == "dev"
Requires-Dist: pytest-cov>=4.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.21; extra == "dev"
Requires-Dist: testcontainers[postgres,rabbitmq]>=4.0.0; extra == "dev"
Provides-Extra: tests
Requires-Dist: pytest>=7.0; extra == "tests"
Requires-Dist: pytest-cov>=4.0; extra == "tests"
Requires-Dist: pytest-asyncio>=0.21; extra == "tests"
Requires-Dist: testcontainers[postgres,rabbitmq]>=4.0.0; extra == "tests"
Provides-Extra: e2e
Requires-Dist: playwright>=1.40.0; extra == "e2e"
Requires-Dist: fastpluggy-example-plugin; extra == "e2e"

# FastPluggy Task Runner

![Task Runner](https://img.shields.io/badge/FastPluggy-Task%20Runner-blue)
[![Release](https://gitlab.ggcorp.fr/open/fastpluggy/plugins/tasks_worker/-/badges/release.svg)](https://gitlab.ggcorp.fr/open/fastpluggy/plugins/tasks_worker/-/releases)
[![Pipeline Status](https://gitlab.ggcorp.fr/open/fastpluggy/plugins/tasks_worker/badges/main/pipeline.svg?key_text=CI)](https://gitlab.ggcorp.fr/open/fastpluggy/plugins/tasks_worker/-/pipelines?ignore_skipped=true)
[![Coverage](https://gitlab.ggcorp.fr/open/fastpluggy/plugins/tasks_worker/badges/main/coverage.svg)](https://gitlab.ggcorp.fr/open/fastpluggy/plugins/tasks_worker/-/pipelines)

A powerful and extensible **task execution framework** for Python, built on top of [FastPluggy](https://fastpluggy.xyz).  
Easily register, run, monitor, and schedule background tasks with full support for retries, logging, live WebSocket updates, and notifications.

---

## ✨ Features

- 🔧 Task registration with metadata, retries, scheduling, and custom parameters
- 🧠 Dynamic form generation from metadata
- 📡 Live logs and WebSocket updates
- 📅 CRON-based scheduler with optional notification rules
- 🔁 Retry logic with auto-link to parent task
- 🔒 Non-concurrent task execution with lock tracking
- 🧩 Extensible subscribers system (Console, Slack, Webhook...)
- 📊 Admin UI to manage tasks, schedules, locks, and reports
- 💾 Persistent task context and rehydration
- 📈 Task metrics from process/thread info

---

## 🛠️ How It Works

```python
@TaskWorker.register(
    description="Sync data every 5 mins",
    schedule="*/5 * * * *",
    max_retries=3,
    allow_concurrent=False
)
def sync_data_task():
    print("Sync running...")
```
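
The `schedule` string is a standard five-field cron expression (the plugin depends on `croniter` for parsing). As a stdlib-only illustration of what `*/5 * * * *` means, here is a rough next-run computation for that specific pattern — the helper below is purely illustrative and is not part of the plugin:

```python
# Illustrative only: compute the next firing time for "*/5 * * * *",
# i.e. the next wall-clock minute divisible by 5, strictly after `now`.
# The plugin itself uses croniter for full cron parsing.
from datetime import datetime, timedelta

def next_run_every_5_minutes(now: datetime) -> datetime:
    base = now.replace(second=0, microsecond=0)
    step = timedelta(minutes=5 - base.minute % 5)
    return base + step

print(next_run_every_5_minutes(datetime(2024, 1, 1, 12, 3)))  # 2024-01-01 12:05:00
```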

For detailed instructions on creating tasks and triggering them from JavaScript, see the [Task Creation and JS Triggering Guide](docs/task_creation_and_js_triggering.md).

For information about Jinja template global variables available for task triggering, see the [Jinja Template Globals documentation](docs/jinja_template_globals.md).

---

## 📋 Roadmap

### ✅ Completed / In Progress

- [x] Task registration with metadata (`description`, `tags`, `max_retries`, `schedule`, `allow_concurrent`)
- [x] Dynamic task form rendering via metadata
- [x] Notification/subscribers system with:
  - Console / webhook / Slack (optional)
  - Selectable events: `task_started`, `task_failed`, `logs`, etc.
- [x] Context/report tracking in DB
- [x] Task retry linking via `parent_task_id`
- [x] CRON-based scheduler loop
- [x] Web UI for:
  - Task logs
  - Task reports
  - Scheduled tasks
  - Locks
  - Running task status
- [x] Lock manager (`TaskLockManager`) with DB tracking
- [x] Cancel button for live-running tasks

---

### 📌 Upcoming Features

#### 🔁 Task Queue Enhancements
- [ ] Priority-based and rate-limited execution
- [ ] Per-user concurrency limits
- [ ] Task dependencies / DAG runner

#### 🧠 Task Registry & Detection
- [x] Auto-discovery of task definitions from modules
- [x] Celery-style shared task detection

#### 💾 Persistence & Rehydration
- [x] Save function reference + args for replay/retry
- [x] Task dependency tree and retry visualization

#### 🌐 Remote Workers
- [ ] Register and manage remote workers
- [ ] Assign tasks based on tags/strategies
- [x] Remote heartbeat & health monitoring

#### 📈 Observability
- [ ] Task metrics via `psutil` (CPU, memory, threads)
- [ ] UI views for thread/process diagnostics

---

## Standalone Worker

Run task workers as standalone, long-running processes independent of the FastAPI dev server:

```bash
# Worker only (consumes and executes tasks)
fastpluggy tasks-worker start

# Worker + scheduler (beat) — simple setups, dev
fastpluggy tasks-worker start --beat

# Standalone scheduler (recommended for production)
fastpluggy tasks-worker beat

# Use RabbitMQ in production
fastpluggy tasks-worker start --broker-type rabbitmq --broker-dsn amqp://user:pass@rabbit:5672/

# Consume only specific topics with 4 threads per worker
fastpluggy tasks-worker start --topics email,reports --max-workers 4

# Verbose logging for debugging
fastpluggy tasks-worker start --log-level DEBUG

# Multiple workers with PostgreSQL broker
fastpluggy tasks-worker start -n 3 --broker-type postgres --broker-dsn postgresql://localhost/tasks
```

The process blocks until interrupted with `Ctrl+C` or `SIGTERM`, then performs a graceful shutdown.

### RabbitMQ vhost auto-creation

When using the RabbitMQ broker, the worker automatically creates the vhost specified in the DSN if it does not already exist. This uses the RabbitMQ Management HTTP API (port 15672) and grants full permissions to the connecting user. If the management API is unreachable (not exposed, firewalled, or the user lacks admin rights), the check is silently skipped — the worker will connect normally if the vhost already exists, or fail with a clear error if it doesn't.
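
For reference, the vhost is the path component of the AMQP DSN, with an empty path meaning RabbitMQ's default vhost `/`. A stdlib-only sketch of extracting it (illustrative; this is not the plugin's actual implementation):

```python
# Illustrative: derive the vhost from an AMQP DSN. Not the plugin's real code.
from urllib.parse import unquote, urlsplit

def vhost_from_dsn(dsn: str) -> str:
    path = urlsplit(dsn).path
    return unquote(path[1:]) if len(path) > 1 else "/"

print(vhost_from_dsn("amqp://user:pass@rabbit:5672/prod"))  # prod
# The auto-creation step then targets the Management API, roughly:
#   PUT http://rabbit:15672/api/vhosts/prod   (basic auth, admin rights)
```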

### Production deployment

For production, run the scheduler (beat) and workers as separate processes:

```bash
# One beat process — reads scheduled tasks from DB, submits when due
fastpluggy tasks-worker beat --broker-type rabbitmq --broker-dsn amqp://...

# N worker processes — consume and execute tasks
fastpluggy tasks-worker start -n 4 --broker-type rabbitmq --broker-dsn amqp://...
```

### Options

| Option | Description |
|--------|-------------|
| `-n`, `--workers` | Number of workers to start (default: `$WORKER_NUMBER` or 1) |
| `--beat` | Also start the scheduler alongside workers (for `start` command) |
| `--broker-type` | Broker backend: `local`, `memory`, `rabbitmq`, `postgres` (overrides `$BROKER_TYPE`) |
| `--broker-dsn` | Broker connection string (overrides `$BROKER_DSN`) |
| `--topics` | Comma-separated list of topics to consume (default: all) |
| `--max-workers` | Thread pool size per worker (default: 8) |
| `--log-level` | Logging level: `DEBUG`, `INFO`, `WARNING`, `ERROR` (default: `INFO`) |

### Topic Routing

Topics determine which queue tasks are published to and consumed from. Resolution order:

1. **`FORCE_TASK_TOPIC`** — if set, overrides everything. Both publish and consume use this value.
2. **Explicit `topic=` argument** — passed to `TaskWorker.submit(my_task, topic="email")`.
3. **Function metadata** — set via `@TaskWorker.register(topic="reports")`.
4. **`DEFAULT_TOPIC`** — fallback (default: `"default"`).

| Setting | Env var | Description |
|---------|---------|-------------|
| `default_topic` | `DEFAULT_TOPIC` | Fallback topic for publish and consume (default: `"default"`) |
| `force_task_topic` | `FORCE_TASK_TOPIC` | Hard override — locks both publish and consume to this value |
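
The four-step resolution order above can be sketched in plain Python (parameter names mirror the settings table; this is not the plugin's internal API):

```python
# Illustrative sketch of topic resolution, mirroring the documented order.
def resolve_topic(force_task_topic=None, explicit=None,
                  metadata_topic=None, default_topic="default"):
    if force_task_topic:      # 1. FORCE_TASK_TOPIC overrides everything
        return force_task_topic
    if explicit:              # 2. topic= passed to TaskWorker.submit()
        return explicit
    if metadata_topic:        # 3. topic= set in @TaskWorker.register()
        return metadata_topic
    return default_topic      # 4. DEFAULT_TOPIC fallback

print(resolve_topic(explicit="email", metadata_topic="reports"))  # email
```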

**Examples:**

```bash
# Pin a worker to a specific queue (useful for dedicated workers or debugging)
FORCE_TASK_TOPIC=gpu-worker fastpluggy tasks-worker start

# Two workers on the same machine, each consuming a different queue
FORCE_TASK_TOPIC=worker-a fastpluggy tasks-worker start &
FORCE_TASK_TOPIC=worker-b fastpluggy tasks-worker start &

# Normal mode — worker consumes all topics, tasks route via metadata or default
fastpluggy tasks-worker start
```

---

## 🧪 Testing

This plugin ships with a pytest-based test suite, including integration tests backed by testcontainers (PostgreSQL, RabbitMQ) and e2e tests via Playwright.

### Running Tests Locally

```bash
# Install development dependencies
pip install -e ".[dev]"

# Run all tests
pytest tests/

# Run tests with coverage report
pytest tests/ --cov=src --cov-report=term-missing --cov-report=html

# Run specific test file
pytest tests/test_runner_topics.py -v

# Run tests with specific markers
pytest tests/ -m unit  # Only unit tests
pytest tests/ -m "not slow"  # Skip slow tests
```

### CI/CD Integration

Tests are automatically run in the GitLab CI/CD pipeline on:
- Merge requests
- Main branch commits

Coverage reports are generated and stored as artifacts for 30 days.

---

## 📦 Tech Stack

- FastAPI + FastPluggy
- SQLAlchemy + PostgreSQL (SQLite is not supported; see the warning below)
- WTForms + Jinja2 + Bootstrap (Tabler)
- WebSockets for real-time feedback
- Plugin-ready & modular architecture

---

## 🧠 Philosophy

This runner is built to be:

- **Introspective**: auto-generate UIs from functions
- **Composable**: integrate with your FastPluggy app
- **Scalable**: support single-machine and multi-worker environments
- **Extensible**: notifiers, hooks, CRON, logs

---

## 📎 License

MIT – Use freely and contribute 💙

---

## 🚀 Contributions Welcome!

Open issues, send PRs, share ideas —  
Let’s build the most pluggable Python task runner together.

### ⚠️ Warning

This plugin does not work with SQLite: its database models rely on JSONB columns, which require PostgreSQL.
