Metadata-Version: 2.4
Name: camunda-orchestration-sdk
Version: 8.9.0.dev27
Summary: Python client for Camunda 8 Orchestration Cluster API
Author-email: Josh Wulf <josh.wulf@camunda.com>
Requires-Python: >=3.10
Description-Content-Type: text/markdown
Requires-Dist: httpx>=0.28.1
Requires-Dist: attrs>=21.3.0
Requires-Dist: pydantic>=2
Requires-Dist: python-dateutil>=2.8.2
Requires-Dist: loguru>=0.7.2
Requires-Dist: python-dotenv>=1.0.0

# Camunda Orchestration Cluster API – Python SDK

<!-- WARNING: The content and specific structure of this file drives Docusaurus generation in camunda-docs. Also, code examples are injected during build. Please refer to MAINTAINER.md before editing. -->
<!-- docs:cut:start -->
[![PyPI - Version](https://img.shields.io/pypi/v/camunda-orchestration-sdk)](https://pypi.org/project/camunda-orchestration-sdk/)
[![Documentation](https://img.shields.io/badge/docs-API%20Reference-blue)](https://camunda.github.io/orchestration-cluster-api-python/)
<!-- docs:cut:end -->

A fully typed Python client for the [Camunda 8 Orchestration Cluster REST API](https://docs.camunda.io/docs/apis-tools/camunda-api-rest/camunda-api-rest-overview/). Fully compliant with the Camunda OpenAPI spec with hand-written runtime infrastructure for authentication, configuration, and job workers.

- **Sync and async** — `CamundaClient` (synchronous) and `CamundaAsyncClient` (async/await)
- **Strict typing** — pyright-strict compatible with PEP 561 `py.typed` marker
- **Zero-config** — reads `CAMUNDA_*` environment variables (12-factor style)
- **Job workers** — long-poll workers with thread, process, or async execution strategies
- **OAuth & Basic auth** — pluggable authentication with automatic token management
- **Pluggable logging** — inject your own logger (stdlib `logging`, loguru, or custom)

## Installing the SDK to your project

### Requirements

- Python 3.10 or later

### Stable release (recommended for production)

The stable version tracks the latest supported Camunda server release. The first stable release will be **8.9.0**.

```bash
pip install camunda-orchestration-sdk
```

### Pre-release / dev channel

Pre-release versions (e.g. `8.9.0.dev2`) are published from the `main` branch and contain the latest changes targeting the next server minor version. Use these to preview upcoming features or validate your integration ahead of a stable release.

```bash
# pip
pip install --pre camunda-orchestration-sdk

# pin to a specific pre-release
pip install camunda-orchestration-sdk==8.9.0.dev2
```

In a `requirements.txt`:

```text
camunda-orchestration-sdk>=8.9.0.dev1
```

> **Note:** Pre-release versions may contain breaking changes between builds. Pin to a specific version if you need reproducible builds.

### Versioning

This SDK has a different release cadence from the Camunda server. Features and fixes land in the SDK during a server release. 

The major version of the SDK signals a 1:1 type coherence with the server API for a Camunda minor release. 

SDK version `n.y.z` -> server version `8.n`, so the type surface of SDK version 9.y.z matches the API surface of Camunda 8.9.

Using a later SDK version, for example: SDK version 10.y.z with Camunda 8.9, means that the SDK contains additive surfaces that are not guaranteed at runtime, and the compiler cannot warn of unsupported operations. 

Using an earlier SDK version, for example: SDK version 9.y.z with Camunda 8.10, results in slightly degraded compiler reasoning: exhaustiveness checks cannot be guaranteed by the compiler for any extended surfaces (principally, enums with added members). 

In the vast majority of use-cases, this will not be an issue; but you should be aware that using the matching SDK major version for the server minor version provides the strongest compiler guarantees about runtime reliability. 

**Recommended approach**:

- Check the [CHANGELOG](https://github.com/camunda/orchestration-cluster-api-python/releases).
- As a sanity check during server version upgrade, rebuild applications with the matching SDK major version to identify any affected runtime surfaces.

## Using the SDK

The SDK provides two clients with identical API surfaces:

- **`CamundaClient`** — synchronous. Every method blocks until the response arrives. Use this in scripts, CLI tools, Django views, Flask handlers, or anywhere you don't have an async event loop.
- **`CamundaAsyncClient`** — asynchronous (`async`/`await`). Every method is a coroutine. Use this in FastAPI, aiohttp, or any `asyncio`-based application. **Job workers require `CamundaAsyncClient`** because they use `asyncio` for long-polling and concurrent job execution.

Both clients share the same method names and parameters — the only difference is calling convention:

<!-- snippet-source: examples/readme.py | regions: ReadmeSyncClient -->
```python
# Sync
from camunda_orchestration_sdk import CamundaClient

with CamundaClient() as client:
    topology = client.get_topology()
```

<!-- snippet-source: examples/readme.py | regions: ReadmeAsyncClient -->
```python
# Async
import asyncio

from camunda_orchestration_sdk import CamundaAsyncClient

async def main() -> None:
    async with CamundaAsyncClient() as client:
        topology = await client.get_topology()

asyncio.run(main())
```

> **Which one should I use?** If your application already uses `asyncio` (FastAPI, aiohttp, etc.) or you need job workers, use `CamundaAsyncClient`. Otherwise, `CamundaClient` is simpler and works everywhere.


## Semantic Types

The SDK uses Python `NewType` wrappers for identifiers like `ProcessDefinitionKey`, `ProcessInstanceKey`, `JobKey`, `TenantId`, etc. These are defined in `camunda_orchestration_sdk.semantic_types` and re-exported from the top-level package.

### Why they exist

Camunda's API has many operations that accept string keys — process definition keys, process instance keys, incident keys, job keys, and so on. Without semantic types, it is easy to accidentally pass a process instance key where a process definition key is expected, or mix up a job key with an incident key. The type checker cannot help you if everything is `str`.

Semantic types make these identifiers **distinct at the type level**. Pyright (and other type checkers) will flag an error if you pass a `ProcessInstanceKey` where a `ProcessDefinitionKey` is expected, catching bugs before runtime.

### How to use them

Treat semantic types as **opaque identifiers** — receive them from API responses and pass them to subsequent API calls without inspecting or transforming the underlying value:

<!-- snippet-source: examples/readme.py | regions: ReadmeSemanticTypes -->
```python
from camunda_orchestration_sdk import CamundaClient, ProcessCreationByKey

client = CamundaClient()

# Deploy → the response already carries typed keys
deployment = client.deploy_resources_from_files(["process.bpmn"])
process_key = deployment.processes[0].process_definition_key  # ProcessDefinitionKey

# Pass it directly to another call — no conversion needed
result = client.create_process_instance(
    data=ProcessCreationByKey(process_definition_key=process_key)
)

# The result also carries typed keys
instance_key = result.process_instance_key  # ProcessInstanceKey
client.cancel_process_instance(process_instance_key=instance_key)
```

### Serialising in and out of the type system

Semantic types are `NewType` wrappers over `str`, so they serialise transparently:

<!-- snippet-exempt: uses hypothetical db.save/db.load pseudo-code -->
```python
from camunda_orchestration_sdk import ProcessDefinitionKey, ProcessInstanceKey

# --- Serialising out (to storage / JSON / message queue) ---
# A semantic type IS a str at runtime, so str()/json.dumps()/ORM columns just work:
process_key: ProcessDefinitionKey = deployment.processes[0].process_definition_key
db.save("process_key", process_key)   # stores the raw string
json.dumps({"key": process_key})      # "2251799813685249"

# --- Deserialising in (from storage / external input) ---
# Wrap the raw string with the type constructor:
raw = db.load("process_key")           # returns a plain str
typed_key = ProcessDefinitionKey(raw)  # re-enters the type system

result = client.create_process_instance(
    data=ProcessCreationByKey(process_definition_key=typed_key)
)
```

The available semantic types include: `ProcessDefinitionKey`, `ProcessDefinitionId`, `ProcessInstanceKey`, `JobKey`, `IncidentKey`, `DecisionDefinitionKey`, `DecisionDefinitionId`, `DeploymentKey`, `UserTaskKey`, `MessageKey`, `SignalKey`, `TenantId`, `ElementId`, `FormKey`, and others. All are importable from `camunda_orchestration_sdk` or `camunda_orchestration_sdk.semantic_types`.

## Quick start (Zero-config – recommended)

Keep configuration out of application code. Let the client read `CAMUNDA_*` variables from the environment (12-factor style). This makes secret rotation, environment promotion (dev → staging → prod), and operational tooling (vaults / secret managers) safer and simpler.

If no configuration is present, the SDK defaults to a local Camunda 8 Run-style endpoint at `http://localhost:8080/v2`.

<!-- snippet-source: examples/readme.py | regions: ReadmeZeroConfig -->
```python
from camunda_orchestration_sdk import CamundaAsyncClient, CamundaClient

# Zero-config construction: reads CAMUNDA_* from the environment
client = CamundaClient()
async_client = CamundaAsyncClient()
```

Typical `.env` (example):

```bash
CAMUNDA_REST_ADDRESS=https://cluster.example/v2
CAMUNDA_AUTH_STRATEGY=OAUTH
CAMUNDA_CLIENT_ID=***
CAMUNDA_CLIENT_SECRET=***
```

#### Loading configuration from a `.env` file (`CAMUNDA_LOAD_ENVFILE`)

The SDK can optionally load configuration values from a dotenv file.

- Set `CAMUNDA_LOAD_ENVFILE=true` (or `1` / `yes`) to load `.env` from the current working directory.
- Set `CAMUNDA_LOAD_ENVFILE=/path/to/file.env` to load from an explicit path.
- If the file does not exist, it is silently ignored.
- Precedence is: `.env` < environment variables < explicit `configuration={...}` passed to the client.
- The resolver reads dotenv values without mutating `os.environ`.

Example `.env`:

```bash
CAMUNDA_REST_ADDRESS=http://localhost:8080/v2
CAMUNDA_CLIENT_ID=your-client-id
CAMUNDA_CLIENT_SECRET=your-client-secret
```

Enable loading from the current directory:

```bash
export CAMUNDA_LOAD_ENVFILE=true
python your_script.py
```

Or enable loading from a specific file:

```bash
export CAMUNDA_LOAD_ENVFILE=~/camunda/dev.env
python your_script.py
```

You can also enable it via the explicit configuration dict:

<!-- snippet-source: examples/readme.py | regions: ReadmeEnvFileLoading -->
```python
from camunda_orchestration_sdk import CamundaClient

client = CamundaClient(configuration={"CAMUNDA_LOAD_ENVFILE": "true"})
```

## Programmatic configuration (use sparingly)

Only use `configuration={...}` when you must supply or mutate configuration dynamically (e.g. tests, multi-tenant routing, or ephemeral preview environments). Keys mirror their `CAMUNDA_*` environment names.

<!-- snippet-source: examples/readme.py | regions: ReadmeProgrammaticConfig -->
```python
from camunda_orchestration_sdk import CamundaClient

client = CamundaClient(
    configuration={
        "CAMUNDA_REST_ADDRESS": "http://localhost:8080/v2",
        "CAMUNDA_AUTH_STRATEGY": "NONE",
    }
)
```

## Authentication

The SDK supports three authentication strategies, controlled by `CAMUNDA_AUTH_STRATEGY`:

| Strategy | When to use |
|----------|------------|
| `NONE`   | Local development with unauthenticated Camunda (default) |
| `OAUTH`  | Camunda SaaS or any OAuth 2.0 Client Credentials endpoint |
| `BASIC`  | Self-Managed Camunda with Basic auth (username/password) |

### Auto-detection

If you omit `CAMUNDA_AUTH_STRATEGY`, the SDK infers it from the credentials you provide:

- Only `CAMUNDA_CLIENT_ID` + `CAMUNDA_CLIENT_SECRET` → **OAUTH**
- Only `CAMUNDA_BASIC_AUTH_USERNAME` + `CAMUNDA_BASIC_AUTH_PASSWORD` → **BASIC**
- No credentials → **NONE**
- Both OAuth and Basic credentials present → **error** (set `CAMUNDA_AUTH_STRATEGY` explicitly)

### OAuth 2.0

```bash
CAMUNDA_REST_ADDRESS=https://cluster.example/v2
CAMUNDA_AUTH_STRATEGY=OAUTH
CAMUNDA_CLIENT_ID=your-client-id
CAMUNDA_CLIENT_SECRET=your-client-secret
# Optional:
# CAMUNDA_OAUTH_URL=https://login.cloud.camunda.io/oauth/token
# CAMUNDA_TOKEN_AUDIENCE=zeebe.camunda.io
```

### Basic authentication

```bash
CAMUNDA_REST_ADDRESS=http://localhost:8080/v2
CAMUNDA_AUTH_STRATEGY=BASIC
CAMUNDA_BASIC_AUTH_USERNAME=your-username
CAMUNDA_BASIC_AUTH_PASSWORD=your-password
```

Or programmatically:

<!-- snippet-source: examples/readme.py | regions: ReadmeBasicAuth -->
```python
from camunda_orchestration_sdk import CamundaClient

client = CamundaClient(
    configuration={
        "CAMUNDA_REST_ADDRESS": "http://localhost:8080/v2",
        "CAMUNDA_AUTH_STRATEGY": "BASIC",
        "CAMUNDA_BASIC_AUTH_USERNAME": "your-username",
        "CAMUNDA_BASIC_AUTH_PASSWORD": "your-password",
    }
)
```

## Deploying Resources

Deploy BPMN, DMN, or Form files from disk:

<!-- snippet-source: examples/readme.py | regions: ReadmeDeployResources -->
```python
from camunda_orchestration_sdk import CamundaClient

with CamundaClient() as client:
    result = client.deploy_resources_from_files(["process.bpmn", "decision.dmn"])

    print(f"Deployment key: {result.deployment_key}")
    for process in result.processes:
        print(f"  Process: {process.process_definition_id} (key: {process.process_definition_key})")
```

## Creating a Process Instance

The recommended pattern is to obtain keys from a prior API response (e.g. a deployment) and pass them directly — no manual lifting needed:

<!-- snippet-source: examples/readme.py | regions: ReadmeCreateProcessInstance -->
```python
from camunda_orchestration_sdk import CamundaClient, ProcessCreationByKey

with CamundaClient() as client:
    # Deploy and capture the typed key
    deployment = client.deploy_resources_from_files(["process.bpmn"])
    process_key = deployment.processes[0].process_definition_key

    # Use it directly — the type flows through without conversion
    result = client.create_process_instance(
        data=ProcessCreationByKey(process_definition_key=process_key)
    )
    print(f"Process instance key: {result.process_instance_key}")
```

If you need to restore a key from external storage (database, message queue, config file), wrap the raw string with the semantic type constructor:

<!-- snippet-source: examples/readme.py | regions: ReadmeCreateFromStorage -->
```python
from camunda_orchestration_sdk import CamundaClient, ProcessCreationByKey, ProcessDefinitionKey

with CamundaClient() as client:
    stored_key = "2251799813685249"  # from a DB row or config
    result = client.create_process_instance(
        data=ProcessCreationByKey(process_definition_key=ProcessDefinitionKey(stored_key))
    )
    print(f"Process instance key: {result.process_instance_key}")
```

## Job Workers

Job workers long-poll for available jobs, execute a callback, and automatically complete or fail the job based on the return value. Workers are available on `CamundaAsyncClient`.

Handlers receive a context object that includes a `client` reference, so your handler can make API calls during job execution. The context type depends on the execution strategy:

- **Async handlers** → `ConnectedJobContext` with `client: CamundaAsyncClient` (use `await`)
- **Thread handlers** → `SyncJobContext` with `client: CamundaClient` (call directly)
- **Process handlers** → plain `JobContext` (no client — cannot be pickled across process boundaries)

<!-- snippet-source: examples/readme.py | regions: ReadmeJobWorker -->
```python
import asyncio

from camunda_orchestration_sdk import CamundaAsyncClient, ConnectedJobContext, WorkerConfig

async def handle_job(job_context: ConnectedJobContext) -> dict[str, object]:
    variables = job_context.variables.to_dict()
    job_context.log.info(f"Processing job {job_context.job_key}: {variables}")
    return {"result": "processed"}

async def main() -> None:
    async with CamundaAsyncClient() as client:
        config = WorkerConfig(
            job_type="my-service-task",
            job_timeout_milliseconds=30_000,
        )
        client.create_job_worker(config=config, callback=handle_job)

        # Keep workers running until cancelled
        await client.run_workers()

asyncio.run(main())
```

### Using the Client in a Job Handler

Because `ConnectedJobContext` and `SyncJobContext` include a `client` reference, your handler can make API calls during job execution — for example, publishing a message to trigger another part of the process.

**Async handlers** (`execution_strategy="async"`) — `await` the client method directly:

<!-- snippet-source: examples/readme.py | regions: ReadmeAsyncHandler -->
```python
from camunda_orchestration_sdk import ConnectedJobContext, MessagePublicationRequest, MessagePublicationRequestVariables

async def handle_order(job: ConnectedJobContext) -> dict[str, object]:
    variables = job.variables.to_dict()
    order_id = variables["orderId"]

    await job.client.publish_message(
        data=MessagePublicationRequest(
            name="order-processed",
            correlation_key=order_id,
            time_to_live=60000,
            variables=MessagePublicationRequestVariables.from_dict({"orderId": order_id, "status": "completed"}),
        )
    )

    job.log.info(f"Published order-processed message for order {order_id}")
    return {"status": "done"}
```

**Sync (thread) handlers** (`execution_strategy="thread"`) — `job.client` is a sync `CamundaClient`, so call methods directly:

<!-- snippet-source: examples/readme.py | regions: ReadmeSyncHandler -->
```python
from camunda_orchestration_sdk import MessagePublicationRequest, MessagePublicationRequestVariables, SyncJobContext

def handle_order(job: SyncJobContext) -> dict[str, object]:
    variables = job.variables.to_dict()
    order_id = variables["orderId"]

    job.client.publish_message(
        data=MessagePublicationRequest(
            name="order-processed",
            correlation_key=order_id,
            time_to_live=60000,
            variables=MessagePublicationRequestVariables.from_dict({"orderId": order_id, "status": "completed"}),
        )
    )

    job.log.info(f"Published order-processed message for order {order_id}")
    return {"status": "done"}
```

> **Note:** The SDK automatically provides the right client type for each strategy — async handlers get `CamundaAsyncClient` (use `await`), thread handlers get `CamundaClient` (call directly). You don't need to create or manage these clients yourself.

### Job Logger

Each `JobContext` exposes a `log` property — a scoped logger automatically bound with the job's context (job type, worker name, and job key). Use it inside your handler for structured, per-job log output:

<!-- snippet-source: examples/readme.py | regions: ReadmeJobLogger -->
```python
async def handler(job: ConnectedJobContext) -> dict[str, object]:
    job.log.info(f"Starting work on {job.job_key}")
    # ... do work ...
    job.log.debug("Work completed successfully")
    return {"done": True}
```

The job logger inherits the SDK's logger configuration (loguru by default, or whatever you passed via `logger=`). If you injected a custom logger into the client, job handlers will use a child of that same logger.

> **Note:** When using the `"process"` execution strategy, the job logger silently degrades to a no-op (`NullLogger`) because loggers cannot be pickled across process boundaries. The worker's main-process logger still records all job lifecycle events (activation, completion, failure, errors). If you need per-job logging from a process-isolated handler, configure a logger inside the handler itself.

### Execution Strategies

Job workers support multiple execution strategies to match your workload type. Pass `execution_strategy` as a keyword argument to `create_job_worker`, or let the SDK auto-detect.

| Strategy | How it runs your handler | Context type | Best for |
|----------|--------------------------|--------------|----------|
| `"auto"` (default) | Auto-detects: `"async"` for `async def` handlers, `"thread"` for sync handlers | `ConnectedJobContext` or `SyncJobContext` | Most use cases — sensible defaults without configuration |
| `"async"` | Runs on the main `asyncio` event loop | `ConnectedJobContext` (async client) | I/O-bound async work (HTTP calls, database queries). Best throughput for handlers that call remote systems over HTTP |
| `"thread"` | Runs in a `ThreadPoolExecutor` | `SyncJobContext` (sync client) | CPU-bound work, blocking I/O (file system, synchronous HTTP libraries) |
| `"process"` | Runs in a `ProcessPoolExecutor` | `JobContext` (no client) | Heavy CPU-bound work that needs to escape the GIL (image processing, ML inference) |

> **Choosing between `"async"` and `"thread"`:** If your job handler makes HTTP calls to remote systems (APIs, databases, microservices), `"async"` delivers the best performance — it can multiplex many concurrent jobs on a single thread without blocking. Use `"thread"` when your handler performs CPU-bound computation or calls synchronous libraries that would block the event loop.

**Auto-detection logic:** If your handler is an `async def`, the strategy defaults to `"async"`. If it's a regular `def`, the strategy defaults to `"thread"`. You can override this explicitly:

<!-- snippet-source: examples/readme.py | regions: ReadmeExecutionStrategies -->
```python
from camunda_orchestration_sdk import SyncJobContext, JobContext

# Force thread pool for a sync handler (receives SyncJobContext)
def io_handler(job: SyncJobContext) -> dict[str, object]:
    return {"done": True}

client.create_job_worker(
    config=WorkerConfig(job_type="io-bound-task", job_timeout_milliseconds=30_000),
    callback=io_handler,
    execution_strategy="thread",
)

# Force process pool for CPU-heavy work (receives plain JobContext)
def cpu_handler(job: JobContext) -> dict[str, object]:
    return {"computed": True}

client.create_job_worker(
    config=WorkerConfig(job_type="image-processing", job_timeout_milliseconds=120_000),
    callback=cpu_handler,
    execution_strategy="process",
)
```

**Process strategy caveats:** The `"process"` strategy serialises (pickles) your handler and its context to send them to a worker process. Because the SDK client cannot be pickled, handlers running under this strategy receive a plain `JobContext` (without a `client` attribute) instead of `ConnectedJobContext`/`SyncJobContext`. This means:

- Your handler function and its closure must be picklable (top-level functions work; lambdas and closures over unpicklable objects do not).
- Your handler must accept `JobContext`, not `ConnectedJobContext` or `SyncJobContext` — the type checker enforces this via overloaded signatures on `create_job_worker`.
- `job.log` degrades to a silent no-op logger in the child process (see [Job Logger](#job-logger)).
- There is additional overhead per job from serialisation and inter-process communication.

### Worker Configuration

`WorkerConfig` supports:

| Parameter | Default | Description |
|-----------|---------|-------------|
| `job_type` | *(required)* | The BPMN service task type to poll for |
| `job_timeout_milliseconds` | env / *(required)* | How long the worker has to complete the job |
| `request_timeout_milliseconds` | env / `0` | Long-poll request timeout (0 = server default) |
| `max_concurrent_jobs` | env / `10` | Maximum jobs executing concurrently |
| `fetch_variables` | `None` | List of variable names to fetch (None = all) |
| `worker_name` | env / `"camunda-python-sdk-worker"` | Identifier for this worker in Camunda |

The following are keyword-only arguments on `create_job_worker`, not part of `WorkerConfig`:

| Parameter | Default | Description |
|-----------|---------|-------------|
| `execution_strategy` | `"auto"` | `"auto"`, `"async"`, `"thread"`, or `"process"`. Controls how the handler is invoked and which context type it receives. |
| `startup_jitter_max_seconds` | env / `0` | Maximum random delay (in seconds) before the worker starts polling. When multiple application instances restart simultaneously, this spreads out initial activation requests to avoid saturating the server. A value of `0` (the default) means no delay. |

#### Heritable Worker Defaults

Worker configuration fields marked "env" in the table above can be set globally via environment variables or the client constructor. Individual `WorkerConfig` values take precedence.

| Environment variable | Maps to |
|---|---|
| `CAMUNDA_WORKER_TIMEOUT` | `job_timeout_milliseconds` |
| `CAMUNDA_WORKER_MAX_CONCURRENT_JOBS` | `max_concurrent_jobs` |
| `CAMUNDA_WORKER_REQUEST_TIMEOUT` | `request_timeout_milliseconds` |
| `CAMUNDA_WORKER_NAME` | `worker_name` |
| `CAMUNDA_WORKER_STARTUP_JITTER_MAX_SECONDS` | `startup_jitter_max_seconds` |

**Precedence:** explicit `WorkerConfig` value > environment variable / client constructor > hardcoded default.

Example — set defaults via environment variables:

```bash
export CAMUNDA_WORKER_TIMEOUT=30000
export CAMUNDA_WORKER_MAX_CONCURRENT_JOBS=32
```

<!-- snippet-source: examples/readme.py | regions: ReadmeWorkerDefaultsEnv -->
```python
# No need to set job_timeout_milliseconds on every worker — inherited from env
client.create_job_worker(
    config=WorkerConfig(job_type="payment-service"),
    callback=handle_payment,
)
client.create_job_worker(
    config=WorkerConfig(job_type="notification-service"),
    callback=handle_notification,
)
```

Example — set defaults via client constructor:

<!-- snippet-source: examples/readme.py | regions: ReadmeWorkerDefaultsClient -->
```python
client = CamundaAsyncClient(configuration={
    "CAMUNDA_WORKER_TIMEOUT": "30000",
    "CAMUNDA_WORKER_MAX_CONCURRENT_JOBS": "16",
    "CAMUNDA_WORKER_NAME": "my-app",
})

# Both workers inherit timeout, concurrency, and name
client.create_job_worker(
    config=WorkerConfig(job_type="payment-service"),
    callback=handle_payment,
)
client.create_job_worker(
    config=WorkerConfig(job_type="shipping-service"),
    callback=handle_shipping,
)
```

### Failing a Job

To explicitly fail a job with a custom error message, retry count, and backoff, raise `JobFailure` in your handler:

<!-- snippet-source: examples/readme.py | regions: ReadmeFailJob -->
```python
from camunda_orchestration_sdk import ConnectedJobContext, JobFailure

async def handle_job(job: ConnectedJobContext) -> dict[str, object]:
    if not job.variables.to_dict().get("required_field"):
        raise JobFailure(
            message="Missing required field",
            retries=2,
            retry_back_off=5000,  # milliseconds
        )
    return {"result": "ok"}
```

| Parameter | Default | Description |
|-----------|---------|-------------|
| `message` | *(required)* | Error message attached to the failure |
| `retries` | `None` | Remaining retries. `None` decrements the current retry count by 1 |
| `retry_back_off` | `0` | Backoff before the next retry, in milliseconds |

If an unhandled exception escapes your handler, the job is automatically failed with the exception message and the retry count decremented by 1.

### Throwing a BPMN Error

To throw a [BPMN error](https://docs.camunda.io/docs/components/modeler/bpmn/error-events/) from a job handler — for example, to trigger an error boundary event — raise `JobError`:

<!-- snippet-source: examples/readme.py | regions: ReadmeBpmnError -->
```python
from camunda_orchestration_sdk import ConnectedJobContext, JobError

async def handle_payment(job: ConnectedJobContext) -> dict[str, object]:
    variables = job.variables.to_dict()
    if variables.get("amount", 0) > 10_000:
        raise JobError(error_code="AMOUNT_TOO_HIGH", message="Payment exceeds limit")
    return {"status": "approved"}
```

| Parameter | Default | Description |
|-----------|---------|-------------|
| `error_code` | *(required)* | The error code that is matched against BPMN error catch events |
| `message` | `""` | An optional error message for logging/diagnostics |

The `error_code` must match the error code defined on a BPMN error catch event in your process model. If no catch event matches, the job becomes an incident.

### Job Corrections (User Task Listeners)

When a job worker handles a [user task listener](https://docs.camunda.io/docs/components/concepts/user-task-listeners/), it can correct task properties (assignee, due date, candidate groups, etc.) as part of the completion. Return a `JobCompletionRequest` with a `result` containing `JobResultCorrections`:

<!-- snippet-source: examples/readme.py | regions: ReadmeJobCorrections -->
```python
from camunda_orchestration_sdk import ConnectedJobContext
from camunda_orchestration_sdk.models import (
    JobCompletionRequest,
    JobResultUserTask,
    JobResultCorrections,
)

async def validate_task(job: ConnectedJobContext) -> JobCompletionRequest:
    return JobCompletionRequest(
        result=JobResultUserTask(
            type_="userTask",
            corrections=JobResultCorrections(
                assignee="corrected-user",
                priority=80,
            ),
        ),
    )
```

To deny a task completion (reject the work), set `denied=True`:

<!-- snippet-source: examples/readme.py | regions: ReadmeJobCorrectionsDenied -->
```python
async def review_task(job: ConnectedJobContext) -> JobCompletionRequest:
    return JobCompletionRequest(
        result=JobResultUserTask(
            type_="userTask",
            denied=True,
            denied_reason="Insufficient documentation",
        ),
    )
```

| Correctable attribute | Type | Clear value |
|---|---|---|
| `assignee` | `str` | Empty string `""` |
| `due_date` | `datetime` | Empty string `""` |
| `follow_up_date` | `datetime` | Empty string `""` |
| `candidate_users` | `list[str]` | Empty list `[]` |
| `candidate_groups` | `list[str]` | Empty list `[]` |
| `priority` | `int` (0–100) | — |

Omitting an attribute or passing `None` preserves the persisted value. This works with all handler types (async, thread, and process).

## Error Handling

The SDK raises typed exceptions for API errors. Each HTTP error status code has a corresponding exception class (e.g. `BadRequestError` for 400, `NotFoundError` for 404). Every exception carries the `operation_id` of the method that raised it:

<!-- snippet-source: examples/readme.py | regions: ReadmeErrorHandling -->
```python
from camunda_orchestration_sdk import CamundaClient, ProcessCreationByKey, ProcessDefinitionKey
from camunda_orchestration_sdk.errors import BadRequestError

with CamundaClient() as client:
    try:
        result = client.create_process_instance(
            data=ProcessCreationByKey(process_definition_key=ProcessDefinitionKey("nonexistent"))
        )
    except BadRequestError as e:
        print(f"Bad request ({e.operation_id}): {e}")
```

## Logging

By default the SDK logs via [loguru](https://github.com/Delgan/loguru). You can inject any logger that exposes `debug`, `info`, `warning`, and `error` methods — including Python's built-in `logging.Logger`.

### Using the default logger (loguru)

No configuration needed. Control verbosity with `CAMUNDA_SDK_LOG_LEVEL` or loguru's own `LOGURU_LEVEL` environment variable:

```bash
CAMUNDA_SDK_LOG_LEVEL=debug python your_script.py
```

### Injecting a custom logger

Pass a `logger=` argument to `CamundaClient` or `CamundaAsyncClient`. The logger is forwarded to all internal components (auth providers, HTTP hooks, job workers).

**stdlib `logging`:**

<!-- snippet-source: examples/readme.py | regions: ReadmeStdlibLogger -->
```python
import logging

from camunda_orchestration_sdk import CamundaClient

my_logger = logging.getLogger("my_app.camunda")
my_logger.setLevel(logging.DEBUG)

client = CamundaClient(logger=my_logger)
```

**Custom logger object:**

<!-- snippet-source: examples/readme.py | regions: ReadmeCustomLogger -->
```python
from camunda_orchestration_sdk import CamundaClient

class MyLogger:
    def debug(self, msg: object, *args: object, **kwargs: object) -> None:
        print(f"[DEBUG] {msg}")

    def info(self, msg: object, *args: object, **kwargs: object) -> None:
        print(f"[INFO] {msg}")

    def warning(self, msg: object, *args: object, **kwargs: object) -> None:
        print(f"[WARN] {msg}")

    def error(self, msg: object, *args: object, **kwargs: object) -> None:
        print(f"[ERROR] {msg}")

client = CamundaClient(logger=MyLogger())
```

### Disabling logging

Pass an instance of `NullLogger` to silence all SDK output:

<!-- snippet-source: examples/readme.py | regions: ReadmeDisableLogging -->
```python
from camunda_orchestration_sdk import CamundaClient, NullLogger

client = CamundaClient(logger=NullLogger())
```

## Backpressure

The SDK includes built-in adaptive backpressure management that protects the Camunda cluster from overload. When the cluster returns backpressure signals (HTTP 429, 503, or `RESOURCE_EXHAUSTED`), the SDK automatically reduces outbound concurrency. When conditions improve, it gradually recovers — returning to full throughput with no manual intervention.

This is enabled by default with the `BALANCED` profile and requires no configuration. Operations that drain work from the cluster (completing jobs, failing jobs) are never throttled.

| Profile | Behavior |
|---------|----------|
| `BALANCED` (default) | Adaptive concurrency gating with AIMD-style permit management and exponential backoff at floor. |
| `LEGACY` | Observe-only — records severity but never gates or queues requests. |

Set the profile via the `CAMUNDA_SDK_BACKPRESSURE_PROFILE` environment variable.

For detailed algorithm documentation, see [docs/backpressure.md](docs/backpressure.md).

## Configuration reference

All `CAMUNDA_*` environment variables recognised by the SDK. These can also be passed as keys in the `configuration={...}` dict.

<!-- BEGIN_CONFIG_REFERENCE -->

| Variable | Default | Description |
|----------|---------|-------------|
| `ZEEBE_REST_ADDRESS` | `http://localhost:8080/v2` | REST API base URL (alias for CAMUNDA_REST_ADDRESS). |
| `CAMUNDA_REST_ADDRESS` | `http://localhost:8080/v2` | REST API base URL. `/v2` is appended automatically if missing. |
| `CAMUNDA_TOKEN_AUDIENCE` | `zeebe.camunda.io` | OAuth token audience. |
| `CAMUNDA_OAUTH_URL` | `https://login.cloud.camunda.io/oauth/token` | OAuth token endpoint URL. |
| `CAMUNDA_CLIENT_ID` | — | OAuth client ID. |
| `CAMUNDA_CLIENT_SECRET` | — | OAuth client secret. |
| `CAMUNDA_CLIENT_AUTH_CLIENTID` | — | Alias for CAMUNDA_CLIENT_ID. |
| `CAMUNDA_CLIENT_AUTH_CLIENTSECRET` | — | Alias for CAMUNDA_CLIENT_SECRET. |
| `CAMUNDA_AUTH_STRATEGY` | `NONE` | Authentication strategy: NONE, OAUTH, or BASIC. Auto-inferred from credentials if omitted. |
| `CAMUNDA_BASIC_AUTH_USERNAME` | — | Basic auth username. Required when CAMUNDA_AUTH_STRATEGY=BASIC. |
| `CAMUNDA_BASIC_AUTH_PASSWORD` | — | Basic auth password. Required when CAMUNDA_AUTH_STRATEGY=BASIC. |
| `CAMUNDA_SDK_LOG_LEVEL` | `error` | SDK log level: silent, error, warn, info, debug, trace, or silly. |
| `CAMUNDA_TOKEN_CACHE_DIR` | — | Directory for OAuth token disk cache. Disabled if unset. |
| `CAMUNDA_TOKEN_DISK_CACHE_DISABLE` | `false` | Disable OAuth token disk caching. |
| `CAMUNDA_SDK_BACKPRESSURE_PROFILE` | `BALANCED` | Backpressure profile: BALANCED (adaptive gating, default) or LEGACY (observe-only, no gating). |
| `CAMUNDA_TENANT_ID` | — | Default tenant ID applied to all operations that accept a tenant_id parameter. |
| `CAMUNDA_WORKER_TIMEOUT` | — | Default job timeout in milliseconds for all workers. |
| `CAMUNDA_WORKER_MAX_CONCURRENT_JOBS` | — | Default maximum concurrent jobs per worker. |
| `CAMUNDA_WORKER_REQUEST_TIMEOUT` | — | Default long-poll request timeout in milliseconds for all workers. |
| `CAMUNDA_WORKER_NAME` | — | Default worker name for all workers. |
| `CAMUNDA_WORKER_STARTUP_JITTER_MAX_SECONDS` | — | Default maximum startup jitter in seconds for all workers. |
| `CAMUNDA_LOAD_ENVFILE` | — | Load configuration from a `.env` file. Set to `true` (or a file path). |

<!-- END_CONFIG_REFERENCE -->


<!-- docs:cut:start -->
## Contributing

See [CONTRIBUTING.md](CONTRIBUTING.md) for development setup and generation workflow. See [MAINTAINER.md](MAINTAINER.md) for architecture and pipeline documentation.

## License

Apache-2.0
<!-- docs:cut:end -->


