Metadata-Version: 2.4
Name: pgwerk
Version: 0.1.15
Summary: Postgres-backed job queue. Durable, visible, transactional.
Project-URL: Repository, https://github.com/pgwerk/pgwerk
Project-URL: Homepage, https://pgwerk.github.io/pgwerk
Project-URL: Documentation, https://pgwerk.github.io/pgwerk
Project-URL: Bug Tracker, https://github.com/pgwerk/pgwerk/issues
Author-email: pgwerk <21188280+ccrvlh@users.noreply.github.com>
License: MIT License
        
        Copyright (c) 2024 ccrvlh
        
        Permission is hereby granted, free of charge, to any person obtaining a copy
        of this software and associated documentation files (the "Software"), to deal
        in the Software without restriction, including without limitation the rights
        to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
        copies of the Software, and to permit persons to whom the Software is
        furnished to do so, subject to the following conditions:
        
        The above copyright notice and this permission notice shall be included in all
        copies or substantial portions of the Software.
        
        THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
        IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
        FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
        AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
        LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
        OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
        SOFTWARE.
License-File: LICENSE
Keywords: asyncio,background jobs,background tasks,celery alternative,distributed jobs,job queue,no redis,pgqueue,postgres,postgresql,select for update,skip locked,task queue,transactional job queue,worker
Classifier: Development Status :: 3 - Alpha
Classifier: Framework :: AsyncIO
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Database
Classifier: Topic :: Internet
Classifier: Topic :: Software Development :: Libraries
Classifier: Topic :: System :: Distributed Computing
Classifier: Typing :: Typed
Requires-Python: >=3.11
Requires-Dist: click>=8.0
Requires-Dist: psycopg[async]>=3.1
Provides-Extra: analytics
Requires-Dist: plotext>=5.0; extra == 'analytics'
Requires-Dist: rich>=13.0; extra == 'analytics'
Provides-Extra: api
Requires-Dist: litestar>=2.0; extra == 'api'
Requires-Dist: uvicorn>=0.30; extra == 'api'
Provides-Extra: cron
Requires-Dist: croniter>=2.0; extra == 'cron'
Provides-Extra: dev
Requires-Dist: croniter>=2.0; extra == 'dev'
Requires-Dist: litestar>=2.0; extra == 'dev'
Requires-Dist: mkdocs-shadcn>=0.10; extra == 'dev'
Requires-Dist: mypy>=1.0; extra == 'dev'
Requires-Dist: plotext>=5.0; extra == 'dev'
Requires-Dist: prometheus-client>=0.8; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.23; extra == 'dev'
Requires-Dist: pytest-cov>=5.0; extra == 'dev'
Requires-Dist: pytest>=8.0; extra == 'dev'
Requires-Dist: rich>=13.0; extra == 'dev'
Requires-Dist: ruff>=0.4; extra == 'dev'
Requires-Dist: types-croniter>=6.2.2.20260408; extra == 'dev'
Requires-Dist: uvicorn>=0.30; extra == 'dev'
Provides-Extra: exporter
Requires-Dist: prometheus-client>=0.8; extra == 'exporter'
Description-Content-Type: text/markdown

# PGWerk

[![CI](https://github.com/pgwerk/pgwerk/actions/workflows/ci.yml/badge.svg)](https://github.com/pgwerk/pgwerk/actions/workflows/ci.yml)
[![PyPI](https://img.shields.io/pypi/v/pgwerk?v=2)](https://pypi.org/project/pgwerk/)
[![Python](https://img.shields.io/pypi/pyversions/pgwerk?v=2)](https://pypi.org/project/pgwerk/)
[![License: MIT](https://img.shields.io/badge/license-MIT-blue.svg)](LICENSE)
[![Docs](https://img.shields.io/badge/docs-pgwerk.github.io/pgwerk-blue)](https://pgwerk.github.io/pgwerk)

A Postgres-backed job queue. Durable, visible, transactional.

Jobs are rows. Workers poll with `SELECT … FOR UPDATE SKIP LOCKED`. No external broker, no sidecar, just your existing Postgres instance. The schema is created automatically on first connect.

pgwerk does not maintain an internal connection pool. Each operation uses a short-lived connection, making it compatible with external poolers like PgBouncer in transaction pooling mode.

![PGWerk dashboard overview](docs/assets/01-ss-home.png)

---

## Python

### Install

```bash
pip install pgwerk

# Cron support (optional)
pip install "pgwerk[cron]"
```

Requires Python 3.11+ and a Postgres 14+ database.

### Quickstart

Define your app and handlers:

```python
from pgwerk import Werk, Context

app = Werk("postgresql://user:pass@localhost/mydb")

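# Call `await app.connect()` once at startup and `await app.disconnect()` at
# shutdown, from inside a running event loop (a bare top-level `await` is a
# syntax error in a module). `async with app:` is shorthand for the same pair.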

async def send_email(to: str):
    ...  # plain handler — ctx is optional

async def send_email_with_context(ctx: Context, to: str):
    ...  # opt in by naming/typing the first parameter as ctx
```

Enqueue jobs from anywhere in your application:

```python
await app.enqueue(send_email, to="user@example.com")
```

Run a worker in a separate process:

```python
import asyncio
from pgwerk import AsyncWorker

from myapp.tasks import app  # wherever your Werk instance is defined

async def main():
    worker = AsyncWorker(app=app, queues=["default"], concurrency=10)
    await worker.run()

asyncio.run(main())
```

Handlers are identified by their dotted import path (`myapp.tasks.send_email`). The function itself is passed to `enqueue`; pgwerk records its path and imports it on the worker side. `ctx` is injected when the first parameter is named `ctx` or annotated as `Context`.
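To make the dotted-path idea concrete, here is a minimal sketch of how a function can be turned into an import path, resolved again, and checked for a leading `ctx` parameter. It is not pgwerk's implementation: `dotted_path`, `resolve`, `wants_context`, and the stand-in `Context` class are hypothetical names used only for illustration.

```python
import importlib
import inspect
import json


class Context:
    """Hypothetical stand-in for pgwerk's Context type."""


def dotted_path(func) -> str:
    """Return 'package.module.name' for a module-level function."""
    return f"{func.__module__}.{func.__qualname__}"


def resolve(path: str):
    """Import the module part of the path and fetch the named attribute."""
    module_name, _, attr = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


def wants_context(func) -> bool:
    """True if the first parameter is named ctx or annotated as Context."""
    params = list(inspect.signature(func).parameters.values())
    if not params:
        return False
    first = params[0]
    return first.name == "ctx" or first.annotation is Context


async def send_email(to: str):
    ...


async def send_email_with_context(ctx: Context, to: str):
    ...


path = dotted_path(json.dumps)   # "json.dumps"
same_function = resolve(path)    # the json.dumps function object
```

Under this approach a handler has to be importable by name on the worker side, so lambdas and functions defined inside other functions (whose qualified name contains `<locals>`) cannot be resolved.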

### Enqueueing

```python
from pgwerk import Retry, Repeat, Dependency
from datetime import datetime, timezone, timedelta

# Basic
await app.enqueue(my_func, x=1)

# With options
await app.enqueue(
    my_func,
    x=1,
    _queue="high",
    _priority=10,
    _delay=30,                         # seconds from now
    _at=datetime(2025, 1, 1, tzinfo=timezone.utc),
    _retry=Retry(max=3, intervals=[10, 60, 300]),  # total attempts, including the first
    _timeout=120,                      # fail after 2 minutes
    _heartbeat=30,                     # worker auto-renews while the job is running
    _key="unique-key",                 # idempotency key — duplicate enqueues are silently dropped
    _group="user:42",                  # at most one active job per group
    _meta={"source": "api"},
    _on_success=notify_done,
    _on_failure=alert_team,
)

# Repeat a job N more times after the first run
await app.enqueue(cleanup, _repeat=Repeat(times=5, interval=3600))

# Depend on another job finishing first
job_a = await app.enqueue(step_one)
await app.enqueue(step_two, _depends_on=job_a)          # waits for job_a
await app.enqueue(step_two, _depends_on=Dependency(job_a, allow_failure=True))

# Bulk enqueue in one round-trip
from pgwerk import EnqueueParams
await app.enqueue_many([
    EnqueueParams(func=my_func, kwargs={"n": i}, queue="bulk") for i in range(100)
])
```
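The "duplicate enqueues are silently dropped" behaviour of `_key` can be pictured as an insert guarded by a unique constraint. The sketch below illustrates that general technique only. It uses the standard-library `sqlite3` module so it runs without a server (Postgres supports the same `ON CONFLICT ... DO NOTHING` clause), and the `jobs` table, the `idem_key` column, and `enqueue_once` are hypothetical, not pgwerk's schema or API.

```python
import sqlite3


def enqueue_once(conn: sqlite3.Connection, idem_key: str, func: str) -> bool:
    """Insert a job row unless one with the same key exists.

    Returns True if a row was inserted, False if the insert was a no-op.
    """
    cur = conn.execute(
        "INSERT INTO jobs (idem_key, func) VALUES (?, ?) "
        "ON CONFLICT (idem_key) DO NOTHING",
        (idem_key, func),
    )
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE jobs ("
    " id INTEGER PRIMARY KEY,"
    " idem_key TEXT UNIQUE,"
    " func TEXT NOT NULL)"
)

first = enqueue_once(conn, "unique-key", "myapp.tasks.send_email")
second = enqueue_once(conn, "unique-key", "myapp.tasks.send_email")
total = conn.execute("SELECT COUNT(*) FROM jobs").fetchone()[0]
```

Because the uniqueness check happens inside the database, two processes racing to enqueue the same key cannot both succeed.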

### Workers

```python
from pgwerk import AsyncWorker, ThreadWorker, ProcessWorker, ForkWorker

# Asyncio (default — best for I/O-bound work)
worker = AsyncWorker(app=app, queues=["default", "high"], concurrency=20)

# Thread pool (CPU-light work, blocking libraries)
worker = ThreadWorker(app=app, concurrency=8)

# Process pool (CPU-bound work, true parallelism)
worker = ProcessWorker(app=app, concurrency=4)

# Fork per job (maximum isolation)
worker = ForkWorker(app=app, concurrency=4)

await worker.run()
```

Workers register themselves in the database, send periodic heartbeats, auto-touch jobs that opt into `_heartbeat`, and use `LISTEN/NOTIFY` for instant wake-up when jobs are enqueued.
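Claiming a job with `SELECT … FOR UPDATE SKIP LOCKED`, as mentioned in the introduction, usually follows the pattern sketched below. This is an illustration of the technique, not pgwerk's actual query: the `jobs` table, its `status` and `queue` columns, the ordering, and `claim_one` are assumptions, and `conn` is assumed to be a `psycopg.AsyncConnection`.

```python
# Hypothetical table and column names; pgwerk's real schema may differ.
CLAIM_SQL = """
UPDATE jobs
   SET status = 'active'
 WHERE id = (
        SELECT id
          FROM jobs
         WHERE status = 'queued'
           AND queue = ANY(%s)
         ORDER BY id
         LIMIT 1
           FOR UPDATE SKIP LOCKED
       )
RETURNING id
"""


async def claim_one(conn, queues):
    """Claim at most one queued job and return its id, or None.

    Rows already locked by another worker are skipped instead of waited on,
    so concurrent workers do not block each other or claim the same job.
    The caller is responsible for committing the transaction.
    """
    cur = await conn.execute(CLAIM_SQL, (list(queues),))
    row = await cur.fetchone()
    return row[0] if row else None
```

A worker loop would call `claim_one` whenever it has spare capacity, and fall back to waiting for a notification or a poll timeout when it returns `None`.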

### Cron

```python
from pgwerk import CronScheduler, CronJob

scheduler = CronScheduler(app)
scheduler.register(CronJob(func=my_func, cron="*/15 * * * *"))  # every 15 min
scheduler.register(CronJob(func=other_func, interval=3600))      # every hour

async with app:
    await scheduler.run()
```

`CronScheduler` uses a Postgres advisory lock so only one process runs the scheduler at a time. Cron expressions require `croniter`, which the `cron` extra installs.
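The single-scheduler guarantee can be sketched with `pg_try_advisory_lock`, which takes a 64-bit integer key and returns immediately with `true` or `false`. The code below is a sketch under assumptions rather than pgwerk's implementation: the lock name `pgwerk:cron`, the hashing scheme, and both function names are hypothetical, and `conn` is assumed to be a `psycopg.AsyncConnection`.

```python
import hashlib


def advisory_key(name: str) -> int:
    """Derive a stable signed 64-bit key from a human-readable lock name."""
    digest = hashlib.sha256(name.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big", signed=True)


async def try_become_leader(conn, name: str = "pgwerk:cron") -> bool:
    """Try to take a session-level advisory lock without blocking.

    Returns True if this connection now holds the lock. The lock is
    released when the session ends or pg_advisory_unlock is called.
    """
    cur = await conn.execute(
        "SELECT pg_try_advisory_lock(%s)", (advisory_key(name),)
    )
    row = await cur.fetchone()
    return bool(row[0])
```

A session-level advisory lock belongs to the connection that took it, so a scheduler built this way has to keep that connection open for as long as it wants to remain the active one.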

### Serializers

```python
from pgwerk import Werk, PickleSerializer

app = Werk(dsn, serializer=PickleSerializer())  # default is JSONSerializer
```

The configured serializer is used for job payloads, job results, and execution results.
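The practical difference between the two formats shows up in a round trip. The sketch below uses only the standard library; `JsonCodec` and `PickleCodec` are hypothetical stand-ins with an assumed `dumps`/`loads` shape, not pgwerk's `JSONSerializer` or `PickleSerializer`.

```python
import json
import pickle
from datetime import datetime, timezone


class JsonCodec:
    """Hypothetical JSON codec: portable and readable, limited to JSON types."""

    def dumps(self, obj) -> bytes:
        return json.dumps(obj).encode("utf-8")

    def loads(self, data: bytes):
        return json.loads(data.decode("utf-8"))


class PickleCodec:
    """Hypothetical pickle codec: handles most Python objects, Python-only."""

    def dumps(self, obj) -> bytes:
        return pickle.dumps(obj)

    def loads(self, data: bytes):
        return pickle.loads(data)


payload = {"to": "user@example.com", "tags": ("a", "b")}

json_rt = JsonCodec().loads(JsonCodec().dumps(payload))        # tuple becomes a list
pickle_rt = PickleCodec().loads(PickleCodec().dumps(payload))  # tuple is preserved

# The standard json module rejects datetime values unless they are converted first.
try:
    JsonCodec().dumps({"at": datetime.now(timezone.utc)})
    json_handles_datetime = True
except TypeError:
    json_handles_datetime = False
```

Unpickling can execute arbitrary code, so a pickle-based serializer is only appropriate when everything able to write job rows is trusted.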

### Job inspection

```python
job = await app.get_job(job_id)
executions = await app.get_executions(job_id)
await app.cancel_job(job_id)
```

### CLI

`APP` is a `module:attribute` path to a `Werk` instance.

#### Worker

```bash
werk worker myapp.tasks:app --queues default,high --concurrency 10 --worker-type async
```

#### Observability API

```bash
werk api --dsn postgresql://localhost/mydb
```

| Flag | Env var | Default | Description |
|---|---|---|---|
| `--dsn` | `PGWERK_DSN` | — | Postgres connection string (**required**) |
| `--host` / `-h` | `PGWERK_HOST` | `127.0.0.1` | Host to bind |
| `--port` / `-p` | `PGWERK_PORT` | `8000` | Port to bind |
| `--schema` | `PGWERK_SCHEMA` | — | Postgres schema for pgwerk tables |
| `--prefix` | `PGWERK_PREFIX` | — | Table-name prefix |
| `--metrics` | `PGWERK_METRICS` | off | Enable Prometheus metrics at `GET /metrics` |
| `--metrics-interval` | `PGWERK_METRICS_INTERVAL` | `15.0` | Metrics scrape interval in seconds |
| `--no-ui` | `PGWERK_NO_UI` | off | Disable the SPA dashboard |
| `--ui-auth` | `PGWERK_UI_AUTH` | — | Basic Auth for the SPA as `user:password` |
| `--api-token` | `PGWERK_API_TOKEN` | — | Bearer token for all `/api/*` routes |
| `--log-level` / `-l` | `PGWERK_LOG_LEVEL` | `INFO` | `debug`, `info`, `warning`, `error` |
| `--log-format` | `PGWERK_LOG_FORMAT` | `text` | `text` or `json` |
| `--no-color` | `PGWERK_NO_COLOR` | off | Disable colored log output |
| `--reload` | `PGWERK_RELOAD` | off | Auto-reload (development) |

#### Inspection

```bash
# Show queue statistics and active workers
werk info myapp.tasks:app

# Live terminal dashboard
werk dashboard myapp.tasks:app

# List recent jobs
werk jobs myapp.tasks:app

# Show slowest functions by average execution time
werk slowest myapp.tasks:app

# Delete finished jobs
werk purge myapp.tasks:app --status complete,failed
```

---

## Schema

All tables are prefixed (default `_pgwerk_`) and optionally placed in a named schema. Migrations run automatically on connect using an advisory lock to prevent races across multiple processes starting simultaneously.

| Table | Purpose |
|---|---|
| `_pgwerk_worker` | Registered workers + heartbeat tracking |
| `_pgwerk_jobs` | Job queue — all state lives here |
| `_pgwerk_worker_jobs` | Active claim tracking |
| `_pgwerk_jobs_executions` | Per-attempt execution history |
| `_pgwerk_job_deps` | Job dependency graph |


### Job lifecycle

```
queued → active → complete
                ↘ failed   (retries exhausted)
       → waiting           (blocked on dependencies, Python only)
       → aborted           (cancelled before execution)
```

---

## Production security

By default, pgwerk tables land in whatever schema your connection's `search_path` resolves to (usually `public`). For production we recommend a dedicated schema and a role scoped to it, so pgwerk credentials can never touch the rest of your database.

```sql
CREATE SCHEMA pgwerk;
CREATE USER pgwerk_app WITH PASSWORD 'strong-password';
GRANT USAGE, CREATE ON SCHEMA pgwerk TO pgwerk_app;
```

Then point your app at that schema and user:

```python
app = Werk("postgresql://pgwerk_app:strong-password@localhost/mydb", schema="pgwerk")
```
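In a real deployment the password would normally come from the environment rather than from source code, and it needs percent-encoding if it contains characters such as `@`, `/`, or `:`. A minimal sketch follows, assuming a hypothetical `build_dsn` helper and a hypothetical `PGWERK_DB_PASSWORD` variable (neither is part of pgwerk):

```python
import os
from urllib.parse import quote


def build_dsn(user: str, password: str, host: str, dbname: str, port: int = 5432) -> str:
    """Build a postgresql:// URL, percent-encoding the user and password."""
    return (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{dbname}"
    )


dsn = build_dsn(
    user="pgwerk_app",
    # PGWERK_DB_PASSWORD is a hypothetical variable name used for illustration.
    password=os.environ.get("PGWERK_DB_PASSWORD", "strong-password"),
    host="localhost",
    dbname="mydb",
)

# The resulting string is what gets passed to Werk:
# app = Werk(dsn, schema="pgwerk")
```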

The `CREATE` privilege is needed because pgwerk auto-migrates on first connect. If you prefer to run migrations separately — e.g. as a CI/CD step with elevated credentials — use `werk migrate`:

```bash
# Run once during deployment with a role that has CREATE
werk migrate --dsn postgresql://pgwerk_admin:...@localhost/mydb --schema pgwerk
```

At runtime the application then needs only DML privileges:

```python
app = Werk("postgresql://pgwerk_app:...@localhost/mydb", schema="pgwerk", auto_migrate=False)
```

---

## License

MIT
