Metadata-Version: 2.4
Name: djprocess
Version: 0.0.1
Summary: Django process orchestration with paginated logs and async workers
License: MIT
Requires-Python: >=3.12
Description-Content-Type: text/markdown
Requires-Dist: celery>=5.4
Requires-Dist: django>=6.0
Requires-Dist: django-model-utils>=5.0
Provides-Extra: celery
Requires-Dist: celery>=5.4; extra == "celery"
Provides-Extra: subprocess
Provides-Extra: gvisor
Requires-Dist: djprocess[subprocess]; extra == "gvisor"
Provides-Extra: example
Requires-Dist: djmvc[bulma]>=0.0.1; extra == "example"
Requires-Dist: djprocess[celery]; extra == "example"
Provides-Extra: dev
Requires-Dist: djprocess[example]; extra == "dev"
Requires-Dist: pytest>=7.0; extra == "dev"
Requires-Dist: pytest-django>=4.5; extra == "dev"
Requires-Dist: pytest-asyncio>=0.23; extra == "dev"
Requires-Dist: pytest-cov>=4.0; extra == "dev"
Requires-Dist: black>=23.0; extra == "dev"
Requires-Dist: ruff>=0.1; extra == "dev"

# djprocess

Django process orchestration with paginated logs and Celery workers.

Track long-running jobs in the database, stream structured log lines, and dispatch work to Celery workers.

## Features

- Abstract `BaseProcess` and `BaseProcessLog` models with status lifecycle tracking
- Monotonic, paginated log lines per process
- Per-process `timeout` (default 20 minutes) enforced by a supervisor-queue watchdog
- Escalating kill: repeated SIGTERM attempts, then SIGKILL
- Pluggable worker backend (Celery included)
- Eager-mode Celery for fast tests and local development

## Installation

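From a checkout of this repository, install the package in editable mode with the development extras:

```bash
pip install -e ".[dev]"
```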

## Quick start

### 1. Define models

```python
from django.db import models
from djprocess.models import BaseProcess, BaseProcessLog


class ExportProcess(BaseProcess):
    path = models.CharField(max_length=255)

    def run(self):
        self.log(f"Exporting {self.path}")
        # ... do work ...
        self.log("Export complete")


class ExportProcessLog(BaseProcessLog):
    process = models.ForeignKey(
        ExportProcess,
        on_delete=models.CASCADE,
        related_name="logs",
    )
```
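The `self.log()` calls above append numbered lines that `paginate_logs()` later returns page by page. The stdlib-only sketch below illustrates why monotonic sequence numbers make that pagination stable. `InMemoryLog`, `LogLine`, and the `level="info"` default are hypothetical; the real models store lines in the database through `BaseProcessLog`, where the equivalent slice would presumably be an ordered queryset such as `process.logs.order_by("sequence")[start:start + per_page]`.

```python
from dataclasses import dataclass, field
from itertools import count


@dataclass(frozen=True)
class LogLine:
    sequence: int  # strictly increasing within one process
    level: str
    message: str


@dataclass
class InMemoryLog:
    """Hypothetical stand-in for one process's log table (not djprocess code)."""

    lines: list[LogLine] = field(default_factory=list)
    _next_sequence: count = field(default_factory=lambda: count(1))

    def log(self, message: str, level: str = "info") -> LogLine:
        # Each call takes the next number, so ordering never depends on timestamps.
        line = LogLine(next(self._next_sequence), level, message)
        self.lines.append(line)
        return line

    def paginate(self, page: int, per_page: int) -> list[LogLine]:
        # 1-based pages over lines ordered by sequence (assumed paginate_logs semantics).
        if page < 1 or per_page < 1:
            raise ValueError("page and per_page must be >= 1")
        ordered = sorted(self.lines, key=lambda line: line.sequence)
        start = (page - 1) * per_page
        return ordered[start : start + per_page]


history = InMemoryLog()
for i in range(120):
    history.log(f"step {i}")
third_page = history.paginate(page=3, per_page=50)
print(len(third_page), third_page[0].sequence)  # 20 101
```

Because sequence numbers only grow, a given page keeps its contents while new lines are appended to later pages.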

### 2. Configure Django and Celery

```python
INSTALLED_APPS = [
    # ...
    "djprocess",
    "djprocess_celery",
]

CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/0"

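DJPROCESS_PROCESS_QUEUE = "processes"      # queue that runs process tasks
DJPROCESS_SUPERVISOR_QUEUE = "supervisor"  # queue that runs the timeout watchdogs
DJPROCESS_KILL_TERM_RETRIES = 3            # SIGTERM attempts before SIGKILL
DJPROCESS_KILL_TERM_WAIT_SECONDS = 2       # seconds to wait between SIGTERM checks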
```

Create `myproject/celery.py`:

```python
import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")

app = Celery("myproject")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
```
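Celery's own Django integration guide also recommends importing the app in the project package's `__init__.py`, so it is loaded when Django starts and `@shared_task` tasks bind to it. This step comes from Celery's documentation rather than from djprocess; `myproject` is the same placeholder used above.

```python
# myproject/__init__.py
from .celery import app as celery_app

__all__ = ("celery_app",)
```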

### 3. Run a process

```python
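from datetime import timedelta

from myapp.models import ExportProcess  # adjust to wherever you defined the models

process = ExportProcess.objects.create(path="/tmp/data.csv")
process.timeout = timedelta(minutes=45)  # optional, default is 20 minutes
process.start()  # dispatches the process task and its per-job watchdog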

page = process.paginate_logs(page=1, per_page=50)
for entry in page:
    print(entry.sequence, entry.level, entry.message)
```

Each `start()` call schedules a watchdog task on the supervisor queue with `countdown=process.timeout_seconds()`. Because the timeout travels with each watchdog rather than being configured on the worker, processes with different timeouts can share the same worker pool.

### 4. Start workers

Use the **prefork** pool so Celery can terminate individual child processes:

```bash
celery -A myproject worker -Q processes -c 4 -P prefork -l info
celery -A myproject worker -Q supervisor -c 2 -P prefork -l info
```

Do not set a worker-level `--time-limit`: timeouts are enforced per process by the watchdog, and a worker-wide hard limit would also kill any process whose own `timeout` is longer.

## Process lifecycle

| Status      | Meaning                          |
|-------------|----------------------------------|
| `pending`   | Created, not yet started         |
| `running`   | Worker is executing `run()`      |
| `succeeded` | Finished without error           |
| `failed`    | Raised an exception              |
| `cancelled` | Revoked before or during run     |
| `timed_out` | Exceeded `timeout` and was killed |

When the watchdog fires on a still-running process, it sends SIGTERM up to `DJPROCESS_KILL_TERM_RETRIES` times (waiting `DJPROCESS_KILL_TERM_WAIT_SECONDS` between checks), then SIGKILL. The supervisor task retries immediately if the control command itself fails.
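With Celery's prefork pool, a backend can deliver signals to the child running a task through `app.control.revoke(task_id, terminate=True, signal="SIGTERM")`. The stdlib sketch below shows the same escalation policy against a plain OS process instead. `escalating_kill` and its `is_alive` callback are hypothetical, and the sketch is POSIX-only because it uses `SIGKILL`.

```python
import os
import signal
import subprocess
import sys
import time
from collections.abc import Callable


def _wait_for_exit(is_alive: Callable[[], bool], timeout: float, poll: float) -> bool:
    """Poll `is_alive` until it reports False or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if not is_alive():
            return True
        time.sleep(poll)
    return not is_alive()


def escalating_kill(
    pid: int,
    is_alive: Callable[[], bool],
    term_retries: int = 3,  # plays the role of DJPROCESS_KILL_TERM_RETRIES
    wait_seconds: float = 2.0,  # plays the role of DJPROCESS_KILL_TERM_WAIT_SECONDS
    poll: float = 0.05,
) -> str:
    """Send SIGTERM up to `term_retries` times, then SIGKILL; return the outcome."""
    if not is_alive():
        return "not_running"
    for _ in range(term_retries):
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            return "terminated"  # already exited and reaped
        if _wait_for_exit(is_alive, wait_seconds, poll):
            return "terminated"
    try:
        os.kill(pid, signal.SIGKILL)  # cannot be caught or ignored
    except ProcessLookupError:
        return "terminated"
    return "killed"


# Demo: a child that ignores SIGTERM, so escalation has to reach SIGKILL.
child = subprocess.Popen(
    [
        sys.executable,
        "-c",
        "import signal, time; signal.signal(signal.SIGTERM, signal.SIG_IGN);"
        " print('ready', flush=True); time.sleep(60)",
    ],
    stdout=subprocess.PIPE,
    text=True,
)
child.stdout.readline()  # wait until the child has installed SIG_IGN
outcome = escalating_kill(
    child.pid, lambda: child.poll() is None, term_retries=2, wait_seconds=0.2
)
child.wait(timeout=5)
child.stdout.close()
print(outcome, child.returncode)  # killed -9
```

Passing `is_alive` as a callback matters for child processes: `os.kill(pid, 0)` still succeeds on an exited but unreaped (zombie) child, whereas `Popen.poll()` reaps it and reports the exit accurately.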

Call `process.start()` to dispatch, `process.cancel()` to kill the process task and revoke the watchdog, and `process.execute()` directly when running synchronously in tests.

## Worker backend

The default backend is `djprocess_celery.backend.CeleryWorkerBackend`. Override with:

```python
DJPROCESS_WORKER_BACKEND = "myapp.backends.CustomWorkerBackend"
```
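A dotted-path setting like this is resolved to a class at runtime. Django ships `django.utils.module_loading.import_string` for exactly this job; whether djprocess uses it or its own loader is an assumption. The stdlib-only sketch below shows the idea, with `load_backend_class` as a hypothetical name.

```python
from importlib import import_module


def load_backend_class(dotted_path: str) -> type:
    """Resolve "package.module.ClassName" to the object it names."""
    module_path, _, class_name = dotted_path.rpartition(".")
    if not module_path:
        raise ImportError(f"{dotted_path!r} is not a dotted path")
    module = import_module(module_path)
    try:
        return getattr(module, class_name)
    except AttributeError as exc:
        raise ImportError(f"{module_path!r} has no attribute {class_name!r}") from exc


print(load_backend_class("collections.OrderedDict"))  # <class 'collections.OrderedDict'>
```

Raising `ImportError` for a missing attribute mirrors `import_string`, so a misspelled class name in settings fails the same way as a misspelled module.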

Your backend must implement:

- `dispatch(process) -> (process_task_id, watchdog_task_id)`
- `kill(task_id)` — escalating SIGTERM → SIGKILL
- `revoke_scheduled(task_id)` — cancel a not-yet-run watchdog

## Development

```bash
python manage.py makemigrations
python manage.py migrate
pytest
```

## Example app

This repository includes `djprocess_example`, which provides `ExampleProcess` and test settings that run Celery tasks eagerly in the calling process (`CELERY_TASK_ALWAYS_EAGER = True`), so tests do not need a running worker.
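For reference, a minimal set of standard Celery settings for this kind of test setup might look like the following, using the `CELERY_` namespace configured in `myproject/celery.py`. This is a sketch; the example app's actual settings may differ. Eager mode runs each task synchronously in the caller, so it exercises `run()` but not timing-dependent behavior such as the watchdog countdown or signal delivery to prefork children.

```python
# test settings (sketch)
CELERY_TASK_ALWAYS_EAGER = True      # run tasks synchronously in the calling process
CELERY_TASK_EAGER_PROPAGATES = True  # re-raise task exceptions in the caller
CELERY_BROKER_URL = "memory://"      # kombu in-memory transport; no Redis needed
```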

## License

MIT
