Metadata-Version: 2.1
Name: django-i3tasks
Version: 0.0.26
Summary: Django app for managing async tasks via HTTP requests
Home-page: https://github.com/sajlx/django-i3tasks
Author: Ivan Bettarini
Author-email: ivan.bettarini@gmail.com
License: GNU General Public License v3.0
Classifier: Environment :: Web Environment
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3
Classifier: Framework :: Django
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: django
Requires-Dist: djangorestframework
Requires-Dist: requests
Requires-Dist: croniter >=2.0.1

# django-i3tasks

Django app for managing async tasks via HTTP using Google Cloud Pub/Sub.

```bash
pip install django-i3tasks
```

---

## Quick start

### 1. Add to `INSTALLED_APPS`

```python
INSTALLED_APPS = [
    ...,
    "django_i3tasks",
]
```

### 2. Include the URL configuration

```python
# urls.py
from django.urls import path, include

urlpatterns = [
    ...,
    path("i3/", include("django_i3tasks.urls")),
]
```

This registers two endpoints:
- `POST /i3/tasks-push/` — receives tasks pushed by Pub/Sub
- `POST /i3/tasks-beat/` — triggered by an external scheduler (e.g. Google Cloud Scheduler) to run scheduled tasks
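
For orientation, a Pub/Sub push subscription delivers each message as a JSON envelope whose `message.data` field is base64-encoded. The sketch below decodes such an envelope with the standard library only. The payload shape (`module_name`, `func_name`, `args`, `kwargs`) and the helper name `decode_push_envelope` are hypothetical stand-ins, not necessarily what django-i3tasks puts on the wire.

```python
import base64
import json


def decode_push_envelope(body: bytes) -> dict:
    """Decode the JSON envelope that a Pub/Sub push subscription POSTs."""
    envelope = json.loads(body)
    message = envelope["message"]
    # Pub/Sub base64-encodes the message payload in the "data" field.
    raw = base64.b64decode(message.get("data", ""))
    return {
        "message_id": message.get("messageId"),
        "attributes": message.get("attributes", {}),
        # Assumption: the payload is JSON; the real task format may differ.
        "payload": json.loads(raw) if raw else None,
    }


# Build a sample envelope shaped like a Pub/Sub push delivery.
task = {
    "module_name": "myapp.tasks",
    "func_name": "send_email",
    "args": ["user@example.com"],
    "kwargs": {},
}
sample_body = json.dumps({
    "message": {
        "data": base64.b64encode(json.dumps(task).encode()).decode(),
        "messageId": "1",
        "attributes": {},
    },
    "subscription": "projects/my-project/subscriptions/default",
}).encode()

decoded = decode_push_envelope(sample_body)
print(decoded["payload"]["func_name"])
```

You normally do not need to parse this yourself, since the `tasks-push` endpoint handles it. The sketch is mainly useful when inspecting requests while debugging against the emulator.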

### 3. Run migrations

```bash
python manage.py migrate
```

This creates the tables for task executions, attempts, and results.

### 4. Configure settings

#### Local / emulator

```python
from django_i3tasks.types import I3TasksSettings, Queue, Schedule

PUBSUB_CONFIG = {
    "EMULATOR": True,
    "HOST": "localhost:8085",       # or named host in Docker Compose
    "PROJECT_ID": "my-project",
    "CREDENTIALS": False,
}

I3TASKS = I3TasksSettings(
    # SHORT_PROJECT_NAME is your own setting; define it earlier in settings.py.
    namespace=f"tasks.{SHORT_PROJECT_NAME}",
    default_queue=Queue(
        queue_name="default",
        subscription_name="default",
        push_endpoint="http://localhost:8000/i3/tasks-push/",
    ),
    other_queues=(),
    schedules=(
        Schedule(
            module_name="myapp.tasks",
            func_name="my_scheduled_task",
            cron="* * * * *",
            args=[],
            kwargs={},
        ),
    ),
)
```

#### Production (Google Cloud)

```python
from django_i3tasks.types import I3TasksSettings, Queue, Schedule

PUBSUB_CONFIG = {
    "EMULATOR": False,
    "PROJECT_ID": "my-project",
    "CREDENTIALS": "/app/conf/credentials.json",  # path to service account JSON
}

I3TASKS = I3TasksSettings(
    # SHORT_PROJECT_NAME is your own setting; define it earlier in settings.py.
    namespace=f"tasks.{SHORT_PROJECT_NAME}",
    default_queue=Queue(
        queue_name="default",
        subscription_name="default",
        push_endpoint="https://your-host.example.com/i3/tasks-push/",
    ),
    other_queues=(),
    schedules=(),
)
```

### 5. Ensure Pub/Sub topics and subscriptions exist

Run this once to create the required Pub/Sub resources:

```bash
python manage.py i3tasks_ensure_pubsub
```

The same command also runs automatically at startup when `run_queue_create_command_on_startup=True` (the default).

---

## Defining tasks

Decorate any function with `@TaskDecorator` to make it an async task:

```python
# myapp/tasks.py
from django_i3tasks.utils import TaskDecorator

@TaskDecorator
def send_email(recipient, subject, body):
    # your logic here
    pass
```

### Running a task asynchronously

```python
from myapp.tasks import send_email

send_email.delay("user@example.com", "Hello", "World")
# or equivalently:
send_email.async_run("user@example.com", "Hello", "World")
```

### Running a task synchronously

```python
send_email.sync_run("user@example.com", "Hello", "World")
# or call it directly:
send_email("user@example.com", "Hello", "World")
```

### Accessing task metadata inside the function (`bind`)

When `bind=True`, the function receives the task object (a `TaskObj` instance) as the `task_metadata` keyword argument:

```python
@TaskDecorator(bind=True)
def my_task(arg1, task_metadata=None):
    print(task_metadata)  # TaskObj instance
```
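
To make the two decorator forms less mysterious, here is a toy decorator that works both bare (`@toy_task`) and with arguments (`@toy_task(bind=True)`). It illustrates the general pattern using only the standard library. `toy_task` and `ToyTask` are hypothetical names, and the real `TaskDecorator` may be implemented differently.

```python
import functools


class ToyTask:
    """Hypothetical stand-in for a task wrapper; not the library's class."""

    def __init__(self, func, bind=False):
        self.func = func
        self.bind = bind
        # Copy __name__, __doc__, etc. from the wrapped function.
        functools.update_wrapper(self, func)

    def __call__(self, *args, **kwargs):
        if self.bind:
            # Mirror the README's convention: pass the wrapper as task_metadata.
            kwargs.setdefault("task_metadata", self)
        return self.func(*args, **kwargs)

    # In this toy, a synchronous run is just a direct call.
    sync_run = __call__


def toy_task(func=None, *, bind=False):
    """Usable as @toy_task or @toy_task(bind=True)."""
    if func is None:
        # Called with arguments: return the actual decorator.
        return lambda f: ToyTask(f, bind=bind)
    # Called bare: func is the decorated function.
    return ToyTask(func)


@toy_task
def add(a, b):
    return a + b


@toy_task(bind=True)
def whoami(task_metadata=None):
    return task_metadata.__name__


print(add(1, 2), add.sync_run(3, 4), whoami())
```

Giving `task_metadata` a default of `None`, as the example above does, keeps the function callable in plain unit tests where no wrapper injects the argument.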

---

## Task chaining

`.delay()` returns a `ChainHandle`. Use `.then()` to schedule a follow-up task that runs after the current one succeeds:

```python
from myapp.tasks import send_email, log_sent

send_email.delay("user@example.com", "Hello", "World").then(log_sent)
```

You can chain multiple steps:

```python
send_email.delay(...).then(step_two).then(step_three)
```

Each step is persisted to the database. If the original task is executed by Pub/Sub, the next step in the chain is enqueued automatically on success.
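
The chaining behaviour can be pictured with a small in-memory model. `ToyChain` is a hypothetical class written for this illustration; the real `ChainHandle` persists steps to the database and enqueues them through Pub/Sub. This README does not say whether a follow-up receives the previous step's result, so the toy calls each follow-up with no arguments.

```python
class ToyChain:
    """In-memory illustration of .then() chaining; not the library's ChainHandle."""

    def __init__(self, first, *args, **kwargs):
        self.first = (first, args, kwargs)
        self.next_steps = []

    def then(self, func):
        # Returning self is what allows .then(a).then(b).
        self.next_steps.append(func)
        return self

    def run(self):
        """Run steps in order and return the names of those that completed."""
        func, args, kwargs = self.first
        completed = []
        try:
            func(*args, **kwargs)
            completed.append(func.__name__)
            for step in self.next_steps:
                step()
                completed.append(step.__name__)
        except Exception:
            # A failed step prevents every later step from running.
            pass
        return completed


def send_email(recipient):
    print(f"sending to {recipient}")


def log_sent():
    print("logged")


def broken():
    raise RuntimeError("boom")


def never_runs():
    print("unreachable")


ok = ToyChain(send_email, "user@example.com").then(log_sent).run()
failed = ToyChain(send_email, "user@example.com").then(broken).then(never_runs).run()
print(ok, failed)
```

The point to take away is the ordering guarantee: a later step is only reached when every earlier step has succeeded.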

### `on_success` shorthand

For a single fixed follow-up, declare it on the decorator:

```python
@TaskDecorator(on_success=log_sent)
def send_email(recipient, subject, body):
    ...
```

Every `.delay()` call will automatically chain `log_sent` after a successful execution.

---

## Task groups (fan-out / join)

Use `TaskGroup` to fan out N parallel tasks and run a callback when all of them succeed.

### Basic usage

```python
from django_i3tasks.models import TaskGroup
from myapp.tasks import process_item, all_done

# 1. Create the group — declare the callback and the expected member count.
group = TaskGroup.create(callback=all_done, total_count=3)

# 2. Dispatch member tasks, passing the group via __i3group__.
#    `items` is assumed to hold exactly total_count (3) elements.
for item in items:
    process_item.delay(item, __i3group__=group)
```

`all_done` is called automatically once all 3 members complete successfully. If any member exceeds its retry limit, the group is marked `failed` and the callback is never called.
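
The join logic described above boils down to a guarded counter. The following sketch models it in memory with threads standing in for parallel workers. `ToyGroup` and its method names are hypothetical, and the real `TaskGroup` keeps its state in the database rather than in a Python object.

```python
import threading


class ToyGroup:
    """In-memory illustration of a fan-out/join counter; not the library's TaskGroup."""

    def __init__(self, callback, total_count):
        self.callback = callback
        self.total_count = total_count
        self.done = 0
        self.status = "pending"
        self._lock = threading.Lock()

    def member_succeeded(self):
        with self._lock:
            if self.status != "pending":
                return  # group already finished or failed
            self.done += 1
            fire = self.done == self.total_count
            if fire:
                self.status = "success"
        if fire:
            # Call outside the lock so a slow callback does not block members.
            self.callback()

    def member_failed(self):
        with self._lock:
            if self.status == "pending":
                self.status = "failed"


calls = []
group = ToyGroup(callback=lambda: calls.append("all_done"), total_count=3)
threads = [threading.Thread(target=group.member_succeeded) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# A group with a failed member never fires its callback.
failed_group = ToyGroup(callback=lambda: calls.append("never"), total_count=2)
failed_group.member_succeeded()
failed_group.member_failed()
failed_group.member_succeeded()
print(calls, group.status, failed_group.status)
```

The lock matters because members finish concurrently: without it, two members could both see themselves as the last one, or neither could.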

### Callback with a chain

Use `build_chain()` to attach a chain to the callback without dispatching it immediately:

```python
from myapp.tasks import all_done, notify_admin

chain = all_done.build_chain().then(notify_admin)
group = TaskGroup.create(callback=chain, total_count=3)
```

When the join fires, `all_done` is called and `notify_admin` is chained after it.

### `TaskGroup` states

| Status | Meaning |
|--------|---------|
| `pending` | Waiting for members to complete |
| `success` | All members succeeded; callback dispatched |
| `failed` | At least one member exceeded retries |

---

## `I3TasksSettings` reference

| Parameter | Type | Default | Description |
|---|---|---|---|
| `namespace` | `str` | required | Prefix for Pub/Sub topic/subscription names |
| `default_queue` | `Queue` | required | Default queue configuration |
| `other_queues` | `tuple[Queue]` | `()` | Additional queues |
| `schedules` | `tuple[Schedule]` | `()` | Scheduled tasks (cron-based) |
| `force_sync` | `bool` | `False` | If `True`, `.delay()` runs synchronously (useful for testing) |
| `default_max_retries` | `int` | `3` | Maximum retry attempts on failure |
| `run_queue_create_command_on_startup` | `bool` | `True` | Auto-run `i3tasks_ensure_pubsub` on app startup |

---

## How it works

1. `.delay()` serializes the task and publishes it to Google Cloud Pub/Sub.
2. A `TaskExecution` and a `TaskExecutionTry` record are saved to the database.
3. The Pub/Sub push subscription delivers the message to `/i3/tasks-push/`.
4. The endpoint deserializes and executes the task, saving the result.
5. On failure, the task is re-enqueued up to `default_max_retries` times.
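
Step 5 amounts to a bounded retry loop. The sketch below runs that loop inline instead of re-enqueueing through Pub/Sub. `run_with_retries` and `Flaky` are hypothetical helpers. This README does not specify whether `default_max_retries` counts retries or total attempts, so the sketch assumes one initial attempt plus up to `max_retries` retries.

```python
def run_with_retries(func, max_retries=3):
    """Call func until it succeeds or retries are exhausted.

    Returns (status, attempts). Assumption: one initial attempt plus up to
    max_retries retries; the library may count differently.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            func()
            return "success", attempts
        except Exception:
            if attempts > max_retries:
                return "failed", attempts


class Flaky:
    """Callable that fails a fixed number of times, then succeeds."""

    def __init__(self, failures):
        self.failures = failures

    def __call__(self):
        if self.failures > 0:
            self.failures -= 1
            raise RuntimeError("transient error")


recovered = run_with_retries(Flaky(failures=2), max_retries=3)
exhausted = run_with_retries(Flaky(failures=10), max_retries=3)
print(recovered, exhausted)
```

Because a task may run more than once, it is generally safer to write task bodies so that repeating them is harmless.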

Scheduled tasks are triggered by hitting `/i3/tasks-beat/`. The app evaluates each configured `Schedule`'s cron expression and runs matching tasks.
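
To show what evaluating a cron expression means on each beat, here is a deliberately simplified matcher written with the standard library. The package declares `croniter` as a dependency, which suggests the real parsing is delegated to it; `field_matches` and `cron_matches` are hypothetical helpers that support only `*`, `*/n`, `a-b`, and comma lists.

```python
from datetime import datetime


def field_matches(spec: str, value: int, minimum: int = 0) -> bool:
    """Match one cron field; supports '*', '*/n', 'a-b', and comma lists."""
    for part in spec.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            # Steps are counted from the field's minimum (0 or 1).
            if (value - minimum) % int(part[2:]) == 0:
                return True
        elif "-" in part:
            low, high = map(int, part.split("-"))
            if low <= value <= high:
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expr: str, moment: datetime) -> bool:
    """Return True if a five-field cron expression matches the given minute.

    Simplification: all five fields must match, whereas standard cron treats
    day-of-month and day-of-week as alternatives when both are restricted.
    """
    minute, hour, day, month, weekday = expr.split()
    # Cron counts Sunday as 0; datetime.weekday() counts Monday as 0.
    cron_weekday = (moment.weekday() + 1) % 7
    return (
        field_matches(minute, moment.minute)
        and field_matches(hour, moment.hour)
        and field_matches(day, moment.day, minimum=1)
        and field_matches(month, moment.month, minimum=1)
        and field_matches(weekday, cron_weekday)
    )


beat = datetime(2024, 1, 1, 9, 30)  # a Monday
every_minute = cron_matches("* * * * *", beat)
monday_quarters = cron_matches("*/15 9 * * 1", beat)
top_of_hour = cron_matches("0 * * * *", beat)
print(every_minute, monday_quarters, top_of_hour)
```

One practical consequence: if schedules are evaluated only when the beat endpoint is hit, the external scheduler probably needs to call it at least as often as the finest-grained schedule, for example once a minute for `* * * * *`.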
