Metadata-Version: 2.4
Name: inspectica-workflow-sdk
Version: 1.1.0
Summary: Python SDK for Inspectica Workflow Server - Activity worker implementation
Author: Inspectica Team
License: MIT
Project-URL: Homepage, https://github.com/inspectica/workflow-server
Project-URL: Documentation, https://github.com/inspectica/workflow-server/tree/main/packages/python-sdk
Project-URL: Repository, https://github.com/inspectica/workflow-server
Keywords: workflow,orchestration,kafka,distributed
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.10
Description-Content-Type: text/markdown
Requires-Dist: aiokafka>=0.10.0
Requires-Dist: aiohttp>=3.9.0
Requires-Dist: pydantic>=2.5.0
Requires-Dist: aws-msk-iam-sasl-signer-python>=1.0.0
Provides-Extra: dev
Requires-Dist: pytest>=8.0.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.23.0; extra == "dev"
Requires-Dist: ruff>=0.2.0; extra == "dev"

# @inspectica/workflow-sdk (Python)

Python SDK for the Inspectica Workflow Server - define workflows and activities and run them on a worker.

## Installation

### From PyPI (when published)
```bash
pip install inspectica-workflow-sdk
```

### Local Development Install
```bash
# From the python-sdk directory
pip install -e .

# Or from the repo root
pip install -e ./packages/python-sdk
```

## Quick Start

```python
import logging
from workflow_sdk import Worker, workflow, activity

logging.basicConfig(level=logging.INFO)

# Define an activity
@activity.defn
async def say_hello(input_data: dict) -> dict:
    name = input_data.get("name", "World")
    return {"message": f"Hello, {name}!"}

# Define a workflow
@workflow.defn
class HelloWorldWorkflow:
    @workflow.run
    async def run(self, input_data: dict) -> dict:
        result = await workflow.execute_activity(say_hello, input_data)
        return result

# Create a worker with API key (get from server UI)
worker = Worker(
    server_url="http://localhost:8080",
    api_key="wk_abc123...",
)

# Register workflows and activities
worker.register(
    workflows=[HelloWorldWorkflow],
    activities=[say_hello],
)

# Start the worker
worker.run()
```

The worker will:
1. Connect to the server with the API key
2. Receive Kafka credentials and topic names automatically
3. Send periodic heartbeats (visible in server UI)
4. Gracefully disconnect on shutdown

## Configuration

### Worker Configuration

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `server_url` | str | Yes | Workflow server URL |
| `api_key` | str | Yes | Worker API key (from server UI) |
| `group_id` | str | No | Consumer group ID (default: auto-generated) |
| `max_concurrent_workflows` | int | No | Max concurrent workflows (default: 10) |

```python
worker = Worker(
    server_url="https://workflow.example.com",
    api_key="wk_abc123...",
    max_concurrent_workflows=10,
)
```

## Activities

Activities are the building blocks of workflows. They represent individual units of work.

```python
from workflow_sdk import activity

@activity.defn
async def my_activity(input_data: dict) -> dict:
    # input_data is the JSON input from the workflow
    # Return value is serialized to JSON
    return {"result": "done"}

# Sync activities are also supported
@activity.defn
def sync_activity(input_data: dict) -> dict:
    return {"upper": input_data.get("text", "").upper()}
```
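Because activities are ordinary (async) functions, their logic can be unit-tested without a running worker. A minimal sketch, using a plain coroutine with the same body as `say_hello` from Quick Start so it runs standalone (this assumes `@activity.defn` leaves the function directly callable):

```python
import asyncio

# Same logic as the say_hello activity, written as a plain coroutine
# so it can be exercised without the SDK or a worker.
async def say_hello(input_data: dict) -> dict:
    name = input_data.get("name", "World")
    return {"message": f"Hello, {name}!"}

# Drive the coroutine directly in a test or REPL.
result = asyncio.run(say_hello({"name": "Ada"}))
print(result)  # {'message': 'Hello, Ada!'}
```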

### Error Handling

Raise exceptions to signal activity failure:

```python
@activity.defn
async def risky_activity(input_data: dict) -> dict:
    if not input_data.get("valid"):
        raise ValueError("Invalid input data")
    
    result = await external_api_call(input_data)
    return {"result": result}
```
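If an activity calls an external service that fails transiently, you can wrap the call with a small retry helper. The helper below (`with_retries`) is a plain-Python sketch, not an SDK feature, and `flaky` is a hypothetical activity body that fails twice before succeeding:

```python
import asyncio

# Hypothetical helper (not part of the SDK): retry an async callable
# a few times with a short delay before re-raising the last error.
async def with_retries(fn, input_data, attempts=3, delay=0.01):
    for attempt in range(1, attempts + 1):
        try:
            return await fn(input_data)
        except Exception:
            if attempt == attempts:
                raise
            await asyncio.sleep(delay)

calls = {"n": 0}

async def flaky(input_data: dict) -> dict:
    # Simulate a service that fails on the first two calls.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("transient failure")
    return {"ok": True}

print(asyncio.run(with_retries(flaky, {})))  # {'ok': True}
```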

## Workflows

Workflows orchestrate activities and define the business logic.

```python
from workflow_sdk import workflow, activity

@activity.defn
async def step1(input_data: dict) -> dict:
    return {"result": input_data["data"] + "_step1"}

@activity.defn
async def step2(input_data: dict) -> dict:
    return {"result": input_data["data"] + "_step2"}

@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, input_data: dict) -> dict:
        # Execute activities sequentially
        result1 = await workflow.execute_activity(step1, {"data": input_data["initial"]})
        result2 = await workflow.execute_activity(step2, {"data": result1["result"]})
        
        return {"final_result": result2["result"]}
```
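When activities don't depend on each other's output, they can be awaited concurrently with `asyncio.gather` (this assumes `workflow.execute_activity` returns a normal awaitable). The pattern, sketched with plain coroutines standing in for the `execute_activity` calls:

```python
import asyncio

# Stand-ins for activities; in a real workflow each call below would be
# workflow.execute_activity(step1, ...) / workflow.execute_activity(step2, ...).
async def step1(input_data: dict) -> dict:
    return {"result": input_data["data"] + "_step1"}

async def step2(input_data: dict) -> dict:
    return {"result": input_data["data"] + "_step2"}

async def run(input_data: dict) -> dict:
    # Fan out: both steps run concurrently; gather preserves order.
    r1, r2 = await asyncio.gather(
        step1({"data": input_data["initial"]}),
        step2({"data": input_data["initial"]}),
    )
    return {"results": [r1["result"], r2["result"]]}

print(asyncio.run(run({"initial": "x"})))  # {'results': ['x_step1', 'x_step2']}
```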

### Accessing Workflow Context

You can access the workflow context to get metadata (like access tokens):

```python
from workflow_sdk import workflow

@workflow.defn
class AuthenticatedWorkflow:
    @workflow.run
    async def run(self, input_data: dict) -> dict:
        ctx = workflow.get_current_context()
        
        # Access metadata passed from the workflow submission
        access_token = ctx.metadata.get("access_token")
        workflow_id = ctx.workflow_id
        
        # Use the token for authorization checks
        # ...
        
        return {"success": True}
```

### Cross-Worker Activity Execution

You can call activities registered on other workers by using the activity name as a string:

```python
@workflow.defn
class CrossWorkerWorkflow:
    @workflow.run
    async def run(self, input_data: dict) -> dict:
        # Call an activity on another worker by name
        result = await workflow.execute_activity(
            "remote_activity_name",  # Activity registered on another worker
            {"data": input_data["data"]},
            start_to_close_timeout_ms=120000,  # 2-minute timeout
        )
        
        return result
```

## Complete Example

```python
import logging
import os
from workflow_sdk import Worker, workflow, activity

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

@activity.defn
async def say_hello(input_data: dict) -> dict:
    name = input_data.get("name", "World")
    return {"message": f"Hello, {name}!"}

@activity.defn
async def process_order(input_data: dict) -> dict:
    order_id = input_data.get("order_id")
    return {"order_id": order_id, "status": "processed"}

@workflow.defn
class OrderWorkflow:
    @workflow.run
    async def run(self, input_data: dict) -> dict:
        greeting = await workflow.execute_activity(say_hello, {"name": "Customer"})
        order = await workflow.execute_activity(process_order, input_data)
        return {"greeting": greeting, "order": order}

worker = Worker(
    server_url=os.environ.get("WORKFLOW_SERVER_URL", "http://localhost:8080"),
    api_key=os.environ["WORKFLOW_API_KEY"],  # required; fail fast if unset
)

worker.register(
    workflows=[OrderWorkflow],
    activities=[say_hello, process_order],
)

if __name__ == "__main__":
    worker.run()
```

## API Reference

### Worker

```python
class Worker:
    def __init__(
        self,
        server_url: str,
        api_key: str,
        group_id: str | None = None,
        max_concurrent_workflows: int = 10,
    ): ...
    
    def register(
        self,
        workflows: list[type] | None = None,
        activities: list[Callable] | None = None,
    ) -> None: ...
    
    def run(self) -> None: ...  # Blocking
    
    async def stop(self) -> None: ...
```

### Workflow Context

```python
class WorkflowContext:
    workflow_id: str
    metadata: dict[str, Any]
    activity_results: list[Any]
    
    async def execute_activity(
        self,
        activity: Callable | str,
        input_data: Any = None,
        start_to_close_timeout_ms: int = 60000,
    ) -> Any: ...
```

## Features

- **Workflow Execution**: Define and execute complex workflows with multiple activities
- **Activity Replay**: Automatic replay of completed activities on workflow resume
- **Cross-Worker Routing**: Execute activities on different workers via the server
- **Concurrency Control**: Limit concurrent workflow executions to prevent OOM
- **AWS MSK IAM Support**: Built-in support for AWS MSK with IAM authentication
- **Graceful Shutdown**: Proper cleanup on SIGINT/SIGTERM signals
- **Heartbeat**: Automatic heartbeat to keep worker connection alive

## Development

```bash
# Create conda environment
conda create -n workflow-sdk python=3.11
conda activate workflow-sdk

# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Lint
ruff check .
```

## License

MIT
