Metadata-Version: 2.4
Name: dubsof
Version: 0.1.1
Summary: Python SDK for the Atrium platform by Dubsof
Author-email: Dubsof <hello@dubsof.com>
License: MIT
Keywords: atrium,dubsof,sdk,automation,workflow,api
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: Intended Audience :: Information Technology
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: POSIX
Classifier: Operating System :: MacOS :: MacOS X
Classifier: Operating System :: Microsoft :: Windows
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: Internet :: WWW/HTTP
Classifier: Topic :: System :: Networking
Classifier: Framework :: AsyncIO
Classifier: Typing :: Typed
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: betterproto>=0.9.0
Requires-Dist: websockets>=12.0
Requires-Dist: httpx>=0.27
Provides-Extra: dev
Requires-Dist: pytest>=8.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.23; extra == "dev"
Requires-Dist: respx>=0.21; extra == "dev"
Dynamic: license-file

# dubsof-sdk

Python client SDK for the [Dubsof platform](https://git.intra.dubsof.com/dubsof/atrium). Connects external services to **Atrium Run** — registers tools over WebSocket and serves tool calls dispatched by the workflow execution engine.

---

## Install

```sh
pip install -e .
# or with uv
uv pip install -e .
```

**Requirements:** Python ≥ 3.10, `websockets` ≥ 12.0, `httpx` ≥ 0.27, `betterproto` ≥ 0.9.0.

---

## Quick start

```python
import asyncio
import dubsof
from dubsof import credentials
from dubsof.services import run

dubsof.initialize_app(
    credentials.APIKey("atk_..."),
    url="http://localhost:8080",
    client_id="my-service",
)

# `db` and `mailer` are placeholders for your own data and mail layers.
@run.tool("search_crm", description="Search CRM contacts by keyword")
def search_crm(query: str, tenant_id: str = "default") -> dict:
    return {"results": db.search(query)}

@run.tool("send_email", description="Send a transactional email")
def send_email(to: str, subject: str, body: str, tenant_id: str = "default") -> dict:
    mailer.send(to, subject, body)
    return {"sent": True}

asyncio.run(run.connect())   # blocks — reconnects automatically
```

`run.connect()` completes the WebSocket handshake, registers all decorated tools with the server, then serves tool calls indefinitely. It reconnects with exponential backoff (1 s → 60 s cap) on disconnect.
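
The reconnect schedule described above can be sketched as a small generator. This is an illustration only, not the SDK's implementation: the doubling factor and the absence of jitter are assumptions, and `backoff_delays` is a hypothetical name. Only the 1 s start and the 60 s cap come from the text.

```python
import itertools

def backoff_delays(initial: float = 1.0, cap: float = 60.0, factor: float = 2.0):
    """Yield reconnect delays in seconds, growing by `factor` up to `cap`."""
    delay = initial
    while True:
        yield min(delay, cap)
        # Assumption: the delay doubles after every failed attempt.
        delay = min(delay * factor, cap)

# First eight delays of the schedule.
schedule = list(itertools.islice(backoff_delays(), 8))
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

With these assumed parameters the cap is reached on the seventh attempt, after which every retry waits 60 s.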

---

## Configuration with pydantic-settings

```python
import dubsof
from dubsof import credentials
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="ATRIUM_")
    api_key:   str
    url:       str = "http://localhost:8080"
    client_id: str = "my-service"

settings = Settings()   # reads ATRIUM_API_KEY, ATRIUM_URL, ATRIUM_CLIENT_ID

dubsof.initialize_app(
    credentials.APIKey(settings.api_key),
    url=settings.url,
    client_id=settings.client_id,
)
```

---

## Credentials

```python
from dubsof import credentials

# API key — create one in Settings → API Keys
cred = credentials.APIKey("atk_...")
```

`APIKey` is the only credential type today. The `Credentials` base class is designed for future additions (service accounts, OAuth) without changing call sites.

---

## Tool registration

```python
from dubsof.services import run

@run.tool(
    "tool_id",                      # stable ID registered with the server
    description="What this does",   # shown to the LLM during planning
    schema={"type": "object", ...}, # optional JSON Schema override
)
def my_tool(arg: str, tenant_id: str = "default") -> dict:
    ...
```

- `tenant_id` is injected automatically by the server — always accept it as a keyword argument with a default.
- The return value must be JSON-serialisable.
- Both sync and async handlers are supported.
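
The three rules above can be illustrated with a small dispatcher. This is a sketch under stated assumptions, not how the SDK dispatches calls: `call_handler` is a hypothetical helper, and running sync handlers in a worker thread is one possible design choice rather than documented behaviour.

```python
import asyncio
import inspect
import json

async def call_handler(handler, arguments: dict, tenant_id: str = "default"):
    """Invoke a sync or async handler and check that its result is JSON-serialisable."""
    # tenant_id is passed as a keyword argument alongside the tool arguments.
    kwargs = {**arguments, "tenant_id": tenant_id}
    if inspect.iscoroutinefunction(handler):
        result = await handler(**kwargs)
    else:
        # Assumption: blocking handlers run in a thread so the event loop stays free.
        result = await asyncio.to_thread(handler, **kwargs)
    json.dumps(result)  # raises TypeError for values JSON cannot encode
    return result

def echo(text: str, tenant_id: str = "default") -> dict:
    return {"text": text, "tenant": tenant_id}

async def echo_async(text: str, tenant_id: str = "default") -> dict:
    return {"text": text.upper(), "tenant": tenant_id}

sync_result = asyncio.run(call_handler(echo, {"text": "hi"}, tenant_id="acme"))
async_result = asyncio.run(call_handler(echo_async, {"text": "hi"}, tenant_id="acme"))
print(sync_result, async_result)
```

A handler that omits the `tenant_id` parameter would fail in this sketch with a `TypeError`, which is why the keyword argument with a default is recommended.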

Tools spread across modules are registered at import time. Import them before calling `run.connect()`:

```python
# main.py
import asyncio
import dubsof
from dubsof import credentials
dubsof.initialize_app(credentials.APIKey("atk_..."), url="...", client_id="my-service")

import tools.crm    # registers @run.tool decorators
import tools.email

from dubsof.services import run
asyncio.run(run.connect())
```

---

## Waiting for connection

When bootstrap logic must run after the handshake completes:

```python
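import asyncio

from dubsof.services import run

async def main():
    connect_task = asyncio.create_task(run.connect())
    await run.wait_connected(timeout=15.0)   # raises TimeoutError if server unreachable

    # Safe to use run.http here
    workflows = await run.http.list_workflows()

    await connect_task

asyncio.run(main())   # assumes dubsof.initialize_app(...) was called first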
```

---

## Firing events

```python
from dubsof.services import run

# Via WebSocket (low latency, requires active connection)
await run.fire_event(trigger_id, payload={"city": "London"})

# Via REST (works independently of WebSocket lifecycle)
await run.http.fire_event(trigger_id, payload={"city": "London"}, tenant_id="acme")
```

---

## REST API (`run.http`)

`run.http` is an `HTTPClient` instance. All methods are async and raise `httpx.HTTPStatusError` on non-2xx. 429 responses are retried automatically (up to 3×) after the server-specified `Retry-After` delay.
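
The 429 handling described above follows a common pattern, sketched here with the standard library only. It is not the `HTTPClient` source: `send_with_retry` and `FakeResponse` are hypothetical, "up to 3×" is read as three retries after the first attempt, and `Retry-After` is assumed to carry a number of seconds rather than an HTTP date.

```python
import asyncio
from dataclasses import dataclass, field

@dataclass
class FakeResponse:
    """Stand-in for an HTTP response; only the fields the loop reads."""
    status_code: int
    headers: dict = field(default_factory=dict)

async def send_with_retry(send, max_retries: int = 3, sleep=asyncio.sleep):
    """Call `send()` and retry on 429, waiting the Retry-After delay each time."""
    response = await send()
    for _ in range(max_retries):
        if response.status_code != 429:
            break
        # Assumption: fall back to 1 s when the header is missing.
        await sleep(float(response.headers.get("Retry-After", "1")))
        response = await send()
    return response

# Demo: two throttled replies, then success. Delays are recorded instead of slept.
replies = [
    FakeResponse(429, {"Retry-After": "2"}),
    FakeResponse(429, {"Retry-After": "1"}),
    FakeResponse(200),
]
waits: list[float] = []

async def fake_send():
    return replies.pop(0)

async def fake_sleep(seconds: float):
    waits.append(seconds)

final = asyncio.run(send_with_retry(fake_send, sleep=fake_sleep))
print(final.status_code, waits)  # 200 [2.0, 1.0]
```

If every attempt returns 429, this sketch hands the last 429 response back to the caller, who can then raise or surface it.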

### Workflows

```python
wf  = await run.http.create_workflow("My workflow", description="...")
wfs = await run.http.list_workflows()
wf  = await run.http.get_workflow(workflow_id)
await run.http.update_workflow(workflow_id, name="New name")
await run.http.delete_workflow(workflow_id)

# LLM-assisted generation from a natural language prompt
wf  = await run.http.generate_workflow("Monitor Slack and post a weather summary daily")
```

### Flows

```python
flow = await run.http.create_flow(
    workflow_id,
    title="Fetch and notify",
    description="Get weather then post to Slack",
    trigger_id=tid,
    tool_ids=[tool_id_a, tool_id_b],  # security scope — only these tools may run in this flow
)
flows = await run.http.list_flows(workflow_id)
```

### Flow dependencies

```python
# flow_b will not start until flow_a completes
await run.http.add_flow_dep(workflow_id, flow_id=flow_b, depends_on_flow_id=flow_a)
await run.http.remove_flow_dep(workflow_id, flow_id=flow_b, depends_on_flow_id=flow_a)
deps = await run.http.list_flow_deps(workflow_id)
```
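
The ordering that such dependencies imply can be previewed locally with the standard library's `graphlib`. This sketch is not the execution engine's scheduler, and the flow IDs are made up; it only shows that "B depends on A" means A comes first in any valid run order.

```python
from graphlib import TopologicalSorter

# Keys are flows; values are the flows they depend on (hypothetical IDs).
deps = {
    "flow_b": {"flow_a"},
    "flow_c": {"flow_a", "flow_b"},
}

# static_order() yields each flow only after all of its dependencies.
order = list(TopologicalSorter(deps).static_order())
print(order)  # ['flow_a', 'flow_b', 'flow_c']
```

`TopologicalSorter` raises `graphlib.CycleError` when the dependencies form a loop, which makes it a cheap client-side check before calling `add_flow_dep`.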

### Tasks

```python
task = await run.http.create_task(
    workflow_id, flow_id,
    title="Get weather",
    description="Use weather.fetch to get current conditions for the target city.",
)
tasks = await run.http.list_tasks(workflow_id, flow_id)
```

### Triggers

```python
trigger  = await run.http.create_trigger(workflow_id, description="Manual fire")
triggers = await run.http.list_triggers(workflow_id)
```

### Executions

```python
result    = await run.http.list_executions(
    workflow_id=wf_id,
    status="running",   # pending | running | success | error
    limit=20,
)
execution = await run.http.get_execution(execution_id)
```

### Tools

```python
tools  = await run.http.list_tools(client_id="my-service")
active = await run.http.list_tools(active_only=True)
```

### Rate limits

```python
limits = await run.http.get_rate_limit()
# {"tier": "pro", "limit_minute": 300, "used_minute": 12, ...}
```

Rate limit headers (`X-RateLimit-*`) are also returned on every API response.
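
Both sources of rate-limit data can be reduced to something a caller can act on. The helpers below are hypothetical and rely only on the two fields visible in the sample response (`limit_minute`, `used_minute`) and on the `X-RateLimit-` prefix; the concrete header names are not listed in this document, so the one used in the demo is invented.

```python
def remaining_this_minute(limits: dict) -> int:
    """Calls left in the current minute, from the fields in the sample response."""
    return max(limits["limit_minute"] - limits["used_minute"], 0)

def rate_limit_headers(headers: dict) -> dict:
    """Keep only X-RateLimit-* headers, matching the prefix case-insensitively."""
    return {k: v for k, v in headers.items() if k.lower().startswith("x-ratelimit-")}

sample = {"tier": "pro", "limit_minute": 300, "used_minute": 12}
left = remaining_this_minute(sample)
print(left)  # 288

# "X-RateLimit-Example" is a made-up name used only to exercise the filter.
picked = rate_limit_headers(
    {"Content-Type": "application/json", "X-RateLimit-Example": "1"}
)
print(picked)  # {'X-RateLimit-Example': '1'}
```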

---

## Running tests

```sh
uv pip install -e ".[dev]"
pytest
```

Integration tests require a running Atrium server. Set `ATRIUM_URL` and `ATRIUM_API_KEY` before running them:

```sh
ATRIUM_URL=http://localhost:8080 ATRIUM_API_KEY=atk_... pytest tests/test_integration.py
```
