Metadata-Version: 2.4
Name: orbflow-sdk
Version: 0.1.0
Summary: Python SDK for building Orbflow workflow plugins
Project-URL: Homepage, https://github.com/orbflow-dev/orbflow-python
Project-URL: Repository, https://github.com/orbflow-dev/orbflow-python
Project-URL: Documentation, https://github.com/orbflow-dev/orbflow-python/tree/main/docs
Project-URL: Issues, https://github.com/orbflow-dev/orbflow-python/issues
Project-URL: Changelog, https://github.com/orbflow-dev/orbflow-python/releases
Author: Orbflow Authors
License-Expression: Apache-2.0
License-File: LICENSE
Keywords: automation,grpc,orbflow,plugin,workflow
Classifier: Development Status :: 3 - Alpha
Classifier: Framework :: AsyncIO
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Requires-Python: >=3.10
Requires-Dist: grpcio>=1.60.0
Requires-Dist: protobuf>=4.25.0
Provides-Extra: dev
Requires-Dist: grpcio-tools>=1.60.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.23; extra == 'dev'
Requires-Dist: pytest-cov>=5.0; extra == 'dev'
Requires-Dist: pytest>=8.0; extra == 'dev'
Description-Content-Type: text/markdown

# orbflow-sdk

Python SDK for building [Orbflow](https://github.com/orbflow-dev/orbflow) workflow plugins.

Build custom action and trigger nodes that appear in the Orbflow visual workflow builder and execute within the distributed engine. No protobuf code generation required.

## Table of Contents

- [Installation](#installation)
- [Quick Start](#quick-start)
- [Deploying to Orbflow](#deploying-to-orbflow)
- [API Reference](#api-reference)
  - [@plugin — Plugin Decorator](#plugin--plugin-decorator)
  - [@action — Action Node](#action--action-node)
  - [@trigger — Trigger Node](#trigger--trigger-node)
  - [Field — Field Builder](#field--field-builder)
  - [run() — gRPC Server](#run--grpc-server)
- [Handler Context](#handler-context-executecontext)
- [Return Values](#return-values)
- [Plugin Manifest](#plugin-manifest)
- [Examples](#examples)
- [Architecture](#architecture)
- [Development](#development)
- [License](#license)

## Installation

```bash
pip install orbflow-sdk
```

**Requirements:** Python >= 3.10

**Dependencies** (installed automatically):
- `grpcio` >= 1.60.0
- `protobuf` >= 4.25.0

## Quick Start

Create a file called `main.py`:

```python
from orbflow_sdk import plugin, action, Field, run

@plugin(name="my-plugin", version="1.0.0", author="You", category="utility")
class MyPlugin:

    @action(
        inputs=[Field.string("name", required=True)],
        outputs=[Field.string("greeting")],
    )
    async def greet(self, ctx):
        return {"greeting": f"Hello, {ctx.input['name']}!"}

if __name__ == "__main__":
    run(MyPlugin)
```

Run it locally:

```bash
python main.py
# [orbflow-sdk] Plugin 'my-plugin' v1.0.0 serving on 0.0.0.0:50051 (1 actions)
```

Your plugin is now running a gRPC server. Once it is connected to an Orbflow instance (see [Deploying to Orbflow](#deploying-to-orbflow)), the engine fetches its node schemas and makes the "Greet" action available in the workflow builder.

The `ref` and display name auto-derive from the method name (`greet` -> `plugin:greet`, `"Greet"`). Override with explicit `ref=` and `name=` if needed.

## Deploying to Orbflow

There are two ways to connect your plugin to a running Orbflow instance.

### Option 1: Managed Plugin (Recommended)

Place your plugin in the Orbflow plugins directory and let the engine manage its lifecycle.

**1. Create the plugin directory:**

```
plugins/
└── my-plugin/
    ├── main.py              # Entry point
    ├── pyproject.toml       # Dependencies
    └── orbflow-plugin.json  # Plugin manifest
```

**2. Create the manifest** (`orbflow-plugin.json`):

```json
{
  "name": "my-plugin",
  "version": "1.0.0",
  "description": "My custom plugin",
  "author": "Your Name",
  "language": "python",
  "protocol": {
    "Grpc": { "default_port": 50051 }
  },
  "node_types": ["plugin:greet"]
}
```

**3. Start Orbflow.** The engine will:
- Detect your plugin in the `plugins/` directory
- Create a virtual environment and install dependencies automatically
- Launch your plugin with `python main.py`
- Assign a port via the `ORBFLOW_PLUGIN_PORT` environment variable
- Poll the health check endpoint until the plugin is ready
- Call `GetSchemas` to register your nodes in the workflow builder

**Reading the assigned port:**

```python
import os

if __name__ == "__main__":
    port = int(os.environ.get("ORBFLOW_PLUGIN_PORT", "50051"))
    run(MyPlugin, port=port)
```
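
If the variable could hold an unexpected value, the port can be validated before it reaches `run()`. The sketch below is one possible approach, not SDK behavior: `resolve_port` is a hypothetical helper, and the range check is an assumption about what counts as a usable port.

```python
import os

DEFAULT_PORT = 50051  # same fallback as the snippet above


def resolve_port(env=None, default=DEFAULT_PORT):
    """Return the port from ORBFLOW_PLUGIN_PORT, or `default` if it is unset.

    Hypothetical helper for illustration; it is not part of orbflow-sdk.
    """
    env = os.environ if env is None else env
    raw = env.get("ORBFLOW_PLUGIN_PORT")
    if raw is None or raw.strip() == "":
        return default
    port = int(raw)  # raises ValueError on non-numeric input
    if not 1 <= port <= 65535:
        raise ValueError(f"ORBFLOW_PLUGIN_PORT out of range: {port}")
    return port
```

With this helper in place, the entry point would call `run(MyPlugin, port=resolve_port())`.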

### Option 2: Standalone Plugin

Run your plugin independently and tell Orbflow where to find it.

**1. Start your plugin:**

```bash
python main.py
# Listening on 0.0.0.0:50051
```

**2. Register in Orbflow config** (`configs/orbflow.yaml`):

```yaml
plugins:
  dir: "./plugins"
  grpc:
    - name: "my-plugin"
      address: "http://localhost:50051"
      timeout_secs: 30
```

**3. Restart Orbflow.** The engine connects to your plugin's gRPC endpoint, calls `GetSchemas`, and registers the nodes.

This approach is useful for:
- Plugins running on a different machine or in a separate container
- Development -- run the plugin with hot reload while Orbflow runs separately
- Plugins that need to manage their own lifecycle

### Verifying Deployment

Once deployed, your plugin's nodes appear in the Orbflow workflow builder's node picker. You can verify by:

1. Opening the workflow builder UI at `http://localhost:3000`
2. Clicking the "+" button to open the node picker
3. Searching for your plugin's action name (e.g., "Greet")
4. Confirming that the node appears with the icon and category you defined

Or via the API:

```bash
curl http://localhost:8080/api/node-types | jq '.data[] | select(.ref | startswith("plugin:"))'
```
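
The same filter can be applied in Python when `jq` is not available. This is a minimal sketch: the response shape (a top-level `data` array of objects with a `ref` key) is inferred from the `jq` expression above, and the sample entries, including `builtin:http`, are made up for illustration.

```python
import json


def plugin_node_types(payload: str) -> list[dict]:
    """Return the entries under "data" whose "ref" starts with "plugin:"."""
    body = json.loads(payload)
    return [
        node
        for node in body.get("data", [])
        if str(node.get("ref", "")).startswith("plugin:")
    ]


# Sample payload; the shape is an assumption inferred from the jq filter.
sample = json.dumps({
    "data": [
        {"ref": "builtin:http", "name": "HTTP Request"},  # hypothetical entry
        {"ref": "plugin:greet", "name": "Greet"},
    ]
})

print([node["ref"] for node in plugin_node_types(sample)])
```

Against a live instance, the payload would be the body returned by `http://localhost:8080/api/node-types`.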

## API Reference

### `@plugin` — Plugin Decorator

Marks a class as an Orbflow plugin.

```python
@plugin(
    name="orbflow-weather",       # Unique package name
    version="1.0.0",              # SemVer version
    author="Your Name",           # Shown in marketplace
    category="utility",           # Category tab in marketplace
    icon="cloud",                 # Lucide icon name
    description="Get weather forecasts",
)
class WeatherPlugin:
    ...
```

### `@action` — Action Node

Turns a method into a workflow action node.

```python
@action(
    inputs=[
        Field.string("city", required=True).label("City Name"),
    ],
    outputs=[
        Field.number("temperature").label("Temperature (F)"),
        Field.string("condition").label("Condition"),
    ],
    parameters=[
        Field.string("units").default("fahrenheit").enum("fahrenheit", "celsius"),
    ],
)
async def weather_forecast(self, ctx):
    city = ctx.input["city"]
    units = ctx.parameters.get("units", "fahrenheit")
    return {"temperature": 72, "condition": "sunny"}
```

**Naming rules:**
- Action refs auto-derive from the method name: `weather_forecast` becomes `plugin:weather-forecast`
- Display names auto-capitalize: `weather_forecast` becomes `"Weather Forecast"`
- Override with explicit `ref=` and `name=` parameters
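
The naming rules can be approximated in a few lines. This is a sketch that reproduces the two documented examples; the SDK's own derivation may handle edge cases differently, and both function names are hypothetical.

```python
def derive_ref(method_name: str) -> str:
    """Hypothetical: snake_case method name -> "plugin:kebab-case" ref."""
    return "plugin:" + method_name.strip("_").replace("_", "-")


def derive_display_name(method_name: str) -> str:
    """Hypothetical: snake_case method name -> capitalized display name."""
    return " ".join(word.capitalize() for word in method_name.split("_") if word)


for method in ("greet", "weather_forecast"):
    print(method, "->", derive_ref(method), "/", derive_display_name(method))
```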

**Inputs** are data flowing from upstream nodes. **Parameters** are static configuration set in the builder. **Outputs** define data passed to downstream nodes.

### `@action` / `@trigger` Parameters

| Parameter | Type | Required | Default |
|-----------|------|----------|---------|
| `ref` | `str` | No | Auto: `plugin:<method-name>` |
| `name` | `str` | No | Auto: capitalized method name |
| `description` | `str` | No | `""` |
| `inputs` | `list[Field]` | No | `()` |
| `outputs` | `list[Field]` | No | `()` |
| `parameters` | `list[Field]` | No | `()` |

### `@trigger` — Trigger Node

For nodes that start a workflow:

```python
from orbflow_sdk import trigger

@trigger(
    outputs=[Field.string("filename"), Field.number("size")],
)
async def on_file_upload(self, ctx):
    return {"filename": "report.pdf", "size": 1024}
```

### `Field` — Field Builder

Fluent field definitions with chainable methods.

**Field types:**

| Factory | Type | UI Control |
|---------|------|------------|
| `Field.string(key)` | `"string"` | Text input |
| `Field.number(key)` | `"number"` | Number input |
| `Field.boolean(key)` | `"boolean"` | Toggle switch |
| `Field.object(key)` | `"object"` | JSON editor |
| `Field.array(key)` | `"array"` | Array editor |
| `Field.credential(key)` | `"credential"` | Credential picker |

**Chainable methods (all return `self`):**

```python
(
    Field.string("name", required=True)
    .label("Full Name")          # Display label (defaults to key)
    .description("Enter name")   # Help text shown on hover
    .default("World")            # Default value
    .enum("a", "b", "c")         # Dropdown options
)
```

**Credential fields** accept type filtering to restrict which stored credentials are selectable:

```python
Field.credential("api_key").types("openai", "anthropic")
```

### `run()` — gRPC Server

Starts the gRPC server. Blocks until SIGTERM/SIGINT.

```python
run(MyPlugin)

# With options:
run(MyPlugin, host="0.0.0.0", port=50051, max_workers=10)
```

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `plugin_class` | `type` | Required | The `@plugin`-decorated class |
| `host` | `str` | `"0.0.0.0"` | Bind address |
| `port` | `int` | `50051` | gRPC port |
| `max_workers` | `int` | `10` | Max concurrent RPC handlers |

**Server details:**
- Uses `grpcio` for gRPC transport, wire-compatible with Orbflow's Rust `tonic` client
- Graceful shutdown on SIGTERM/SIGINT
- Error messages are sanitized -- full errors logged to console, generic "internal error" sent to caller

**When managed by Orbflow**, read the port from the environment:

```python
import os

run(MyPlugin, port=int(os.environ.get("ORBFLOW_PLUGIN_PORT", "50051")))
```

## Handler Context (`ExecuteContext`)

Every action/trigger handler receives an `ExecuteContext`:

| Property | Type | Description |
|----------|------|-------------|
| `ctx.input` | `dict[str, Any]` | Data from upstream nodes (mapped via CEL expressions or direct wiring) |
| `ctx.parameters` | `dict[str, Any]` | Static config values set in the workflow builder UI |
| `ctx.config` | `dict[str, Any]` | Resolved credentials and node configuration |
| `ctx.capabilities` | `dict[str, Any]` | Output from capability nodes (e.g., database connections) |
| `ctx.instance_id` | `str` | Workflow execution instance ID |
| `ctx.node_id` | `str` | This node's ID in the workflow DAG |
| `ctx.plugin_ref` | `str` | Action ref (e.g., `"plugin:weather-forecast"`) |
| `ctx.attempt` | `int` | Retry attempt number (0-based) |
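
As an illustration of how these properties might be combined, the sketch below runs a handler against a stand-in context. `SimpleNamespace` replaces `ExecuteContext` so the snippet runs without the SDK or the engine; the `report_id` input, the `format` parameter, and the handler itself are hypothetical.

```python
import asyncio
from types import SimpleNamespace


async def fetch_report(self, ctx):
    # Required upstream data: a missing key raises KeyError and fails the node.
    report_id = ctx.input["report_id"]
    # Static builder configuration, with a fallback when unset.
    fmt = ctx.parameters.get("format", "pdf")
    # ctx.attempt is 0-based, so the first run reports attempt 1.
    return {
        "filename": f"{report_id}.{fmt}",
        "attempt": ctx.attempt + 1,
        "trace": f"{ctx.instance_id}/{ctx.node_id}",
    }


# Stand-in for ExecuteContext (an assumption made so the sketch is runnable).
ctx = SimpleNamespace(
    input={"report_id": "q3"},
    parameters={},
    config={},
    capabilities={},
    instance_id="inst-1",
    node_id="n1",
    plugin_ref="plugin:fetch-report",
    attempt=0,
)

result = asyncio.run(fetch_report(None, ctx))
print(result)
```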

## Return Values

Handlers can return data in three ways:

```python
# 1. Dict (most common) -- becomes the node's output data
async def my_action(self, ctx):
    return {"result": "hello", "count": 42}

# 2. ActionResult -- explicit error (for controlled error reporting)
from orbflow_sdk import ActionResult

async def my_action(self, ctx):
    return ActionResult(error="Something went wrong")

# 3. None -- no output data (for side-effect-only nodes like "send notification")
async def my_action(self, ctx):
    await send_notification(ctx.input["message"])
```

**Error handling:**

```python
# Raising an exception marks the node as failed:
async def my_action(self, ctx):
    raise RuntimeError("API rate limit exceeded")

# Or return an explicit error without raising:
async def my_action(self, ctx):
    return ActionResult(error="Warning: some items failed")
```
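
To make the difference between these outcomes concrete, the sketch below shows one way a caller could normalize them into a `(data, error)` pair. It is not the SDK's implementation: `ActionResult` here is a local stand-in that models only `error`, `call_handler` is a hypothetical name, and the generic `"internal error"` message follows the sanitization behavior described under `run()`.

```python
import asyncio
import logging
from dataclasses import dataclass
from typing import Any, Optional

log = logging.getLogger("plugin-sketch")


@dataclass
class ActionResult:
    """Stand-in for orbflow_sdk.ActionResult; only `error` is modeled."""
    error: Optional[str] = None


async def call_handler(handler, ctx) -> tuple[dict[str, Any], Optional[str]]:
    """Run a handler and return (output_data, error_message)."""
    try:
        result = await handler(ctx)
    except Exception:
        # Full details stay in the local log; the caller sees a generic message.
        log.exception("handler failed")
        return {}, "internal error"
    if result is None:
        return {}, None              # side-effect-only node
    if isinstance(result, ActionResult):
        return {}, result.error      # explicit, author-written error
    return dict(result), None        # plain dict output


async def ok(ctx):
    return {"count": 42}


async def silent(ctx):
    return None


async def soft_fail(ctx):
    return ActionResult(error="Something went wrong")


async def hard_fail(ctx):
    raise RuntimeError("API rate limit exceeded")


for handler in (ok, silent, soft_fail, hard_fail):
    print(handler.__name__, asyncio.run(call_handler(handler, None)))
```

The practical difference for plugin authors: a message placed in `ActionResult(error=...)` is written for the workflow user, while an exception's text may never leave the plugin's own log.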

## Plugin Manifest

When deploying as a managed plugin, create an `orbflow-plugin.json` in your plugin directory:

```json
{
  "name": "my-plugin",
  "version": "1.0.0",
  "description": "Short description of what this plugin does",
  "author": "Your Name",
  "license": "MIT",
  "language": "python",
  "protocol": {
    "Grpc": { "default_port": 50051 }
  },
  "node_types": [
    "plugin:action-one",
    "plugin:action-two"
  ],
  "tags": ["utility", "transform"],
  "repository": "https://github.com/your-org/your-plugin",
  "icon": "zap",
  "category": "utility",
  "color": "#7C5CFC"
}
```

| Field | Required | Description |
|-------|----------|-------------|
| `name` | Yes | Plugin name (1-64 chars, alphanumeric + hyphens + underscores) |
| `version` | Yes | Semver version string |
| `language` | Yes | `"python"`, `"typescript"`, `"javascript"`, or custom |
| `protocol` | Yes | `{ "Grpc": { "default_port": 50051 } }` for gRPC plugins |
| `node_types` | Yes | Array of `plugin:*` refs this plugin provides |
| `description` | No | Short description for the marketplace |
| `author` | No | Author name |
| `icon` | No | Lucide icon name for the node in the builder |
| `category` | No | Marketplace category |
| `color` | No | Hex color for the node border |
| `tags` | No | Searchable tags |
| `repository` | No | Source code URL |
| `license` | No | SPDX license identifier |
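
A manifest can be checked against the required fields in this table before deployment. The sketch below is a hypothetical pre-flight check, not the engine's validator: it assumes "alphanumeric" means ASCII letters and digits, and it does not inspect the `protocol` object or the version format.

```python
import json
import re

REQUIRED = ("name", "version", "language", "protocol", "node_types")
# Assumption: "alphanumeric" means ASCII letters and digits.
NAME_RE = re.compile(r"[A-Za-z0-9_-]{1,64}")


def manifest_problems(text: str) -> list[str]:
    """Return the problems found in a manifest; an empty list means none."""
    manifest = json.loads(text)
    problems = [f"missing required field: {key}" for key in REQUIRED if key not in manifest]
    name = manifest.get("name")
    if isinstance(name, str) and not NAME_RE.fullmatch(name):
        problems.append("name must be 1-64 chars: letters, digits, hyphens, underscores")
    for ref in manifest.get("node_types", []):
        if not str(ref).startswith("plugin:"):
            problems.append(f"node type is not a plugin:* ref: {ref}")
    return problems


good = json.dumps({
    "name": "my-plugin",
    "version": "1.0.0",
    "language": "python",
    "protocol": {"Grpc": {"default_port": 50051}},
    "node_types": ["plugin:greet"],
})
bad = json.dumps({"name": "my plugin!", "version": "1.0.0", "node_types": ["greet"]})

print(manifest_problems(good))
for problem in manifest_problems(bad):
    print(problem)
```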

## Examples

### UUID Generator

A simple utility plugin with parameters:

```python
import uuid
from orbflow_sdk import plugin, action, Field, run

@plugin(name="orbflow-uuid-gen", version="1.0.0", author="Orbflow", category="utility", icon="hash")
class UuidGenPlugin:

    @action(
        ref="plugin:uuid-gen",
        name="UUID Generator",
        outputs=[Field.string("uuid"), Field.array("uuids")],
        parameters=[
            Field.number("count").default(1).description("How many UUIDs"),
            Field.string("format").default("hyphenated").enum("hyphenated", "simple", "upper"),
        ],
    )
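    async def generate(self, ctx):
        # Clamp the requested count to the range 1..1000
        count = min(max(int(ctx.parameters.get("count", 1)), 1), 1000)
        fmt = ctx.parameters.get("format", "hyphenated")
        ids = [uuid.uuid4() for _ in range(count)]
        if fmt == "simple":
            uuids = [u.hex for u in ids]            # 32 hex chars, no hyphens
        elif fmt == "upper":
            uuids = [str(u).upper() for u in ids]   # hyphenated, uppercase
        else:
            uuids = [str(u) for u in ids]           # hyphenated (default)
        return {"uuid": uuids[0], "uuids": uuids}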

if __name__ == "__main__":
    run(UuidGenPlugin)
```

### Credential-Authenticated API Plugin

Using the credential system to securely access external APIs:

```python
import json
import urllib.request
from orbflow_sdk import plugin, action, Field, ActionResult, run

@plugin(name="orbflow-ai-codegen", version="1.0.0", author="Orbflow", category="ai", icon="terminal")
class AiCodegenPlugin:

    @action(
        ref="plugin:ai-codegen",
        name="AI Code Generator",
        inputs=[Field.string("prompt", required=True)],
        outputs=[Field.string("code"), Field.string("model")],
        parameters=[
            Field.string("model").default("gpt-4o-mini"),
            Field.credential("credential_id").types("openai", "anthropic"),
        ],
    )
    async def generate(self, ctx):
        api_key = ctx.config.get("api_key", "")
        if not api_key:
            return ActionResult(error="Missing API key -- attach a credential")

        body = json.dumps({
            "model": ctx.parameters.get("model", "gpt-4o-mini"),
            "messages": [{"role": "user", "content": ctx.input["prompt"]}],
        }).encode()

        base_url = ctx.config.get("base_url", "https://api.openai.com/v1").rstrip("/")
        req = urllib.request.Request(
            f"{base_url}/chat/completions",
            data=body,
            headers={"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"},
        )
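        # urlopen is a blocking call and may stall other work while it waits;
        # for long requests consider asyncio.to_thread or an async HTTP client.
        with urllib.request.urlopen(req, timeout=60) as resp: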
            data = json.loads(resp.read())

        return {"code": data["choices"][0]["message"]["content"], "model": data.get("model", "")}

if __name__ == "__main__":
    run(AiCodegenPlugin)
```

**Manifest for this plugin** (`orbflow-plugin.json`):

```json
{
  "name": "orbflow-ai-codegen",
  "version": "1.0.0",
  "language": "python",
  "protocol": { "Grpc": { "default_port": 50051 } },
  "node_types": ["plugin:ai-codegen"]
}
```

## Architecture

```mermaid
graph TD
    subgraph Engine["Orbflow Engine"]
        Server["Server\n(HTTP / gRPC)"]
        Worker["Worker\n(task execution)"]
        Loader["Plugin Loader\n(discovery + lifecycle)"]
    end

    Loader -- "gRPC (HTTP/2)" --> YourPlugin["Your Plugin\n(orbflow-sdk Python)"]
    Loader -- "gRPC (HTTP/2)" --> OtherPlugin["Other Plugins\n(TypeScript SDK, custom)"]
    Worker --> Loader
    Server --> Worker

    style Engine fill:#1a1a2e,stroke:#7C5CFC,stroke-width:2px,color:#fff
    style YourPlugin fill:#7C5CFC,stroke:#5a3fd4,stroke-width:2px,color:#fff
    style OtherPlugin fill:#2d2d44,stroke:#555,stroke-width:1px,color:#aaa
    style Server fill:#2d2d44,stroke:#7C5CFC,color:#fff
    style Worker fill:#2d2d44,stroke:#7C5CFC,color:#fff
    style Loader fill:#2d2d44,stroke:#7C5CFC,color:#fff
```

**How it works:**

1. **Startup**: Orbflow scans the `plugins/` directory and finds your `orbflow-plugin.json` manifest
2. **Install**: Creates a venv and installs dependencies if `pyproject.toml` exists
3. **Launch**: Starts your plugin with `python main.py`, passing `ORBFLOW_PLUGIN_PORT`
4. **Health check**: Polls `HealthCheck` RPC every 500ms until the plugin reports healthy (30s timeout)
5. **Schema discovery**: Calls `GetSchemas` RPC to learn about your plugin's nodes (inputs, outputs, parameters, icons, categories)
6. **Registration**: Registers each `plugin:*` ref as a node type in the engine, available in the builder UI
7. **Execution**: When a workflow runs your node, the engine calls `Execute` RPC with input data, your handler runs, and the result flows to downstream nodes
8. **Shutdown**: On SIGTERM, the engine gracefully stops all managed plugin processes
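
The health-check step (poll every 500ms, give up after 30s) follows a common polling pattern, sketched below. This is not the engine's code: `wait_until_healthy` is a hypothetical name, and a plain callable `check` stands in for the `HealthCheck` RPC so the example runs without a server.

```python
import time


def wait_until_healthy(check, interval=0.5, timeout=30.0,
                       clock=time.monotonic, sleep=time.sleep):
    """Call `check()` every `interval` seconds until it returns True.

    Returns True once healthy, or False if `timeout` seconds pass first.
    `clock` and `sleep` are injectable so the loop can be tested instantly.
    """
    deadline = clock() + timeout
    while True:
        try:
            if check():
                return True
        except Exception:
            pass  # a plugin that is still starting may refuse connections
        if clock() >= deadline:
            return False
        sleep(interval)


# Simulated plugin that reports healthy on the third probe.
probes = iter([False, False, True])
print(wait_until_healthy(lambda: next(probes), sleep=lambda _: None))
```

For plugin authors, the takeaway is that slow startup work (loading models, opening connections) needs to finish inside that window, or be deferred until the first `Execute` call.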

**gRPC service** (`orbflow.plugin.v1.OrbflowPlugin`):

| RPC | Description |
|-----|-------------|
| `Execute(ExecuteRequest) -> ExecuteResponse` | Run an action with input data |
| `GetSchemas(GetSchemasRequest) -> GetSchemasResponse` | Return all node schemas |
| `HealthCheck(HealthCheckRequest) -> HealthCheckResponse` | Report health and version |

**Security**: The engine strips sensitive environment variables (DATABASE_URL, API tokens, encryption keys) before launching plugin processes. Plugins receive only `ORBFLOW_PLUGIN_PORT` and non-sensitive env vars.
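
The idea behind this filtering can be sketched as follows. The engine's actual rules are not documented here, so the marker list, the substring matching, and the `plugin_env` helper are all assumptions made for illustration.

```python
# Assumed deny-list; the engine's real rules may differ.
SENSITIVE_MARKERS = ("DATABASE_URL", "TOKEN", "SECRET", "KEY", "PASSWORD")


def plugin_env(parent_env: dict[str, str], port: int) -> dict[str, str]:
    """Build a child environment without sensitive-looking variables."""
    child = {
        name: value
        for name, value in parent_env.items()
        if not any(marker in name.upper() for marker in SENSITIVE_MARKERS)
    }
    child["ORBFLOW_PLUGIN_PORT"] = str(port)
    return child


parent = {
    "PATH": "/usr/bin",
    "DATABASE_URL": "postgres://example",
    "GITHUB_TOKEN": "example",
    "ENCRYPTION_KEY": "example",
}
print(sorted(plugin_env(parent, 50123)))
```

One consequence for plugin code: secrets should arrive through `ctx.config` via attached credentials, not through `os.environ`.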

## Development

### Setup

```bash
git clone https://github.com/orbflow-dev/orbflow-python.git
cd orbflow-python
pip install -e ".[dev]"
```

### Commands

```bash
pytest                    # Run tests
pytest --cov=orbflow_sdk  # Run tests with coverage
pytest -x                 # Stop on first failure
pytest -k test_fields     # Run only tests whose names match "test_fields"
```

### Project Structure

```
src/orbflow_sdk/
├── __init__.py            # Public API exports
├── decorators.py          # @plugin, @action, @trigger decorators
├── fields.py              # Field builder (Field.string, Field.number, etc.)
├── types.py               # Core type definitions (FieldDef, ExecuteContext, etc.)
├── server.py              # gRPC server (grpcio)
├── _proto_codec.py        # Hand-rolled protobuf encoder/decoder
├── schema.py              # Node schema building for GetSchemas RPC
├── convert.py             # Proto <-> SDK type conversion helpers
├── subprocess_runner.py   # Legacy subprocess mode runner
└── gen/                   # Generated protobuf stubs

tests/                     # pytest tests (8 test files)
docs/
├── getting-started.md     # Getting started guide
├── cookbook.md            # Common patterns and recipes
└── reference.md           # Full API reference
```

### Testing

```python
import pytest
from orbflow_sdk.types import ExecuteContext
from main import MyPlugin

@pytest.mark.asyncio
async def test_greet():
    plugin = MyPlugin()
    ctx = ExecuteContext(
        instance_id="test", node_id="n1", plugin_ref="plugin:greet", attempt=0,
        input={"name": "World"},
    )
    result = await plugin.greet(ctx)
    assert result["greeting"] == "Hello, World!"
```

## License

[Apache-2.0](LICENSE)
