Metadata-Version: 2.4
Name: improvado-pipeline-sdk
Version: 0.9.1
Summary: SDK for authoring Improvado custom pipeline workflows on Temporal
License-Expression: Apache-2.0
Keywords: etl,pipeline,temporal,workflow
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Typing :: Typed
Requires-Python: >=3.10
Requires-Dist: aiohttp>=3.9
Requires-Dist: clickhouse-connect>=0.7
Requires-Dist: nexus-rpc>=1.1
Requires-Dist: pyright>=1.1
Requires-Dist: ruff>=0.4
Requires-Dist: temporalio>=1.26
Description-Content-Type: text/markdown

# improvado-pipeline-sdk

SDK for authoring custom data pipeline workflows on [Temporal](https://temporal.io).

Provides type definitions, helper functions, and constants for building
ETL/ELT pipelines that run on Improvado's Temporal infrastructure.

## Install

```bash
pip install improvado-pipeline-sdk
```

## Usage

### Workflow code

```python
import dataclasses
from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from pipeline_sdk.types import (
        DataRef,
        DateRangeRequest,
        DateRangeType,
    )
    from pipeline_sdk.activities import (
        les_extract,
        ch_load,
        prepare_credentials,
        cleanup_credentials,
    )


@dataclasses.dataclass(frozen=True)
class MyPipelineParams:
    connector_id: int


@workflow.defn(sandboxed=False, name="MyPipeline")
class MyPipelineWorkflow:
    @workflow.run
    async def run(self, params: MyPipelineParams) -> dict:
        result = await les_extract(
            connector_id=params.connector_id,
            data_source="facebook",
            report_type="ad_insights",
            date_range=DateRangeRequest(
                date_range_type=DateRangeType.MANUAL,
                params={"date_from": "2026-01-01", "date_to": "2026-03-31"},
            ),
        )
        return {"data_ref": result.data_ref.uri}
```

### Activity code

```python
from temporalio import activity

from pipeline_sdk.runtime import (
    read_pipeline_secret,
    get_current_tenant,
    PipelineState,
)
from pipeline_sdk.types import PipelineCredentials


@activity.defn
async def my_activity(creds: PipelineCredentials) -> dict:
    tenant = get_current_tenant()
    secret = await read_pipeline_secret(creds)
    # secret.s3, secret.storage, secret.connections
    return {"tenant": tenant.agency_uuid}
```

### Worker / client setup

```python
from pipeline_sdk.tenant import (
    TenantInterceptor,       # worker side
    TenantClientInterceptor, # client side
    TenantID,
)
```

### Run a workflow locally against production Temporal

This mode supports inner-loop debugging: write your workflow on your
laptop and run it against production Temporal under your tenant
identity. Activities defined in your file run on the laptop;
cross-queue activities such as `les_extract`, `ch_load`, and
`prepare_credentials` are dispatched to Improvado shared queues and
execute on Improvado infrastructure.

```bash
pip install improvado-pipeline-sdk

# Mint a Temporal JWT — ask your platform team for the runbook.
export TEMPORAL_API_URL=flow-api.tools.improvado.io:443
export TEMPORAL_AUTH_JWT=$(cat /tmp/temporal-token)
export TENANT_AGENCY_UUID=fc010783-7a09-449b-84ab-0c1660b15245
export TENANT_WORKSPACE_ID=42
# optional: TENANT_USER_ID=99       # ADR-010 user-scope tenants
# optional: TEMPORAL_TLS=false      # debug only — local docker-compose
# optional: VAULT_ADDR / VAULT_MOUNT_POINT — point read_pipeline_secret at prod
```

**One-shot mode (default)** — point the runner at a workflow file; it
spins up a worker on the tenant queue, starts the workflow, prints
the result, and exits:

```bash
python -m pipeline_sdk hello_sdk.py
python -m pipeline_sdk hello_sdk.py --params '{"label": "hi"}'
python -m pipeline_sdk ./pipelines/ --name HelloSdkWorkflow --params-file params.json
```

**Worker-only mode** — runs a long-lived worker that polls the tenant
queue `tenant-{agency}-{workspace}[-{user}]`. Trigger runs through the
Workflow Manager API or the Temporal CLI:

```bash
python -m pipeline_sdk ./pipelines/ --worker-only
# or use the dedicated entry: improvado-pipeline-run ./pipelines/ --worker-only
```
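For the Temporal CLI path, a trigger might look like the following sketch — the address, task-queue name, workflow type, and input are all placeholders taken from the examples above, not guaranteed values:

```bash
# Start a run on the tenant queue the worker is polling (names illustrative).
temporal workflow start \
  --address flow-api.tools.improvado.io:443 \
  --task-queue tenant-fc010783-7a09-449b-84ab-0c1660b15245-42 \
  --type HelloSdkWorkflow \
  --input '{"label": "hi"}'
```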

In both modes, `Ctrl-C` drains in-flight work and exits.

### Type checking

```bash
pyright my_workflow.py
ruff check my_workflow.py
ruff format my_workflow.py
```

Both `pyright` and `ruff` are included as dependencies.

## Package structure

| Module | Purpose |
|---|---|
| `pipeline_sdk.types` | Pure, Temporal-serializable data types and enums — `DataRef`, `TenantID`, `Cluster`, `DateRangeRequest`, `DateRangeType`, `PipelineCredentials`, `PipelineSecret`, `S3Credentials`, `StorageCredentials`, `LesActivityWithS3Result`, `PipelineLoadResult`. Safe to import from either workflow or activity code. |
| `pipeline_sdk.activities` | Predefined workflow-side wrappers — `les_extract`, `ch_load`, `prepare_credentials`, `cleanup_credentials`. Import inside `workflow.unsafe.imports_passed_through()`. Custom activities defined in your own module inherit the workflow's task queue — do not pass `task_queue` to `workflow.execute_activity`. |
| `pipeline_sdk.runtime` | Activity-side helpers — `read_pipeline_secret`, `PipelineState`, `get_current_tenant`, plus `VAULT_ADDR` / `VAULT_MOUNT_POINT` for Vault config overrides. |
| `pipeline_sdk.tenant` | Tenant propagation infrastructure — `TenantInterceptor`, `TenantClientInterceptor`, `build_tenant_headers`, plus `TenantID` / `get_current_tenant` re-exports. |
