Metadata-Version: 2.4
Name: improvado-pipeline-sdk
Version: 0.2.0
Summary: SDK for authoring Improvado custom pipeline workflows on Temporal
License-Expression: Apache-2.0
Keywords: etl,pipeline,temporal,workflow
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3.12
Classifier: Typing :: Typed
Requires-Python: >=3.12
Requires-Dist: aiohttp>=3.9
Requires-Dist: temporalio>=1.10
Provides-Extra: lint
Requires-Dist: pyright>=1.1; extra == 'lint'
Requires-Dist: ruff>=0.4; extra == 'lint'
Provides-Extra: state
Requires-Dist: clickhouse-connect>=0.7; extra == 'state'
Description-Content-Type: text/markdown

# improvado-pipeline-sdk

SDK for authoring custom data pipeline workflows on [Temporal](https://temporal.io).

Provides type definitions, helper functions, and constants for building
ETL/ELT pipelines that run on Improvado's Temporal infrastructure.

## Install

```bash
pip install improvado-pipeline-sdk

# With optional extras: "lint" (pyright, ruff) and "state" (clickhouse-connect)
pip install "improvado-pipeline-sdk[lint,state]"
```

## Usage

### Workflow code

```python
import dataclasses
from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from pipeline_sdk.types import (
        DataRef,
        DateRangeRequest,
        DateRangeType,
    )
    from pipeline_sdk.activities import (
        les_extract,
        ch_load,
        prepare_credentials,
        cleanup_credentials,
        CUSTOM_ACTIVITY_QUEUE,
    )


@dataclasses.dataclass(frozen=True)
class MyPipelineParams:
    connector_id: int


@workflow.defn(sandboxed=False, name="MyPipeline")
class MyPipelineWorkflow:
    @workflow.run
    async def run(self, params: MyPipelineParams) -> dict:
        result = await les_extract(
            connector_id=params.connector_id,
            data_source="facebook",
            report_type="ad_insights",
            date_range=DateRangeRequest(
                date_range_type=DateRangeType.MANUAL,
                params={"date_from": "2026-01-01", "date_to": "2026-03-31"},
            ),
        )
        return {"data_ref": result.data_ref.uri}
```
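The `MANUAL` date range above takes ISO `YYYY-MM-DD` strings. A minimal, SDK-free sketch of sanity-checking those params before starting a workflow (the helper name `validate_manual_range` is our own illustration, not part of the SDK):

```python
from datetime import date


def validate_manual_range(params: dict[str, str]) -> tuple[date, date]:
    """Parse and sanity-check MANUAL date-range params."""
    start = date.fromisoformat(params["date_from"])
    end = date.fromisoformat(params["date_to"])
    if start > end:
        raise ValueError(f"date_from {start} is after date_to {end}")
    return start, end


start, end = validate_manual_range(
    {"date_from": "2026-01-01", "date_to": "2026-03-31"}
)
print(start, end)  # 2026-01-01 2026-03-31
```

Failing fast on malformed or reversed ranges in the client keeps the error out of workflow history, where a bad input would otherwise surface as an activity failure and retry.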

### Activity code

```python
from temporalio import activity
from pipeline_sdk.runtime import (
    read_pipeline_secret,
    get_current_tenant,
    PipelineState,
)
from pipeline_sdk.types import PipelineCredentials


@activity.defn
async def my_activity(creds: PipelineCredentials) -> dict:
    tenant = get_current_tenant()
    secret = await read_pipeline_secret(creds)
    # secret.s3, secret.storage, secret.connections
    return {"tenant": tenant.agency_uuid}
```

### Worker / client setup

```python
from pipeline_sdk.tenant import (
    TenantInterceptor,       # worker side
    TenantClientInterceptor, # client side
    TenantID,
)
```
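The interceptors carry the tenant through Temporal call headers so that `get_current_tenant()` works inside activities. As a rough, SDK-free illustration of the round trip (the header name, JSON encoding, and this `TenantID` shape are assumptions for the sketch, not the SDK's actual wire format):

```python
import json
from dataclasses import asdict, dataclass


# Hypothetical stand-in for pipeline_sdk.tenant.TenantID
@dataclass(frozen=True)
class TenantID:
    agency_uuid: str


def build_headers(tenant: TenantID) -> dict[str, bytes]:
    # Serialize the tenant into a single header, as a client
    # interceptor might attach to each outbound call.
    return {"x-tenant": json.dumps(asdict(tenant)).encode()}


def read_headers(headers: dict[str, bytes]) -> TenantID:
    # The worker-side interceptor reverses the encoding and
    # stashes the result for get_current_tenant().
    return TenantID(**json.loads(headers["x-tenant"]))


tenant = TenantID(agency_uuid="agency-123")
assert read_headers(build_headers(tenant)) == tenant
```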

### Type checking

```bash
pyright my_workflow.py
ruff check my_workflow.py
ruff format my_workflow.py
```

Both `pyright` and `ruff` are provided via the `lint` extra (`pip install "improvado-pipeline-sdk[lint]"`), not as core dependencies.

## Package structure

| Module | Purpose |
|---|---|
| `pipeline_sdk.types` | Pure, Temporal-serializable data types and enums — `DataRef`, `TenantID`, `Cluster`, `DateRangeRequest`, `DateRangeType`, `PipelineCredentials`, `PipelineSecret`, `S3Credentials`, `StorageCredentials`, `LesActivityWithS3Result`, `PipelineLoadResult`. Safe to import from either workflow or activity code. |
| `pipeline_sdk.activities` | Predefined workflow-side wrappers — `les_extract`, `ch_load`, `prepare_credentials`, `cleanup_credentials`, plus `CUSTOM_ACTIVITY_QUEUE`. Import inside `workflow.unsafe.imports_passed_through()`. |
| `pipeline_sdk.runtime` | Activity-side helpers — `read_pipeline_secret`, `PipelineState`, `get_current_tenant`, plus `VAULT_ADDR` / `VAULT_MOUNT_POINT` for Vault config overrides. |
| `pipeline_sdk.tenant` | Tenant propagation infrastructure — `TenantInterceptor`, `TenantClientInterceptor`, `build_tenant_headers`, plus `TenantID` / `get_current_tenant` re-exports. |
