Metadata-Version: 2.4
Name: dubsof
Version: 0.2.1
Summary: Python SDK for the Atrium platform by Dubsof
Author-email: Dubsof <hello@dubsof.com>
License: MIT
Keywords: atrium,dubsof,sdk,automation,workflow,api
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: Intended Audience :: Information Technology
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: POSIX
Classifier: Operating System :: MacOS :: MacOS X
Classifier: Operating System :: Microsoft :: Windows
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: Internet :: WWW/HTTP
Classifier: Topic :: System :: Networking
Classifier: Framework :: AsyncIO
Classifier: Typing :: Typed
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: betterproto>=0.9.0
Requires-Dist: websockets>=12.0
Requires-Dist: httpx>=0.27
Provides-Extra: dev
Requires-Dist: pytest>=8.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.23; extra == "dev"
Requires-Dist: respx>=0.21; extra == "dev"
Dynamic: license-file

# dubsof-sdk

Python SDK for [Atrium](https://dubsof.com/) by Dubsof, the platform for workflow automation, AI composition, and service orchestration.

---

## Install

```sh
pip install dubsof
# or with uv
uv pip install dubsof
```

**Requirements:** Python ≥ 3.10, `websockets` ≥ 12.0, `httpx` ≥ 0.27, `betterproto` ≥ 0.9.0.

---

## Quick start

```python
import asyncio
import dubsof
from dubsof import credentials
from dubsof.services import run

dubsof.initialize_app(
    credentials.APIKey("atk_..."),
    url="http://localhost:8080",
    client_id="my-service",
)

# `db` and `mailer` are placeholders for your own data-access and mail clients.
@run.tool("search_crm", description="Search CRM contacts by keyword")
def search_crm(query: str, tenant_id: str = "default") -> dict:
    return {"results": db.search(query)}

@run.tool("send_email", description="Send a transactional email")
def send_email(to: str, subject: str, body: str, tenant_id: str = "default") -> dict:
    mailer.send(to, subject, body)
    return {"sent": True}

asyncio.run(run.connect())   # blocks — reconnects automatically
```

`run.connect()` completes the WebSocket handshake, registers all decorated tools with the server, then serves tool calls indefinitely. It reconnects with exponential backoff (1 s → 60 s cap) on disconnect.
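
The reconnect schedule can be pictured with a short sketch. It assumes the delay doubles after each failed attempt and uses no jitter; the doubling factor and the name `backoff_delays` are illustrative assumptions, not the SDK's confirmed implementation.

```python
from itertools import islice
from typing import Iterator


def backoff_delays(base: float = 1.0, cap: float = 60.0, factor: float = 2.0) -> Iterator[float]:
    """Yield reconnect delays in seconds: base, base*factor, ... never above `cap`."""
    delay = base
    while True:
        yield min(delay, cap)
        # Clamp the stored value too, so it stops growing once the cap is hit.
        delay = min(delay * factor, cap)


# First eight delays under the assumed doubling policy.
print(list(islice(backoff_delays(), 8)))
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```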

---

## Typed return values

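HTTP client methods return typed dataclass objects instead of raw dicts wherever a model exists; a few calls, such as `compose.http.parse_project` and `compose.http.get_usage`, still return plain dicts. Models are importable directly from each service module: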

```python
from dubsof.services.runtime import Deployment, Service, ProjectRef
from dubsof.services.compose import Project, Model, CheckResult, CompileResult, GenerateEvent, Diagnostic, DeploymentRef
from dubsof.services.run     import Workflow, Flow, Task, FlowDep, Trigger, Execution, ExecutionPage, Tool, AuditEvent, AuditPage, FireResult
```

All models are `@dataclass(slots=True)` — lightweight, no external dependencies, and IDE-friendly with full autocompletion.

---

## Configuration with pydantic-settings

```python
import dubsof
from dubsof import credentials
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="ATRIUM_")
    api_key:   str
    url:       str = "http://localhost:8080"
    client_id: str = "my-service"

settings = Settings()   # reads ATRIUM_API_KEY, ATRIUM_URL, ATRIUM_CLIENT_ID

dubsof.initialize_app(
    credentials.APIKey(settings.api_key),
    url=settings.url,
    client_id=settings.client_id,
)
```

---

## Credentials

```python
from dubsof import credentials

# API key — create one in Settings → API Keys
cred = credentials.APIKey("atk_...")
```

`APIKey` is the only credential type today. The `Credentials` base class is designed for future additions (service accounts, OAuth) without changing call sites.

---

## Tool registration

```python
from dubsof.services import run

@run.tool(
    "tool_id",                      # stable ID registered with the server
    description="What this does",   # shown to the LLM during planning
    schema={"type": "object", ...}, # optional JSON Schema override
)
def my_tool(arg: str, tenant_id: str = "default") -> dict:
    ...
```

- `tenant_id` is injected automatically by the server — always accept it as a keyword argument with a default.
- The return value must be JSON-serialisable.
- Both sync and async handlers are supported.
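
The last two points can be checked locally before a tool is registered. The sketch below is one possible way to call a sync or async handler and verify that its result encodes as JSON; `call_handler` and the two example tools are hypothetical helpers, not part of the SDK.

```python
import asyncio
import inspect
import json
from typing import Any, Callable


async def call_handler(handler: Callable[..., Any], **kwargs: Any) -> str:
    """Invoke a sync or async handler and return its result encoded as JSON."""
    result = handler(**kwargs)
    if inspect.isawaitable(result):
        # Async handlers return an awaitable that still has to be awaited.
        result = await result
    # json.dumps raises TypeError for values that are not JSON-serialisable.
    return json.dumps(result)


def sync_tool(query: str, tenant_id: str = "default") -> dict:
    return {"query": query, "tenant": tenant_id}


async def async_tool(query: str, tenant_id: str = "default") -> dict:
    await asyncio.sleep(0)
    return {"query": query, "tenant": tenant_id}


print(asyncio.run(call_handler(sync_tool, query="acme", tenant_id="t1")))
print(asyncio.run(call_handler(async_tool, query="acme")))
```

A handler that returns, say, a `set` or a `datetime` would raise `TypeError` here, which is a cheap way to catch the problem in a unit test.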

Tools spread across modules are registered at import time. Import them before calling `run.connect()`:

```python
# main.py
import asyncio
import dubsof
from dubsof import credentials
dubsof.initialize_app(credentials.APIKey("atk_..."), url="...", client_id="my-service")

import tools.crm    # registers @run.tool decorators
import tools.email

from dubsof.services import run
asyncio.run(run.connect())
```

---

## Waiting for connection

When bootstrap logic must run after the handshake completes, start `run.connect()` as a background task and await `run.wait_connected()` before continuing:

```python
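import asyncio
from dubsof.services import run

# Assumes dubsof.initialize_app(...) has already been called.
async def main():
    connect_task = asyncio.create_task(run.connect())
    await run.wait_connected(timeout=15.0)   # raises TimeoutError if server unreachable

    # Safe to use run.http here
    workflows = await run.http.list_workflows()

    await connect_task   # keep serving tool calls

asyncio.run(main())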
```

---

## Firing events

```python
from dubsof.services import run

# Via WebSocket (low latency, requires active connection)
await run.fire_event(trigger_id, payload={"city": "London"})

# Via REST (works independently of WebSocket lifecycle)
await run.http.fire_event(trigger_id, payload={"city": "London"}, tenant_id="acme")
```

---

## REST API (`run.http`)

`run.http` is an `HTTPClient` instance. All methods are async, raise `httpx.HTTPStatusError` on non-2xx, and return typed model objects. 429 responses are retried automatically (up to 3×) after the server-specified `Retry-After` delay.
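
The retry behaviour described above can be approximated as follows. This is a sketch under stated assumptions, not the client's actual code: `send_with_retry` is a hypothetical helper, a response is modelled as a `(status, headers)` tuple, and only the delta-seconds form of `Retry-After` is handled.

```python
import asyncio
from typing import Awaitable, Callable, Mapping

Response = tuple[int, Mapping[str, str]]  # (status code, headers)


async def send_with_retry(
    send: Callable[[], Awaitable[Response]],
    max_retries: int = 3,
    default_delay: float = 1.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> Response:
    """Call `send`, retrying on HTTP 429 after the Retry-After delay."""
    attempt = 0
    while True:
        status, headers = await send()
        if status != 429 or attempt >= max_retries:
            return status, headers
        attempt += 1
        try:
            delay = float(headers.get("Retry-After", default_delay))
        except ValueError:
            delay = default_delay  # e.g. an HTTP-date value, not parsed here
        await sleep(delay)


async def demo() -> tuple[Response, list[float]]:
    responses = iter([(429, {"Retry-After": "0.01"}), (429, {"Retry-After": "0.02"}), (200, {})])
    waits: list[float] = []

    async def fake_send() -> Response:
        return next(responses)

    async def fake_sleep(seconds: float) -> None:
        waits.append(seconds)  # record the delay instead of sleeping

    return await send_with_retry(fake_send, sleep=fake_sleep), waits


print(asyncio.run(demo()))
```

If the server still answers 429 after the last retry, the sketch returns that response to the caller.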

### Workflows

```python
wf  = await run.http.create_workflow("My workflow", description="...")
wfs = await run.http.list_workflows()   # list[Workflow]
wf  = await run.http.get_workflow(workflow_id)
await run.http.update_workflow(workflow_id, name="New name")
await run.http.delete_workflow(workflow_id)

print(wf.id, wf.name, wf.created_at)

# LLM-assisted generation from a natural language prompt
wf  = await run.http.generate_workflow("Monitor Slack and post a weather summary daily")
```

### Flows

```python
flow = await run.http.create_flow(
    workflow_id,
    title="Fetch and notify",
    description="Get weather then post to Slack",
    trigger_id=tid,
    tool_ids=[tool_id_a, tool_id_b],  # security scope — only these tools may run in this flow
)
flows = await run.http.list_flows(workflow_id)   # list[Flow]
print(flow.id, flow.title, flow.order, flow.task_count)
```

### Flow dependencies

```python
# flow_b will not start until flow_a completes
dep  = await run.http.add_flow_dep(workflow_id, flow_id=flow_b, depends_on_flow_id=flow_a)
await run.http.remove_flow_dep(workflow_id, flow_id=flow_b, depends_on_flow_id=flow_a)
deps = await run.http.list_flow_deps(workflow_id)   # list[FlowDep]
print(dep.flow_id, dep.depends_on_flow_id)
```
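
Given a list of dependencies, a valid start order can be worked out client-side with the standard library. The sketch below is illustrative only: the pairs mimic the `flow_id` / `depends_on_flow_id` fields of `FlowDep`, the flow IDs are made up, and the server's own scheduling is not described in this README.

```python
from graphlib import TopologicalSorter

# Each pair mirrors a FlowDep: (flow_id, depends_on_flow_id). IDs are made up.
deps = [("flow_b", "flow_a"), ("flow_c", "flow_b")]

graph: dict[str, set[str]] = {}
for flow_id, depends_on in deps:
    graph.setdefault(flow_id, set()).add(depends_on)
    graph.setdefault(depends_on, set())

# static_order() lists predecessors before the nodes that depend on them
# and raises graphlib.CycleError if the dependencies form a cycle.
order = list(TopologicalSorter(graph).static_order())
print(order)  # ['flow_a', 'flow_b', 'flow_c']
```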

### Tasks

```python
task = await run.http.create_task(
    workflow_id, flow_id,
    title="Get weather",
    description="Use weather.fetch to get current conditions for the target city.",
)
print(task.id, task.order, task.tool_calls)
```

### Triggers

```python
trigger  = await run.http.create_trigger(workflow_id, description="Manual fire")
triggers = await run.http.list_triggers(workflow_id)   # list[Trigger]
print(trigger.id, trigger.workflow_id)
```

### Executions

```python
page      = await run.http.list_executions(
    workflow_id=wf_id,
    status="running",   # pending | running | success | error
    limit=20,
)                       # ExecutionPage
execution = await run.http.get_execution(execution_id)   # Execution

for ex in page.items:
    print(ex.id, ex.status, ex.tenant_id)
print(page.next_cursor)   # opaque cursor for the next page, or None
```
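
Walking every page follows the usual cursor pattern: fetch, yield the items, repeat until `next_cursor` is `None`. This README does not show how the cursor is passed back to `list_executions`, so the sketch takes a caller-supplied `fetch_page` function instead; `Page`, `iter_items`, and `fake_fetch` are hypothetical names used for illustration.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Awaitable, Callable, Optional


@dataclass
class Page:
    """Hypothetical page shape mirroring the `items` and `next_cursor` attributes."""
    items: list = field(default_factory=list)
    next_cursor: Optional[str] = None


async def iter_items(fetch_page: Callable[[Optional[str]], Awaitable[Page]]) -> AsyncIterator[Any]:
    """Yield every item across pages, following next_cursor until it is None."""
    cursor: Optional[str] = None
    while True:
        page = await fetch_page(cursor)
        for item in page.items:
            yield item
        if page.next_cursor is None:
            break
        cursor = page.next_cursor


# In-memory stand-in for the server: cursor -> page.
_PAGES = {None: Page(["ex_1", "ex_2"], "c1"), "c1": Page(["ex_3"], None)}


async def fake_fetch(cursor: Optional[str]) -> Page:
    return _PAGES[cursor]


async def collect() -> list:
    return [item async for item in iter_items(fake_fetch)]


print(asyncio.run(collect()))  # ['ex_1', 'ex_2', 'ex_3']
```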

### Audit log

```python
page  = await run.http.list_audit(product="run", limit=50)   # AuditPage
event = await run.http.get_audit_event(event_id)             # AuditEvent

for e in page.items:
    print(e.event_type, e.actor, e.resource_id, e.created_at)
```

### Tools

```python
tools  = await run.http.list_tools(client_id="my-service")   # list[Tool]
active = await run.http.list_tools(active_only=True)
print(tools[0].name, tools[0].schema, tools[0].active)
```

### Rate limits

```python
limits = await run.http.get_rate_limit()
# {"tier": "pro", "limit_minute": 300, "used_minute": 12, ...}
```

Rate limit headers (`X-RateLimit-*`) are also returned on every API response.

---

## Compose API (`compose.http`)

`compose.http` provides access to Atrium Compose — project management, AI generation, and TSL compilation.

```python
from dubsof.services import compose

# Projects
projects = await compose.http.list_projects()           # list[Project]
project  = await compose.http.create_project("My API", prompt="A REST API for managing orders")
project  = await compose.http.get_project(project_id)  # includes graph when available
await compose.http.update_project(project_id, name="Order API")
await compose.http.delete_project(project_id)

print(project.id, project.name, project.has_output)
if project.deployment:
    print(project.deployment.id, project.deployment.status)

# Inspect and validate the project's current output
graph  = await compose.http.parse_project(project_id)   # dict: {"ok", "graph", "types"}
check  = await compose.http.check_project(project_id)   # CheckResult
result = await compose.http.compile_project(project_id) # CompileResult

print("OK" if check.ok else "errors", len(check.diagnostics), "diagnostic(s)")
for d in check.diagnostics:
    print(d.message)

print("Services:", [s.name for s in result.services])
for e in result.errors:
    print(e.message)

# Download artefacts
zip_bytes  = await compose.http.export_project(project_id)    # compiled source ZIP
spec_bytes = await compose.http.download_openapi(project_id)  # OpenAPI JSON or ZIP
```

### AI Generation

```python
# Stream generation events in real time
async for event in compose.http.generate_project(project_id, "A REST API for managing orders"):
    if event.event == "update":
        print(event.data["phase"], event.data["status"])
    elif event.event == "build_done":
        print("Services:", [s["name"] for s in event.data.get("services", [])])
    elif event.event == "error":
        raise RuntimeError(event.data["error"])
    elif event.event == "done":
        break

# Fetch the completed project after the stream ends
project = await compose.http.get_project(project_id)
```

Generation is streamed as Server-Sent Events. The project is updated in the database as generation progresses.
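
For readers unfamiliar with the wire format, the sketch below parses a few Server-Sent Events lines into objects with `event` and `data` attributes. It is a simplified illustration, not the SDK's parser: `SseEvent` and `parse_sse` are hypothetical names, the payload is assumed to be JSON, and fields such as `id:` and `retry:` are ignored.

```python
import json
from dataclasses import dataclass
from typing import Any, Iterable, Iterator


@dataclass
class SseEvent:
    """Hypothetical event shape with the `event` and `data` attributes used above."""
    event: str
    data: Any


def parse_sse(lines: Iterable[str]) -> Iterator[SseEvent]:
    """Group SSE lines into events; a blank line terminates each event."""
    name, data_lines = "message", []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            if data_lines:
                # Assumption: the data payload is a JSON document.
                yield SseEvent(name, json.loads("\n".join(data_lines)))
            name, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment or keep-alive line
        elif line.startswith("event:"):
            name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].lstrip(" "))


stream = [
    "event: update",
    'data: {"phase": "plan", "status": "running"}',
    "",
    ": keep-alive",
    "event: done",
    "data: {}",
    "",
]
for ev in parse_sse(stream):
    print(ev.event, ev.data)
```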

### Models and usage

```python
models = await compose.http.list_models()   # list[Model]
usage  = await compose.http.get_usage()     # dict

for m in models:
    print(m.id, m.label)
```

---

## Runtime API (`runtime.http`)

`runtime.http` provides access to Atrium Runtime — deploy and manage Compose project services.

```python
from dubsof.services import runtime

# Deploy a Compose project
dep = await runtime.http.deploy_project(project_id)   # Deployment
print(dep.id, dep.status)          # dep-a1b2c3d4  creating
print(dep.project.id, dep.project.name)

# Stream until live (polls every 3s, times out after 600s)
async for status in runtime.http.stream_deployment(dep.id):
    print(status.status, status.urls)
# live  {'hello_world': 'https://hello-world.dubsof.app'}

# List all deployments for the org
deployments = await runtime.http.list_deployments()   # list[Deployment]
for d in deployments:
    print(d.project.name, d.status, d.urls)
    for svc in d.services:
        print(" ", svc.name, svc.type)

# Redeploy (creates a new deployment record)
new_dep = await runtime.http.redeploy(dep.id)
```
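
The poll-and-timeout behaviour noted in the comment above can be sketched as a small async generator. This is an approximation under stated assumptions: `poll_until` is a hypothetical helper, the status source is injected as `fetch_status`, and the `"building"` status in the demo is invented for illustration.

```python
import asyncio
import time
from typing import AsyncIterator, Awaitable, Callable


async def poll_until(
    fetch_status: Callable[[], Awaitable[str]],
    done: Callable[[str], bool],
    interval: float = 3.0,
    timeout: float = 600.0,
) -> AsyncIterator[str]:
    """Yield each polled status until `done(status)` is true or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while True:
        status = await fetch_status()
        yield status
        if done(status):
            return
        # Give up if the next poll would land after the deadline.
        if time.monotonic() + interval > deadline:
            raise TimeoutError(f"not done after {timeout} s")
        await asyncio.sleep(interval)


async def demo() -> list[str]:
    statuses = iter(["creating", "building", "live"])  # "building" is made up

    async def fake_fetch() -> str:
        return next(statuses)

    return [s async for s in poll_until(fake_fetch, lambda s: s == "live", interval=0.01, timeout=1.0)]


print(asyncio.run(demo()))  # ['creating', 'building', 'live']
```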

### Scopes required

| Method | Scope |
|--------|-------|
| `list_deployments`, `get_deployment`, `stream_deployment` | `runtime:deployments:read` |
| `deploy_project`, `redeploy` | `runtime:deployments:write` |

---

## Running tests

```sh
uv pip install -e ".[dev]"
pytest
```

Integration tests require a running Atrium server. Set `ATRIUM_URL` and `ATRIUM_API_KEY` before running them:

```sh
ATRIUM_URL=http://localhost:8080 ATRIUM_API_KEY=atk_... pytest tests/test_integration.py
```
