Metadata-Version: 2.4
Name: hawkapi-celery
Version: 0.1.0
Summary: Celery integration for HawkAPI — async tasks, beat scheduler, context propagation, healthchecks, eager-mode fixtures
Project-URL: Homepage, https://pypi.org/project/hawkapi-celery/
Project-URL: Repository, https://github.com/ashimov/hawkapi-celery
Project-URL: Issues, https://github.com/ashimov/hawkapi-celery/issues
Author-email: HawkAPI Contributors <hawkapi@users.noreply.github.com>
License: MIT License
        
        Copyright (c) 2026 HawkAPI Contributors
        
        Permission is hereby granted, free of charge, to any person obtaining a copy
        of this software and associated documentation files (the "Software"), to deal
        in the Software without restriction, including without limitation the rights
        to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
        copies of the Software, and to permit persons to whom the Software is
        furnished to do so, subject to the following conditions:
        
        The above copyright notice and this permission notice shall be included in all
        copies or substantial portions of the Software.
        
        THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
        IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
        FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
        AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
        LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
        OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
        SOFTWARE.
License-File: LICENSE
Keywords: async,background,beat,celery,hawkapi,queue,tasks
Classifier: Development Status :: 5 - Production/Stable
Classifier: Framework :: AsyncIO
Classifier: Framework :: Celery
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: System :: Distributed Computing
Classifier: Typing :: Typed
Requires-Python: >=3.12
Requires-Dist: celery>=5.4
Requires-Dist: hawkapi>=0.1.7
Provides-Extra: dev
Requires-Dist: pyright>=1.1; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.24; extra == 'dev'
Requires-Dist: pytest>=8.0; extra == 'dev'
Requires-Dist: redis>=5.0; extra == 'dev'
Requires-Dist: ruff>=0.8; extra == 'dev'
Provides-Extra: redis
Requires-Dist: redis>=5.0; extra == 'redis'
Description-Content-Type: text/markdown

# hawkapi-celery

Celery integration for [HawkAPI](https://github.com/ashimov/HawkAPI). Async tasks, beat scheduler, request-context propagation, broker/worker healthchecks, and eager-mode fixtures for tests.

## Install

```bash
pip install hawkapi-celery
pip install 'hawkapi-celery[redis]'         # adds redis client
```

## Quickstart

```python
from hawkapi import Depends, HawkAPI
from celery import Celery
from hawkapi_celery import (
    CeleryConfig, bind_context, get_celery, init_celery, task,
)


celery_app: Celery  # populated below


def make_app() -> HawkAPI:
    app = HawkAPI()
    global celery_app
    celery_app = init_celery(
        app,
        config=CeleryConfig(
            broker_url="redis://localhost:6379/0",
            result_backend="redis://localhost:6379/0",
        ),
    )


    @task(celery_app, name="emails.send")
    async def send_email(to: str, subject: str, body: str) -> None:
        ...  # any await-able send logic


    @app.post("/notify")
    async def notify(email: str, c: Celery = Depends(get_celery)):
        with bind_context(request_id="…"):
            send_email.delay(email, "Welcome", "Hello!")
        return {"ok": True}

    return app
```

## Tasks

```python
from hawkapi_celery import task

@task(celery_app, name="myapp.work", queue="default",
      autoretry_for=(ConnectionError,), retry_backoff=True, max_retries=5)
async def work(x: int) -> int:        # async def — runs on a private event loop
    ...
    return x * 2


@task(celery_app, bind=True)
def slow_work(self, payload):          # sync — bound `self` for retry handling
    try:
        do_thing(payload)
    except TransientError as exc:
        raise self.retry(exc=exc, countdown=compute_backoff(self.request.retries))
```

## Beat (periodic tasks)

```python
from datetime import timedelta
from hawkapi_celery import Periodic, add_periodic, crontab, every

add_periodic(celery_app, "cleanup",
             Periodic(task="myapp.cleanup", schedule=every(timedelta(hours=1))))

add_periodic(celery_app, "nightly_report",
             Periodic(task="myapp.report", schedule=crontab(hour=2, minute=0),
                      kwargs={"date": "yesterday"}))
```

## Context propagation

`bind_context()` carries a dict from the HTTP handler to the worker process via the task headers. Inside the task call `current_context()` to read it back.

```python
from hawkapi_celery import bind_context, current_context

@task(celery_app, name="log.event")
def log_event(payload: dict) -> None:
    ctx = current_context()                  # {"request_id": "…", "user_id": "…"}
    log.info("event", **ctx, **payload)


@app.post("/event")
async def post_event(p: Payload):
    with bind_context(request_id=p.request_id, user_id=p.user_id):
        log_event.delay(p.model_dump())
```

Wired automatically by `init_celery(..., propagate_context=True)` (default).

## Healthchecks

```python
from hawkapi_celery import healthcheck


@app.get("/healthz")
async def healthz():
    report = healthcheck(celery_app, timeout=2.0)
    return {
        "broker": report.broker_ok,
        "workers_alive": report.workers_alive,
        "workers": list(report.workers),
    }
```

## Testing

```python
from hawkapi_celery import eager_mode, record_tasks


def test_signup_enqueues_welcome_email(client, celery_app):
    with record_tasks(celery_app) as recorder:
        client.post("/signup", json={"email": "x@y.z"})
    assert any(t.name == "emails.send" for t in recorder.captured)


def test_signup_runs_welcome_email_inline(client, celery_app):
    with eager_mode(celery_app):
        client.post("/signup", json={"email": "x@y.z"})
    # All tasks executed synchronously in-process — assert their side-effects directly.
```

## CeleryConfig

```python
CeleryConfig(
    broker_url="redis://localhost:6379/0",
    result_backend="redis://localhost:6379/0",
    task_serializer="json",
    timezone="UTC",
    task_time_limit=600,
    task_soft_time_limit=540,
    worker_prefetch_multiplier=1,
    worker_max_tasks_per_child=1000,
    task_default_queue="default",
    extra_kwargs={...},          # forwarded to celery.conf.update
)
```

## Development

```bash
git clone https://github.com/ashimov/hawkapi-celery.git
cd hawkapi-celery
uv sync --extra dev
uv run pytest -q
uv run ruff check . && uv run ruff format --check .
uv run pyright src/
```

## License

MIT.
