Metadata-Version: 2.4
Name: administrate
Version: 0.1.0
Summary: Python SDK for the Administrate.dev REST API
Project-URL: Homepage, https://administrate.dev
Project-URL: Documentation, https://administrate.dev/docs
Project-URL: Repository, https://github.com/administrate-dev/administrate-python
Author-email: Administrate <support@administrate.dev>
License-Expression: MIT
License-File: LICENSE
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Typing :: Typed
Requires-Python: >=3.9
Requires-Dist: httpx<1,>=0.25.0
Requires-Dist: pydantic<3,>=2.0.0
Provides-Extra: dev
Requires-Dist: mypy>=1.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.21; extra == 'dev'
Requires-Dist: pytest>=7.0; extra == 'dev'
Requires-Dist: respx>=0.21; extra == 'dev'
Requires-Dist: ruff>=0.1; extra == 'dev'
Description-Content-Type: text/markdown

# Administrate Python SDK

The official Python SDK for the [Administrate.dev](https://administrate.dev) REST API.

[Administrate.dev](https://administrate.dev) is a monitoring and management platform for AI agencies running n8n and AI automation workflows across multiple clients. It provides a single dashboard to track every workflow, every client, every failure, and all LLM costs, so you can catch problems before clients do and prove the value of your automations.

**Key platform features:**

- **Multi-instance monitoring** — See all n8n instances across every client in one place
- **Error tracking** — Real-time failure detection with automatic error categorization
- **LLM cost tracking** — Connect OpenAI, Anthropic, Azure, and OpenRouter accounts to attribute costs to specific clients
- **Workflow insights** — Execution counts, success rates, and time-saved ROI reporting
- **Sync health** — Know instantly when a data sync fails
- **Webhooks & API** — Full programmatic access for custom integrations

## Installation

```bash
pip install administrate
```

Requires Python 3.9+.

## Quick start

```python
from administrate import Administrate

client = Administrate(api_key="sk_live_...")

# Get account info
account = client.account.get()
print(account.name, account.plan)

# List all clients with auto-pagination
for c in client.clients.list():
    print(c.name, c.n8n_instances_count)

# Check for failed executions across all instances
for execution in client.executions.list(errors_only=True):
    print(f"{execution.workflow_name}: {execution.error_category}")

# Get LLM cost summary
costs = client.llm_costs.summary()
print(f"Total: ${costs.data.summary.total_cost_cents / 100:.2f}")
```

### Async usage

```python
import asyncio

from administrate import AsyncAdministrate


async def main() -> None:
    async with AsyncAdministrate(api_key="sk_live_...") as client:
        account = await client.account.get()

        async for workflow in client.workflows.list(active=True):
            print(workflow.name, workflow.is_active)


asyncio.run(main())
```

Every resource method available on `Administrate` has an identical async counterpart on `AsyncAdministrate`.

## Configuration

```python
from administrate import Administrate

client = Administrate(
    api_key="sk_live_...",        # Required. Must start with "sk_live_"
    base_url="https://...",       # Default: "https://app.administrate.dev"
    timeout=30.0,                 # Request timeout in seconds. Default: 30
    max_retries=3,                # Retry attempts for failed requests. Default: 3
)
```
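
To keep the key out of source control, it can be read from the environment before the client is constructed. The helper below is a minimal sketch: the variable name `ADMINISTRATE_API_KEY` and the function `load_api_key` are illustrative assumptions, not part of the SDK. The only fact taken from the options above is that keys start with `sk_live_`.

```python
import os
from typing import Mapping, Optional

API_KEY_PREFIX = "sk_live_"  # prefix stated in the configuration comments above


def load_api_key(env: Optional[Mapping[str, str]] = None,
                 var_name: str = "ADMINISTRATE_API_KEY") -> str:
    """Return the API key from the environment, failing fast on obvious mistakes.

    `var_name` is a hypothetical variable name chosen for this example.
    """
    source = os.environ if env is None else env
    key = source.get(var_name, "").strip()
    if not key:
        raise RuntimeError(f"{var_name} is not set")
    if not key.startswith(API_KEY_PREFIX):
        raise ValueError(f"{var_name} must start with {API_KEY_PREFIX!r}")
    return key
```

The result can then be passed straight to the constructor, for example `Administrate(api_key=load_api_key())`.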

The client can be used as a context manager to ensure the underlying HTTP connection is closed:

```python
with Administrate(api_key="sk_live_...") as client:
    account = client.account.get()
# Connection closed automatically

# Or close manually
client = Administrate(api_key="sk_live_...")
# ... use client ...
client.close()
```

You can also pass a pre-configured `httpx.Client` (or `httpx.AsyncClient` for async) if you need full control over the HTTP layer:

```python
import httpx

# `proxy=` needs httpx >= 0.26; on httpx 0.25.x pass `proxies=` instead
http_client = httpx.Client(proxy="http://proxy:8080")
client = Administrate(api_key="sk_live_...", http_client=http_client)
```

## API reference

API keys are created in **Settings > Developers** within Administrate.dev, or programmatically through the API tokens resource described below. Each key has one of three permission levels: `read`, `write`, or `full`.

### Account

```python
# Get current token info and account summary
me = client.account.me()
print(me.token.name, me.token.permission)
print(me.account.name, me.account.plan)

# Get full account details
account = client.account.get()

# Update account settings
account = client.account.update(
    name="My Agency",
    billing_email="billing@example.com",
    timezone="Australia/Brisbane",
)
```

### Clients

Clients represent the companies you manage automations for.

```python
# List all clients (auto-paginates)
for c in client.clients.list():
    print(c.name, c.code)

# Get a client (includes 7-day metrics)
c = client.clients.get("com_abc123")
print(c.metrics.success_rate, c.metrics.time_saved_minutes)

# Create a client
c = client.clients.create(
    name="Acme Corp",
    code="acme",
    contact_email="ops@acme.com",
    timezone="America/New_York",
)

# Update a client
c = client.clients.update("com_abc123", notes="Enterprise tier")

# Delete a client (requires full permission)
client.clients.delete("com_abc123")
```

### Instances

Instances are n8n deployments connected to Administrate.

```python
# List all instances
for inst in client.instances.list():
    print(inst.name, inst.sync_status)

# Filter by client or sync status
for inst in client.instances.list(client_id="com_abc123", sync_status="error"):
    print(inst.name, inst.last_sync_error)

# Get an instance (includes 7-day metrics)
inst = client.instances.get("n8n_abc123")
print(inst.metrics.executions_count, inst.metrics.success_rate)

# Connect a new n8n instance
inst = client.instances.create(
    client_id="com_abc123",
    name="Production n8n",
    base_url="https://n8n.acme.com",
    api_key="n8n_api_key_here",
)

# Trigger a sync
result = client.instances.sync("n8n_abc123", sync_type="all")

# Sync all instances at once
result = client.instances.sync_all(sync_type="workflows")

# Update an instance
inst = client.instances.update("n8n_abc123", name="Staging n8n")

# Delete an instance
client.instances.delete("n8n_abc123")
```

### Workflows

```python
# List workflows with filters
for wf in client.workflows.list(client_id="com_abc123", active=True):
    print(wf.name, wf.is_active)

# Search by name
for wf in client.workflows.list(search="onboarding"):
    print(wf.name)

# Get a workflow (includes 7-day metrics)
wf = client.workflows.get("wfl_abc123")
print(wf.metrics.success_rate, wf.metrics.time_saved_minutes)

# Set time-saved estimates (for ROI reporting)
wf = client.workflows.update(
    "wfl_abc123",
    minutes_saved_per_success=15,
    minutes_saved_per_failure=5,
)
```
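
The two estimates feed the time-saved figures in ROI reporting. How the platform combines them is not spelled out here. The sketch below assumes the simplest reading: every successful run counts `minutes_saved_per_success` and every failed run counts `minutes_saved_per_failure`. `estimate_minutes_saved` is a hypothetical helper, not an SDK function.

```python
def estimate_minutes_saved(successes: int, failures: int,
                           minutes_saved_per_success: float,
                           minutes_saved_per_failure: float) -> float:
    """Assumed formula: a linear sum over successful and failed runs."""
    if successes < 0 or failures < 0:
        raise ValueError("run counts must be non-negative")
    return (successes * minutes_saved_per_success
            + failures * minutes_saved_per_failure)


# 40 successful runs and 2 failed runs with the estimates used above.
minutes = estimate_minutes_saved(40, 2, 15, 5)
print(f"{minutes} minutes saved")
```

Under that assumption, 40 successes and 2 failures come to 40 × 15 + 2 × 5 = 610 minutes.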

### Executions

Executions are read-only records of workflow runs synced from n8n.

```python
# List executions with filters
for ex in client.executions.list(
    client_id="com_abc123",
    status="failed",
    start_date="2025-01-01",
    end_date="2025-01-31",
):
    print(ex.workflow_name, ex.status, ex.duration_ms)

# Get only errors
for ex in client.executions.list(errors_only=True):
    print(ex.error_category, ex.workflow_name)

# Get execution details (includes error message and payload)
ex = client.executions.get("exe_abc123")
print(ex.error_message)
print(ex.error_payload)
```
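
The date filters in the example are `YYYY-MM-DD` strings. A range such as "the previous calendar month" can be computed with the standard library instead of being hard-coded. `previous_month_range` is an illustrative name, and treating `end_date` as inclusive is an assumption about the API.

```python
from datetime import date, timedelta
from typing import Optional, Tuple


def previous_month_range(today: Optional[date] = None) -> Tuple[str, str]:
    """Return (start_date, end_date) ISO strings for the previous calendar month."""
    today = today or date.today()
    first_of_this_month = today.replace(day=1)
    # Stepping back one day from the 1st lands on the last day of the previous month.
    last_of_prev = first_of_this_month - timedelta(days=1)
    first_of_prev = last_of_prev.replace(day=1)
    return first_of_prev.isoformat(), last_of_prev.isoformat()
```

The two strings can be passed as `start_date` and `end_date` to `client.executions.list(...)`.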

### Sync runs

```python
# List sync run history
for run in client.sync_runs.list(instance_id="n8n_abc123", status="failed"):
    print(run.sync_type, run.status, run.duration_seconds)

# Get a specific sync run
run = client.sync_runs.get("syn_abc123")

# Get sync health across all instances
for entry in client.sync_runs.health():
    print(entry.instance_name, entry.sync_status)
    print(f"  Workflows last synced: {entry.workflows.last_synced_at}")
    print(f"  Executions last synced: {entry.executions.last_synced_at}")
```

### Users

```python
# List team members
for user in client.users.list():
    print(user.name, user.email, user.role)

# Get a user
user = client.users.get("usr_abc123")

# Invite a new team member
invitation = client.users.invite(email="new@example.com", role="member")
print(invitation.expires_at)

# Change a user's role
user = client.users.update("usr_abc123", role="admin")

# Remove a user
client.users.delete("usr_abc123")
```

### Webhooks

```python
# List webhooks
for wh in client.webhooks.list():
    print(wh.url, wh.events, wh.enabled)

# Create a webhook
wh = client.webhooks.create(
    url="https://example.com/hook",
    events=["execution.failed", "sync.failed"],
    description="Slack failure alerts",
)
print(wh.secret)  # Save this — used to verify webhook signatures

# Update a webhook
wh = client.webhooks.update("whk_abc123", enabled=False)

# Regenerate the signing secret (old secret becomes invalid immediately)
wh = client.webhooks.regenerate_secret("whk_abc123")
print(wh.secret)

# Delete a webhook
client.webhooks.delete("whk_abc123")
```
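
On the receiving side, the secret is what lets your endpoint check that a delivery is genuine. The signing scheme is not described in this README, so the sketch below assumes a common one: an HMAC-SHA256 of the raw request body, hex-encoded. The algorithm, the encoding, and the function name `verify_signature` are all assumptions; confirm them against the webhook documentation before relying on this.

```python
import hashlib
import hmac


def verify_signature(secret: str, raw_body: bytes, received_signature: str) -> bool:
    """Check a hex HMAC-SHA256 signature (assumed scheme) in constant time."""
    expected = hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()
    # compare_digest avoids leaking how many leading characters matched.
    return hmac.compare_digest(expected.encode("utf-8"),
                               received_signature.encode("utf-8"))
```

Whatever the actual scheme, verify against the raw bytes of the request body rather than re-serialized JSON, since re-serialization can change whitespace and key order.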

### API tokens

```python
# List all tokens
for token in client.api_tokens.list():
    print(token.name, token.permission, token.token_hint)

# Create a token (the plain token is only returned once)
token = client.api_tokens.create(
    name="CI/CD Pipeline",
    permission="read",
    ip_allowlist=["10.0.0.0/8"],
    expires_in="90_days",
)
print(token.token)  # sk_live_... — save this immediately

# Update a token
token = client.api_tokens.update("tok_abc123", name="Updated Name")

# Revoke a token
client.api_tokens.delete("tok_abc123")
```
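
The `ip_allowlist` entry in the example is a CIDR range. To preview which addresses a range admits before creating a token, the standard `ipaddress` module is enough. `is_allowed` is an illustrative helper; the server's own matching rules are not documented here and may differ.

```python
import ipaddress
from typing import Iterable


def is_allowed(ip: str, allowlist: Iterable[str]) -> bool:
    """Return True if `ip` falls inside any CIDR range in `allowlist`."""
    address = ipaddress.ip_address(ip)
    return any(
        address in ipaddress.ip_network(cidr, strict=False) for cidr in allowlist
    )
```

For example, `is_allowed("10.1.2.3", ["10.0.0.0/8"])` is true, while `192.168.1.1` falls outside that range.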

### LLM providers

Connect your AI provider accounts to track costs.

```python
# List providers
for provider in client.llm_providers.list():
    print(provider.name, provider.provider_type, provider.sync_status)

# Get a provider (includes 7-day metrics)
provider = client.llm_providers.get("llm_abc123")
print(provider.metrics.total_cost_cents, provider.metrics.total_tokens)

# Connect a new provider
provider = client.llm_providers.create(
    name="OpenAI Production",
    provider_type="openai",  # openai, anthropic, openrouter, or azure
    api_key="sk-...",
    organization_id="org-...",
)

# Trigger a cost sync
client.llm_providers.sync("llm_abc123")

# Update a provider
provider = client.llm_providers.update("llm_abc123", name="OpenAI Staging")

# Delete a provider
client.llm_providers.delete("llm_abc123")
```

### LLM projects

Projects are discovered automatically when syncing a provider. Assign them to clients to attribute costs.

```python
# List projects for a provider
for project in client.llm_projects.list("llm_abc123"):
    print(project.name, project.total_cost_cents, project.client_name)

# Assign a project to a client
project = client.llm_projects.update(
    "llm_abc123", "proj_456", client_id="com_abc123"
)
```

### LLM costs

```python
# Get cost summary (defaults to last 7 days)
costs = client.llm_costs.summary()
print(f"Total: ${costs.data.summary.total_cost_cents / 100:.2f}")
print(f"Tokens: {costs.data.summary.total_tokens:,}")

# Breakdown by provider
for p in costs.data.providers:
    print(f"  {p.name}: ${p.cost_cents / 100:.2f}")

# Breakdown by model
for m in costs.data.models:
    print(f"  {m.model}: ${m.cost_cents / 100:.2f}")

# Daily trend
for day in costs.data.daily:
    print(f"  {day.date}: ${day.cost_cents / 100:.2f}")

# Custom date range
costs = client.llm_costs.summary(
    start_date="2025-01-01",
    end_date="2025-01-31",
)

# Costs by client
for entry in client.llm_costs.by_client().data:
    print(f"{entry.name}: ${entry.cost_cents / 100:.2f}")

# Costs by provider
for entry in client.llm_costs.by_provider().data:
    print(f"{entry.name}: ${entry.cost_cents / 100:.2f}")
```
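
The examples divide `cost_cents` by 100 inline. When amounts are summed or compared, it is safer to keep them as cents and convert only for display. The sketch below assumes `cost_cents` values are integers; `format_cents` is a hypothetical helper, not part of the SDK.

```python
from decimal import Decimal


def format_cents(cents: int, symbol: str = "$") -> str:
    """Format an integer number of cents as a currency string with thousands separators."""
    amount = Decimal(cents) / Decimal(100)
    return f"{symbol}{amount:,.2f}"


print(format_cents(123456))
```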

## Pagination

All `.list()` methods return an iterator that handles pagination automatically. By default, the API returns 25 items per page (max 100).

```python
# Auto-paginate through all results
for c in client.clients.list():
    print(c.name)

# Control page size
for c in client.clients.list(per_page=100):
    print(c.name)

# Get a single page
page = client.clients.list(per_page=10).first_page()
print(page.meta.total, page.meta.total_pages)
for c in page:
    print(c.name)
```

Async iteration works the same way:

```python
# Inside a coroutine, with async_client = AsyncAdministrate(...)
async for c in async_client.clients.list():
    print(c.name)
```
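
For intuition, auto-pagination amounts to requesting one page after another until the reported last page is reached. The sketch below is not the SDK's implementation: `iterate_all`, `fake_fetch`, and the response shape (a dict with `data` and `meta.total_pages`) are stand-ins invented for illustration, loosely mirroring the `page.meta.total_pages` field shown above.

```python
from typing import Any, Callable, Dict, Iterator, List


def iterate_all(fetch_page: Callable[[int, int], Dict[str, Any]],
                per_page: int = 25) -> Iterator[Any]:
    """Yield every item by requesting pages lazily, one at a time."""
    page = 1
    while True:
        response = fetch_page(page, per_page)
        yield from response["data"]
        if page >= response["meta"]["total_pages"]:
            return
        page += 1


# A fake in-memory backend standing in for the HTTP API.
ITEMS: List[str] = [f"client-{i}" for i in range(1, 61)]


def fake_fetch(page: int, per_page: int) -> Dict[str, Any]:
    start = (page - 1) * per_page
    total_pages = max(1, -(-len(ITEMS) // per_page))  # ceiling division
    return {"data": ITEMS[start:start + per_page],
            "meta": {"total": len(ITEMS), "total_pages": total_pages}}


names = list(iterate_all(fake_fetch, per_page=25))
```

Because the generator is lazy, breaking out of the loop early means later pages are never requested. A larger `per_page` reduces the number of round trips for the same result set.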

## Error handling

The SDK raises typed exceptions for all API errors:

```python
from administrate import (
    Administrate,
    APIError,
    AuthenticationError,
    NotFoundError,
    RateLimitError,
    ValidationError,
)

client = Administrate(api_key="sk_live_...")

try:
    c = client.clients.get("com_nonexistent")
except NotFoundError as e:
    print(f"Not found: {e.message}")
except AuthenticationError:
    print("Invalid API key")
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after}s")
except ValidationError as e:
    print(f"Invalid params: {e.body}")
except APIError as e:
    print(f"API error {e.status_code}: {e.message}")
```

**Exception hierarchy:**

| Exception | Status code | Description |
|---|---|---|
| `AdministrateError` | — | Base exception for all SDK errors |
| `APIError` | Any non-2xx | Base for all HTTP API errors |
| `AuthenticationError` | 401 | Invalid or missing API key |
| `PermissionDeniedError` | 403 | Insufficient token permissions |
| `NotFoundError` | 404 | Resource does not exist |
| `ValidationError` | 422 | Invalid request parameters |
| `RateLimitError` | 429 | Rate limit exceeded (has `retry_after`) |
| `InternalServerError` | 5xx | Server-side error |
| `ConnectionError` | — | Failed to connect to the API |
| `TimeoutError` | — | Request timed out |

All `APIError` subclasses expose `status_code`, `response` (the raw `httpx.Response`), and `body` (parsed JSON or text).
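
The table can be read as a lookup from status code to the most specific exception. The sketch below expresses that lookup with class names as plain strings. It mirrors the table only and is not the SDK's internal dispatch code; `exception_name_for_status` is a hypothetical helper.

```python
STATUS_TO_EXCEPTION = {
    401: "AuthenticationError",
    403: "PermissionDeniedError",
    404: "NotFoundError",
    422: "ValidationError",
    429: "RateLimitError",
}


def exception_name_for_status(status_code: int) -> str:
    """Return the exception name the table above associates with a status code."""
    if 200 <= status_code < 300:
        raise ValueError("2xx responses are not errors")
    if status_code in STATUS_TO_EXCEPTION:
        return STATUS_TO_EXCEPTION[status_code]
    if 500 <= status_code < 600:
        return "InternalServerError"
    return "APIError"  # any other non-2xx status falls back to the base class
```

Since the specific exceptions derive from `APIError`, `except` clauses for them need to come before a general `except APIError`, as in the example at the start of this section.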

## Retries

The SDK automatically retries rate-limited requests, server errors, and network failures:

- **429 (rate limited)** — Retries after the duration specified in the `Retry-After` header
- **5xx (server errors)** — Retries with exponential backoff (0.5s, 1s, 2s, ...)
- **Connection errors and timeouts** — Retried with the same backoff schedule

By default, the SDK retries up to 3 times. Set `max_retries=0` to disable retries:

```python
client = Administrate(api_key="sk_live_...", max_retries=0)
```
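
The backoff delays listed above (0.5s, 1s, 2s, ...) double on each attempt. The sketch below reproduces that sequence together with the `Retry-After` rule. It is an approximation for intuition, not the SDK's code: the function name `retry_delay`, the absence of jitter, and the lack of an upper bound on the delay are assumptions.

```python
from typing import Optional


def retry_delay(attempt: int, retry_after: Optional[float] = None,
                base: float = 0.5) -> float:
    """Seconds to wait before retry number `attempt` (1-based).

    A server-provided Retry-After value, when present, takes precedence.
    """
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    if retry_after is not None:
        return max(0.0, retry_after)
    return base * 2 ** (attempt - 1)


schedule = [retry_delay(n) for n in range(1, 4)]  # delays for the default 3 retries
```

Under these assumptions, the default three retries add 0.5 + 1 + 2 = 3.5 seconds of waiting on top of the request time, which is worth keeping in mind when choosing `timeout` and `max_retries` together.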

## Requirements

- Python 3.9+
- [httpx](https://www.python-httpx.org/) >= 0.25.0, < 1
- [pydantic](https://docs.pydantic.dev/) >= 2.0.0, < 3

## License

MIT
