Metadata-Version: 2.4
Name: camunda-orchestration-sdk
Version: 8.9.0.dev16
Summary: Python client for Camunda 8 Orchestration Cluster API
Author-email: Josh Wulf <josh.wulf@camunda.com>
Requires-Python: >=3.10
Description-Content-Type: text/markdown
Requires-Dist: httpx>=0.28.1
Requires-Dist: attrs>=21.3.0
Requires-Dist: pydantic>=2
Requires-Dist: python-dateutil>=2.8.2
Requires-Dist: typing-extensions>=4.7.1
Requires-Dist: loguru>=0.7.2
Requires-Dist: python-dotenv>=1.0.0

# Camunda Orchestration Cluster API – Python SDK

<!-- WARNING: The content and specific structure of this file drives Docusaurus generation in camunda-docs. Please refer to MAINTAINER.md before editing. -->
<!-- docs:cut:start -->
[![PyPI - Version](https://img.shields.io/pypi/v/camunda-orchestration-sdk)](https://pypi.org/project/camunda-orchestration-sdk/)
[![Documentation](https://img.shields.io/badge/docs-API%20Reference-blue)](https://camunda.github.io/orchestration-cluster-api-python/)
<!-- docs:cut:end -->

A fully typed Python client for the [Camunda 8 Orchestration Cluster REST API](https://docs.camunda.io/docs/apis-tools/camunda-api-rest/camunda-api-rest-overview/). It is fully compliant with the Camunda OpenAPI spec and adds hand-written runtime infrastructure for authentication, configuration, and job workers.

- **Sync and async** — `CamundaClient` (synchronous) and `CamundaAsyncClient` (async/await)
- **Strict typing** — pyright-strict compatible with PEP 561 `py.typed` marker
- **Zero-config** — reads `CAMUNDA_*` environment variables (12-factor style)
- **Job workers** — long-poll workers with thread, process, or async execution strategies
- **OAuth & Basic auth** — pluggable authentication with automatic token management
- **Pluggable logging** — inject your own logger (stdlib `logging`, loguru, or custom)

## Installing the SDK in your project

### Requirements

- Python 3.10 or later

### Stable release (recommended for production)

The stable version tracks the latest supported Camunda server release. The first stable release will be **8.9.0**.

```bash
pip install camunda-orchestration-sdk
```

### Pre-release / dev channel

Pre-release versions (e.g. `8.9.0.dev2`) are published from the `main` branch and contain the latest changes targeting the next server minor version. Use these to preview upcoming features or validate your integration ahead of a stable release.

```bash
# pip
pip install --pre camunda-orchestration-sdk

# pin to a specific pre-release
pip install camunda-orchestration-sdk==8.9.0.dev2
```

In a `requirements.txt`:

```text
camunda-orchestration-sdk>=8.9.0.dev1
```

> **Note:** Pre-release versions may contain breaking changes between builds. Pin to a specific version if you need reproducible builds.

### Versioning

This SDK does **not** follow traditional semver. The **major.minor** version tracks the Camunda server version, so you can easily match the SDK to your deployment target (e.g. SDK `8.9.x` targets Camunda `8.9`).

**Patch releases** contain fixes, features, and occasionally **breaking type changes**. A breaking type change typically means an upstream API definition fix that corrects the shape of a request or response model — your code may stop type-checking even though it worked before.

When this happens, we signal it in the [CHANGELOG](https://github.com/camunda/orchestration-cluster-api-python/releases).

**Recommended approach:**

- **Ride the latest** — accept that types may shift and update your code when it happens. This keeps you on the most accurate API surface.
- **Pin and review** — pin to a specific patch version and review the [CHANGELOG](https://github.com/camunda/orchestration-cluster-api-python/releases) before upgrading:

  ```text
  camunda-orchestration-sdk==8.9.3
  ```

## Using the SDK

The SDK provides two clients with identical API surfaces:

- **`CamundaClient`** — synchronous. Every method blocks until the response arrives. Use this in scripts, CLI tools, Django views, Flask handlers, or anywhere you don't have an async event loop.
- **`CamundaAsyncClient`** — asynchronous (`async`/`await`). Every method is a coroutine. Use this in FastAPI, aiohttp, or any `asyncio`-based application. **Job workers require `CamundaAsyncClient`** because they use `asyncio` for long-polling and concurrent job execution.

Both clients share the same method names and parameters — the only difference is calling convention:

```python
# Sync
from camunda_orchestration_sdk import CamundaClient

with CamundaClient() as client:
    topology = client.get_topology()
```

```python
# Async
import asyncio
from camunda_orchestration_sdk import CamundaAsyncClient

async def main():
    async with CamundaAsyncClient() as client:
        topology = await client.get_topology()

asyncio.run(main())
```

> **Which one should I use?** If your application already uses `asyncio` (FastAPI, aiohttp, etc.) or you need job workers, use `CamundaAsyncClient`. Otherwise, `CamundaClient` is simpler and works everywhere.


## Semantic Types

The SDK uses Python `NewType` wrappers for identifiers like `ProcessDefinitionKey`, `ProcessInstanceKey`, `JobKey`, `TenantId`, etc. These are defined in `camunda_orchestration_sdk.semantic_types` and re-exported from the top-level package.

### Why they exist

Camunda's API has many operations that accept string keys — process definition keys, process instance keys, incident keys, job keys, and so on. Without semantic types, it is easy to accidentally pass a process instance key where a process definition key is expected, or mix up a job key with an incident key. The type checker cannot help you if everything is `str`.

Semantic types make these identifiers **distinct at the type level**. Pyright (and other type checkers) will flag an error if you pass a `ProcessInstanceKey` where a `ProcessDefinitionKey` is expected, catching bugs before runtime.
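As a self-contained sketch of the pattern, here is how `NewType` makes two string keys distinct to the type checker. The definitions below mirror the SDK's approach (an assumption for illustration — the real types live in `camunda_orchestration_sdk.semantic_types`):

```python
from typing import NewType

# Illustrative re-definitions; the SDK ships its own versions of these.
ProcessDefinitionKey = NewType("ProcessDefinitionKey", str)
ProcessInstanceKey = NewType("ProcessInstanceKey", str)

def cancel_instance(process_instance_key: ProcessInstanceKey) -> str:
    return f"cancelled {process_instance_key}"

definition_key = ProcessDefinitionKey("2251799813685249")
instance_key = ProcessInstanceKey("2251799813685250")

result = cancel_instance(instance_key)  # OK: types match
# cancel_instance(definition_key)       # pyright error: ProcessDefinitionKey
#                                       # is not ProcessInstanceKey
print(result)  # cancelled 2251799813685250
```

At runtime both keys are plain strings; the distinction exists only for the type checker, so there is no performance cost.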

### How to use them

Treat semantic types as **opaque identifiers** — receive them from API responses and pass them to subsequent API calls without inspecting or transforming the underlying value:

```python
from camunda_orchestration_sdk import CamundaClient, ProcessCreationByKey

client = CamundaClient()

# Deploy → the response already carries typed keys
deployment = client.deploy_resources_from_files(["process.bpmn"])
process_key = deployment.processes[0].process_definition_key  # ProcessDefinitionKey

# Pass it directly to another call — no conversion needed
result = client.create_process_instance(
    data=ProcessCreationByKey(process_definition_key=process_key)
)

# The result also carries typed keys
instance_key = result.process_instance_key  # ProcessInstanceKey
client.cancel_process_instance(process_instance_key=instance_key)
```

### Serialising in and out of the type system

Semantic types are `NewType` wrappers over `str`, so they serialise transparently:

```python
from camunda_orchestration_sdk import ProcessDefinitionKey, ProcessInstanceKey

# --- Serialising out (to storage / JSON / message queue) ---
# A semantic type IS a str at runtime, so str()/json.dumps()/ORM columns just work:
process_key: ProcessDefinitionKey = deployment.processes[0].process_definition_key
db.save("process_key", process_key)   # stores the raw string
json.dumps({"key": process_key})      # "2251799813685249"

# --- Deserialising in (from storage / external input) ---
# Wrap the raw string with the type constructor:
raw = db.load("process_key")           # returns a plain str
typed_key = ProcessDefinitionKey(raw)  # re-enters the type system

result = client.create_process_instance(
    data=ProcessCreationByKey(process_definition_key=typed_key)
)
```

The available semantic types include: `ProcessDefinitionKey`, `ProcessDefinitionId`, `ProcessInstanceKey`, `JobKey`, `IncidentKey`, `DecisionDefinitionKey`, `DecisionDefinitionId`, `DeploymentKey`, `UserTaskKey`, `MessageKey`, `SignalKey`, `TenantId`, `ElementId`, `FormKey`, and others. All are importable from `camunda_orchestration_sdk` or `camunda_orchestration_sdk.semantic_types`.

## Quick start (Zero-config – recommended)

Keep configuration out of application code. Let the client read `CAMUNDA_*` variables from the environment (12-factor style). This makes secret rotation, environment promotion (dev → staging → prod), and operational tooling (vaults / secret managers) safer and simpler.

If no configuration is present, the SDK defaults to a local Camunda 8 Run-style endpoint at `http://localhost:8080/v2`.

```python
from camunda_orchestration_sdk import CamundaClient, CamundaAsyncClient

# Zero-config construction: reads CAMUNDA_* from the environment
client = CamundaClient()
async_client = CamundaAsyncClient()
```

Typical `.env` (example):

```bash
CAMUNDA_REST_ADDRESS=https://cluster.example/v2
CAMUNDA_AUTH_STRATEGY=OAUTH
CAMUNDA_CLIENT_ID=***
CAMUNDA_CLIENT_SECRET=***
```

#### Loading configuration from a `.env` file (`CAMUNDA_LOAD_ENVFILE`)

The SDK can optionally load configuration values from a dotenv file.

- Set `CAMUNDA_LOAD_ENVFILE=true` (or `1` / `yes`) to load `.env` from the current working directory.
- Set `CAMUNDA_LOAD_ENVFILE=/path/to/file.env` to load from an explicit path.
- If the file does not exist, it is silently ignored.
- Precedence is: `.env` < environment variables < explicit `configuration={...}` passed to the client.
- The resolver reads dotenv values without mutating `os.environ`.
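The precedence order above behaves like a dict merge where later sources win. This is an illustration of the documented ordering, not the SDK's actual resolver code:

```python
# Documented precedence: .env values < environment variables < explicit config
dotenv_values = {
    "CAMUNDA_REST_ADDRESS": "http://from-dotenv/v2",
    "CAMUNDA_CLIENT_ID": "dotenv-id",
}
environ_values = {"CAMUNDA_REST_ADDRESS": "http://from-environ/v2"}
explicit_config = {"CAMUNDA_REST_ADDRESS": "http://from-explicit/v2"}

# Later dicts win on key collisions
resolved = {**dotenv_values, **environ_values, **explicit_config}
print(resolved["CAMUNDA_REST_ADDRESS"])  # http://from-explicit/v2
print(resolved["CAMUNDA_CLIENT_ID"])     # dotenv-id (only .env supplied it)
```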

Example `.env`:

```bash
CAMUNDA_REST_ADDRESS=http://localhost:8080/v2
CAMUNDA_CLIENT_ID=your-client-id
CAMUNDA_CLIENT_SECRET=your-client-secret
```

Enable loading from the current directory:

```bash
export CAMUNDA_LOAD_ENVFILE=true
python your_script.py
```

Or enable loading from a specific file:

```bash
export CAMUNDA_LOAD_ENVFILE=~/camunda/dev.env
python your_script.py
```

You can also enable it via the explicit configuration dict:

```python
from camunda_orchestration_sdk import CamundaClient

client = CamundaClient(configuration={"CAMUNDA_LOAD_ENVFILE": "true"})
```

## Programmatic configuration (use sparingly)

Only use `configuration={...}` when you must supply or mutate configuration dynamically (e.g. tests, multi-tenant routing, or ephemeral preview environments). Keys mirror their `CAMUNDA_*` environment names.

```python
from camunda_orchestration_sdk import CamundaClient

client = CamundaClient(
    configuration={
        "CAMUNDA_REST_ADDRESS": "http://localhost:8080/v2",
        "CAMUNDA_AUTH_STRATEGY": "NONE",
    }
)
```

## Authentication

The SDK supports three authentication strategies, controlled by `CAMUNDA_AUTH_STRATEGY`:

| Strategy | When to use |
|----------|------------|
| `NONE`   | Local development with unauthenticated Camunda (default) |
| `OAUTH`  | Camunda SaaS or any OAuth 2.0 Client Credentials endpoint |
| `BASIC`  | Self-Managed Camunda with Basic auth (username/password) |

### Auto-detection

If you omit `CAMUNDA_AUTH_STRATEGY`, the SDK infers it from the credentials you provide:

- Only `CAMUNDA_CLIENT_ID` + `CAMUNDA_CLIENT_SECRET` → **OAUTH**
- Only `CAMUNDA_BASIC_AUTH_USERNAME` + `CAMUNDA_BASIC_AUTH_PASSWORD` → **BASIC**
- No credentials → **NONE**
- Both OAuth and Basic credentials present → **error** (set `CAMUNDA_AUTH_STRATEGY` explicitly)
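The rules above can be expressed as a small decision function. This is a sketch of the documented behaviour, not the SDK's actual implementation:

```python
def infer_auth_strategy(config: dict[str, str]) -> str:
    """Illustrative auto-detection following the documented rules."""
    has_oauth = "CAMUNDA_CLIENT_ID" in config and "CAMUNDA_CLIENT_SECRET" in config
    has_basic = ("CAMUNDA_BASIC_AUTH_USERNAME" in config
                 and "CAMUNDA_BASIC_AUTH_PASSWORD" in config)
    if has_oauth and has_basic:
        raise ValueError("Ambiguous credentials: set CAMUNDA_AUTH_STRATEGY explicitly")
    if has_oauth:
        return "OAUTH"
    if has_basic:
        return "BASIC"
    return "NONE"

print(infer_auth_strategy({"CAMUNDA_CLIENT_ID": "id",
                           "CAMUNDA_CLIENT_SECRET": "secret"}))  # OAUTH
print(infer_auth_strategy({}))                                   # NONE
```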

### OAuth 2.0

```bash
CAMUNDA_REST_ADDRESS=https://cluster.example/v2
CAMUNDA_AUTH_STRATEGY=OAUTH
CAMUNDA_CLIENT_ID=your-client-id
CAMUNDA_CLIENT_SECRET=your-client-secret
# Optional:
# CAMUNDA_OAUTH_URL=https://login.cloud.camunda.io/oauth/token
# CAMUNDA_TOKEN_AUDIENCE=zeebe.camunda.io
```

### Basic authentication

```bash
CAMUNDA_REST_ADDRESS=http://localhost:8080/v2
CAMUNDA_AUTH_STRATEGY=BASIC
CAMUNDA_BASIC_AUTH_USERNAME=your-username
CAMUNDA_BASIC_AUTH_PASSWORD=your-password
```

Or programmatically:

```python
from camunda_orchestration_sdk import CamundaClient

client = CamundaClient(
    configuration={
        "CAMUNDA_REST_ADDRESS": "http://localhost:8080/v2",
        "CAMUNDA_AUTH_STRATEGY": "BASIC",
        "CAMUNDA_BASIC_AUTH_USERNAME": "your-username",
        "CAMUNDA_BASIC_AUTH_PASSWORD": "your-password",
    }
)
```

## Deploying Resources

Deploy BPMN, DMN, or Form files from disk:

```python
from camunda_orchestration_sdk import CamundaClient

with CamundaClient() as client:
    result = client.deploy_resources_from_files(["process.bpmn", "decision.dmn"])

    print(f"Deployment key: {result.deployment_key}")
    for process in result.processes:
        print(f"  Process: {process.process_definition_id} (key: {process.process_definition_key})")
```

## Creating a Process Instance

The recommended pattern is to obtain keys from a prior API response (e.g. a deployment) and pass them directly — no manual lifting needed:

```python
from camunda_orchestration_sdk import CamundaClient, ProcessCreationByKey

with CamundaClient() as client:
    # Deploy and capture the typed key
    deployment = client.deploy_resources_from_files(["process.bpmn"])
    process_key = deployment.processes[0].process_definition_key

    # Use it directly — the type flows through without conversion
    result = client.create_process_instance(
        data=ProcessCreationByKey(process_definition_key=process_key)
    )
    print(f"Process instance key: {result.process_instance_key}")
```

If you need to restore a key from external storage (database, message queue, config file), wrap the raw string with the semantic type constructor:

```python
from camunda_orchestration_sdk import CamundaClient, ProcessDefinitionKey, ProcessCreationByKey

with CamundaClient() as client:
    stored_key = "2251799813685249"  # from a DB row or config
    result = client.create_process_instance(
        data=ProcessCreationByKey(process_definition_key=ProcessDefinitionKey(stored_key))
    )
    print(f"Process instance key: {result.process_instance_key}")
```

## Job Workers

Job workers long-poll for available jobs, execute a callback, and automatically complete or fail the job based on the return value. Workers are available on `CamundaAsyncClient`.

Handlers receive a context object that includes a `client` reference, so your handler can make API calls during job execution. The context type depends on the execution strategy:

- **Async handlers** → `ConnectedJobContext` with `client: CamundaAsyncClient` (use `await`)
- **Thread handlers** → `SyncJobContext` with `client: CamundaClient` (call directly)
- **Process handlers** → plain `JobContext` (no client — cannot be pickled across process boundaries)

```python
import asyncio
from camunda_orchestration_sdk import CamundaAsyncClient, ConnectedJobContext, WorkerConfig

async def handle_job(job_context: ConnectedJobContext) -> dict:
    variables = job_context.variables.to_dict()
    job_context.log.info(f"Processing job {job_context.job_key}: {variables}")
    return {"result": "processed"}

async def main():
    async with CamundaAsyncClient() as client:
        config = WorkerConfig(
            job_type="my-service-task",
            job_timeout_milliseconds=30_000,
        )
        client.create_job_worker(config=config, callback=handle_job)

        # Keep workers running until cancelled
        await client.run_workers()

asyncio.run(main())
```

### Using the Client in a Job Handler

Because `ConnectedJobContext` and `SyncJobContext` include a `client` reference, your handler can make API calls during job execution — for example, publishing a message to trigger another part of the process.

**Async handlers** (`execution_strategy="async"`) — `await` the client method directly:

```python
from camunda_orchestration_sdk import ConnectedJobContext, MessagePublicationRequest

async def handle_order(job: ConnectedJobContext) -> dict:
    variables = job.variables.to_dict()
    order_id = variables["orderId"]

    await job.client.publish_message(
        data=MessagePublicationRequest(
            name="order-processed",
            correlation_key=order_id,
            time_to_live=60000,
            variables={"orderId": order_id, "status": "completed"},
        )
    )

    job.log.info(f"Published order-processed message for order {order_id}")
    return {"status": "done"}
```

**Sync (thread) handlers** (`execution_strategy="thread"`) — `job.client` is a sync `CamundaClient`, so call methods directly:

```python
from camunda_orchestration_sdk import SyncJobContext, MessagePublicationRequest

def handle_order(job: SyncJobContext) -> dict:
    variables = job.variables.to_dict()
    order_id = variables["orderId"]

    job.client.publish_message(
        data=MessagePublicationRequest(
            name="order-processed",
            correlation_key=order_id,
            time_to_live=60000,
            variables={"orderId": order_id, "status": "completed"},
        )
    )

    job.log.info(f"Published order-processed message for order {order_id}")
    return {"status": "done"}
```

> **Note:** The SDK automatically provides the right client type for each strategy — async handlers get `CamundaAsyncClient` (use `await`), thread handlers get `CamundaClient` (call directly). You don't need to create or manage these clients yourself.

### Job Logger

Each `JobContext` exposes a `log` property — a scoped logger automatically bound with the job's context (job type, worker name, and job key). Use it inside your handler for structured, per-job log output:

```python
from camunda_orchestration_sdk import JobContext

async def handler(job: JobContext) -> dict:
    job.log.info(f"Starting work on {job.job_key}")
    # ... do work ...
    job.log.debug("Work completed successfully")
    return {"done": True}
```

The job logger inherits the SDK's logger configuration (loguru by default, or whatever you passed via `logger=`). If you injected a custom logger into the client, job handlers will use a child of that same logger.

> **Note:** When using the `"process"` execution strategy, the job logger silently degrades to a no-op (`NullLogger`) because loggers cannot be pickled across process boundaries. The worker's main-process logger still records all job lifecycle events (activation, completion, failure, errors). If you need per-job logging from a process-isolated handler, configure a logger inside the handler itself.
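One way to configure per-job logging inside a process-isolated handler is to set up a stdlib logger lazily in the handler itself (a hypothetical handler name; the logger setup is an illustration, not an SDK requirement):

```python
import logging

def resize_images(job) -> dict:
    # Runs under execution_strategy="process", where job.log is a no-op,
    # so the handler configures its own logger in the child process.
    log = logging.getLogger("my_app.jobs.resize")
    if not log.handlers:
        stream = logging.StreamHandler()
        stream.setFormatter(logging.Formatter("%(name)s %(levelname)s %(message)s"))
        log.addHandler(stream)
        log.setLevel(logging.INFO)
    log.info("resizing in child process")
    return {"resized": True}
```

Because `logging.getLogger` returns the same logger on every call within a process, the setup cost is paid once per worker process, not once per job.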

### Execution Strategies

Job workers support multiple execution strategies to match your workload type. Pass `execution_strategy` as a keyword argument to `create_job_worker`, or let the SDK auto-detect.

| Strategy | How it runs your handler | Context type | Best for |
|----------|--------------------------|--------------|----------|
| `"auto"` (default) | Auto-detects: `"async"` for `async def` handlers, `"thread"` for sync handlers | `ConnectedJobContext` or `SyncJobContext` | Most use cases — sensible defaults without configuration |
| `"async"` | Runs on the main `asyncio` event loop | `ConnectedJobContext` (async client) | I/O-bound async work (HTTP calls, database queries). Best throughput for handlers that call remote systems over HTTP |
| `"thread"` | Runs in a `ThreadPoolExecutor` | `SyncJobContext` (sync client) | CPU-bound work, blocking I/O (file system, synchronous HTTP libraries) |
| `"process"` | Runs in a `ProcessPoolExecutor` | `JobContext` (no client) | Heavy CPU-bound work that needs to escape the GIL (image processing, ML inference) |

> **Choosing between `"async"` and `"thread"`:** If your job handler makes HTTP calls to remote systems (APIs, databases, microservices), `"async"` delivers the best performance — it can multiplex many concurrent jobs on a single thread without blocking. Use `"thread"` when your handler performs CPU-bound computation or calls synchronous libraries that would block the event loop.

**Auto-detection logic:** If your handler is an `async def`, the strategy defaults to `"async"`. If it's a regular `def`, the strategy defaults to `"thread"`. You can override this explicitly:

```python
from camunda_orchestration_sdk import SyncJobContext, JobContext, WorkerConfig

# `client` is the CamundaAsyncClient from the worker example above

# Force thread pool for a sync handler (receives SyncJobContext)
def io_handler(job: SyncJobContext) -> dict:
    return {"done": True}

client.create_job_worker(
    config=WorkerConfig(job_type="io-bound-task", job_timeout_milliseconds=30_000),
    callback=io_handler,
    execution_strategy="thread",
)

# Force process pool for CPU-heavy work (receives plain JobContext)
def cpu_handler(job: JobContext) -> dict:
    return {"computed": True}

client.create_job_worker(
    config=WorkerConfig(job_type="image-processing", job_timeout_milliseconds=120_000),
    callback=cpu_handler,
    execution_strategy="process",
)
```

**Process strategy caveats:** The `"process"` strategy serialises (pickles) your handler and its context to send them to a worker process. Because the SDK client cannot be pickled, handlers running under this strategy receive a plain `JobContext` (without a `client` attribute) instead of `ConnectedJobContext`/`SyncJobContext`. This means:

- Your handler function and its closure must be picklable (top-level functions work; lambdas and closures over unpicklable objects do not).
- Your handler must accept `JobContext`, not `ConnectedJobContext` or `SyncJobContext` — the type checker enforces this via overloaded signatures on `create_job_worker`.
- `job.log` degrades to a silent no-op logger in the child process (see [Job Logger](#job-logger)).
- There is additional overhead per job from serialisation and inter-process communication.

### Worker Configuration

`WorkerConfig` supports:

| Parameter | Default | Description |
|-----------|---------|-------------|
| `job_type` | *(required)* | The BPMN service task type to poll for |
| `job_timeout_milliseconds` | *(required)* | How long the worker has to complete the job |
| `request_timeout_milliseconds` | `0` | Long-poll request timeout (0 = server default) |
| `max_concurrent_jobs` | `10` | Maximum jobs executing concurrently |
| `fetch_variables` | `None` | List of variable names to fetch (None = all) |
| `worker_name` | `"camunda-python-sdk-worker"` | Identifier for this worker in Camunda |

The following are keyword-only arguments on `create_job_worker`, not part of `WorkerConfig`:

| Parameter | Default | Description |
|-----------|---------|-------------|
| `execution_strategy` | `"auto"` | `"auto"`, `"async"`, `"thread"`, or `"process"`. Controls how the handler is invoked and which context type it receives. |
| `startup_jitter_max_seconds` | `0` | Maximum random delay (in seconds) before the worker starts polling. When multiple application instances restart simultaneously, this spreads out initial activation requests to avoid saturating the server. A value of `0` (the default) means no delay. |
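The jitter behaviour described above amounts to a uniform random delay. A sketch of the documented semantics (not the SDK's actual code):

```python
import random

def startup_delay(startup_jitter_max_seconds: float) -> float:
    # A uniform random delay in [0, max] before the worker's first
    # activation request; 0 means start polling immediately.
    if startup_jitter_max_seconds <= 0:
        return 0.0
    return random.uniform(0.0, startup_jitter_max_seconds)
```

With, say, 20 application instances restarting at once and `startup_jitter_max_seconds=5`, initial activation requests spread across a 5-second window instead of arriving simultaneously.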

### Failing a Job

To explicitly fail a job with a custom error message, retry count, and backoff, raise `JobFailure` in your handler:

```python
from camunda_orchestration_sdk import ConnectedJobContext, JobFailure

async def handle_job(job: ConnectedJobContext) -> dict:
    if not job.variables.to_dict().get("required_field"):
        raise JobFailure(
            message="Missing required field",
            retries=2,
            retry_back_off=5000,  # milliseconds
        )
    return {"result": "ok"}
```

| Parameter | Default | Description |
|-----------|---------|-------------|
| `message` | *(required)* | Error message attached to the failure |
| `retries` | `None` | Remaining retries. `None` decrements the current retry count by 1 |
| `retry_back_off` | `0` | Backoff before the next retry, in milliseconds |

If an unhandled exception escapes your handler, the job is automatically failed with the exception message and the retry count decremented by 1.
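The automatic-failure path can be sketched as follows. This is an illustration of the documented semantics, not the SDK's actual dispatch code:

```python
def run_handler(handler, variables: dict, current_retries: int):
    """Illustrative outcome of a job execution attempt."""
    try:
        return ("complete", handler(variables), current_retries)
    except Exception as exc:
        # Unhandled exception: the job is failed with the exception
        # message and the retry count decremented by 1.
        return ("fail", str(exc), current_retries - 1)

def flaky(variables: dict) -> dict:
    raise RuntimeError("upstream unavailable")

print(run_handler(flaky, {}, 3))  # ('fail', 'upstream unavailable', 2)
```

Raise `JobFailure` instead when you want to control the message, remaining retries, or backoff explicitly.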

### Throwing a BPMN Error

To throw a [BPMN error](https://docs.camunda.io/docs/components/modeler/bpmn/error-events/) from a job handler — for example, to trigger an error boundary event — raise `JobError`:

```python
from camunda_orchestration_sdk import ConnectedJobContext, JobError

async def handle_payment(job: ConnectedJobContext) -> dict:
    variables = job.variables.to_dict()
    if variables.get("amount", 0) > 10_000:
        raise JobError(error_code="AMOUNT_TOO_HIGH", message="Payment exceeds limit")
    return {"status": "approved"}
```

| Parameter | Default | Description |
|-----------|---------|-------------|
| `error_code` | *(required)* | The error code that is matched against BPMN error catch events |
| `message` | `""` | An optional error message for logging/diagnostics |

The `error_code` must match the error code defined on a BPMN error catch event in your process model. If no catch event matches, the job becomes an incident.

## Error Handling

The SDK raises typed exceptions for API errors, with a dedicated exception class per operation and HTTP error status code:

```python
from camunda_orchestration_sdk import CamundaClient, ProcessCreationByKey, ProcessDefinitionKey
from camunda_orchestration_sdk.errors import CreateProcessInstanceBadRequest

with CamundaClient() as client:
    try:
        result = client.create_process_instance(
            data=ProcessCreationByKey(process_definition_key=ProcessDefinitionKey("99999"))
        )
        )
    except CreateProcessInstanceBadRequest as e:
        print(f"Bad request: {e}")
```

## Logging

By default the SDK logs via [loguru](https://github.com/Delgan/loguru). You can inject any logger that exposes `debug`, `info`, `warning`, and `error` methods — including Python's built-in `logging.Logger`.

### Using the default logger (loguru)

No configuration needed. Control verbosity with `CAMUNDA_SDK_LOG_LEVEL` or loguru's own `LOGURU_LEVEL` environment variable:

```bash
CAMUNDA_SDK_LOG_LEVEL=debug python your_script.py
```

### Injecting a custom logger

Pass a `logger=` argument to `CamundaClient` or `CamundaAsyncClient`. The logger is forwarded to all internal components (auth providers, HTTP hooks, job workers).

**stdlib `logging`:**

```python
import logging
from camunda_orchestration_sdk import CamundaClient

my_logger = logging.getLogger("my_app.camunda")
my_logger.setLevel(logging.DEBUG)

client = CamundaClient(logger=my_logger)
```

**Custom logger object:**

```python
from camunda_orchestration_sdk import CamundaClient

class MyLogger:
    def debug(self, msg, *args, **kwargs):
        print(f"[DEBUG] {msg}")
    def info(self, msg, *args, **kwargs):
        print(f"[INFO] {msg}")
    def warning(self, msg, *args, **kwargs):
        print(f"[WARN] {msg}")
    def error(self, msg, *args, **kwargs):
        print(f"[ERROR] {msg}")

client = CamundaClient(logger=MyLogger())
```

### Disabling logging

Pass an instance of `NullLogger` to silence all SDK output:

```python
from camunda_orchestration_sdk import CamundaClient, NullLogger

client = CamundaClient(logger=NullLogger())
```

## Backpressure

The SDK includes built-in adaptive backpressure management that protects the Camunda cluster from overload. When the cluster returns backpressure signals (HTTP 429, 503, or `RESOURCE_EXHAUSTED`), the SDK automatically reduces outbound concurrency. When conditions improve, it gradually recovers — returning to full throughput with no manual intervention.

This is enabled by default with the `BALANCED` profile and requires no configuration. Operations that drain work from the cluster (completing jobs, failing jobs) are never throttled.

| Profile | Behavior |
|---------|----------|
| `BALANCED` (default) | Adaptive concurrency gating with AIMD-style permit management and exponential backoff at floor. |
| `LEGACY` | Observe-only — records severity but never gates or queues requests. |

Set the profile via the `CAMUNDA_SDK_BACKPRESSURE_PROFILE` environment variable.

For detailed algorithm documentation, see [docs/backpressure.md](docs/backpressure.md).
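As a rough intuition for "AIMD-style permit management": concurrency permits shrink multiplicatively when backpressure is observed and grow additively when conditions are healthy. The function below is purely illustrative — the SDK's real algorithm and parameters are documented in `docs/backpressure.md`:

```python
def adjust_permits(permits: int, saw_backpressure: bool,
                   floor: int = 1, ceiling: int = 32) -> int:
    # Illustrative AIMD update: halve on backpressure (multiplicative
    # decrease), grow by one when healthy (additive increase).
    if saw_backpressure:
        return max(floor, permits // 2)
    return min(ceiling, permits + 1)

permits = 16
permits = adjust_permits(permits, saw_backpressure=True)   # 8
permits = adjust_permits(permits, saw_backpressure=False)  # 9
print(permits)
```

This shape reacts quickly to overload signals (429/503/`RESOURCE_EXHAUSTED`) while recovering gradually, which is why throughput returns to normal without manual intervention.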

## Configuration reference

All `CAMUNDA_*` environment variables recognised by the SDK. These can also be passed as keys in the `configuration={...}` dict.

<!-- BEGIN_CONFIG_REFERENCE -->

| Variable | Default | Description |
|----------|---------|-------------|
| `ZEEBE_REST_ADDRESS` | `http://localhost:8080/v2` | REST API base URL (alias for CAMUNDA_REST_ADDRESS). |
| `CAMUNDA_REST_ADDRESS` | `http://localhost:8080/v2` | REST API base URL. `/v2` is appended automatically if missing. |
| `CAMUNDA_TOKEN_AUDIENCE` | `zeebe.camunda.io` | OAuth token audience. |
| `CAMUNDA_OAUTH_URL` | `https://login.cloud.camunda.io/oauth/token` | OAuth token endpoint URL. |
| `CAMUNDA_CLIENT_ID` | — | OAuth client ID. |
| `CAMUNDA_CLIENT_SECRET` | — | OAuth client secret. |
| `CAMUNDA_CLIENT_AUTH_CLIENTID` | — | Alias for CAMUNDA_CLIENT_ID. |
| `CAMUNDA_CLIENT_AUTH_CLIENTSECRET` | — | Alias for CAMUNDA_CLIENT_SECRET. |
| `CAMUNDA_AUTH_STRATEGY` | `NONE` | Authentication strategy: NONE, OAUTH, or BASIC. Auto-inferred from credentials if omitted. |
| `CAMUNDA_BASIC_AUTH_USERNAME` | — | Basic auth username. Required when CAMUNDA_AUTH_STRATEGY=BASIC. |
| `CAMUNDA_BASIC_AUTH_PASSWORD` | — | Basic auth password. Required when CAMUNDA_AUTH_STRATEGY=BASIC. |
| `CAMUNDA_SDK_LOG_LEVEL` | `error` | SDK log level: silent, error, warn, info, debug, trace, or silly. |
| `CAMUNDA_TOKEN_CACHE_DIR` | — | Directory for OAuth token disk cache. Disabled if unset. |
| `CAMUNDA_TOKEN_DISK_CACHE_DISABLE` | `false` | Disable OAuth token disk caching. |
| `CAMUNDA_SDK_BACKPRESSURE_PROFILE` | `BALANCED` | Backpressure profile: BALANCED (adaptive gating, default) or LEGACY (observe-only, no gating). |
| `CAMUNDA_LOAD_ENVFILE` | — | Load configuration from a `.env` file. Set to `true` (or a file path). |

<!-- END_CONFIG_REFERENCE -->


<!-- docs:cut:start -->
## Contributing

See [CONTRIBUTING.md](CONTRIBUTING.md) for development setup and generation workflow. See [MAINTAINER.md](MAINTAINER.md) for architecture and pipeline documentation.

## License

Apache-2.0
<!-- docs:cut:end -->


