Metadata-Version: 2.4
Name: artificer-dispatcher
Version: 0.2.4
Summary: Polls project management APIs for ready tickets and spawns AI agents to work on them
Author-email: Scott <me@scottrussell.net>
License-Expression: MIT
License-File: LICENSE
Keywords: agent,ai,automation,dispatcher
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Build Tools
Requires-Python: >=3.13
Requires-Dist: starlette>=0.30.0
Requires-Dist: uvicorn>=0.20.0
Description-Content-Type: text/markdown

# artificer-dispatcher

Polls task queues, dispatches agent subprocesses, and exposes an HTTP API so agents can interact with tasks without knowing which backend is in use.

## How it works

The router polls configured queues for ready tasks. When it finds one, it moves the task to an in-progress queue, spawns a subprocess (any command), and passes task details to the agent. The subprocess uses a local HTTP API to read task details, post comments, update fields, and move the task when done. The agent never talks to the backend directly.

## Key concepts

- **Backend adapters** — Protocol-based (`TaskAdapter`, 11 methods). Ships with a JSON file adapter. A Planka adapter is included as a user-land example (`planka_backend.py`). Implement the protocol for anything else (Jira, Trello, Linear, SQLite, SQS, etc.).
- **Agent adapters** — Protocol-based (`AgentAdapter`). Ships with a Claude adapter (session tracking, resume hints) and a default pass-through for any command.
- **Routes** — Flask-style `@dispatcher.route()` decorators map queues to prompt-generating functions.
- **HTTP API** — Agents hit localhost. No credentials, no backend coupling.

## Quick start

Requires Python 3.13+.

```sh
uv pip install -e .          # preferred (pip install -e . also works)
uv pip show artificer-dispatcher  # verify the install succeeded
```

Create a Python script (e.g. `run.py`):

```python
from artificer import AgentDispatcher, JsonFileAdapter

dispatcher = AgentDispatcher(
    command="claude",
    poll_interval=30,
    agent_timeout=600,
    max_concurrent_agents=3,
    queue_backend=JsonFileAdapter("/tmp/board.json"),
)

@dispatcher.route(
    args=["--agent", "engineer", "-p"],
    queue_name="Todo",
    in_progress_queue="In Progress",
)
def engineer_agent(task_id: str, task_name: str) -> str:
    return f"Work on task {task_id}: {task_name}."

if __name__ == "__main__":
    dispatcher.run(debug=True)  # enable DEBUG logging (default: False)
```

> For Planka users, see `planka_backend.py` at the repo root for a ready-made backend.

```sh
python run.py
```

This starts two things:

1. **Router** — polls configured queues, picks up tasks, moves them to in-progress, and spawns agent subprocesses.
2. **HTTP API** — listens on `http://{api_host}:{api_port}` so spawned agents can interact with tasks.

## Configuration reference

All configuration is done via the `AgentDispatcher` constructor.

### Constructor arguments

| Argument | Type | Default | Description |
|---|---|---|---|
| `command` | `str` | *(required)* | Base command to run for all routes (e.g. `"claude"`) |
| `poll_interval` | `int` | `30` | Seconds between polls |
| `agent_timeout` | `int \| None` | `None` | Default timeout in seconds for all agents |
| `max_concurrent_agents` | `int` | `3` | Max agent processes at once |
| `api_host` | `str` | `"127.0.0.1"` | HTTP API bind address |
| `api_port` | `int` | `8000` | HTTP API port |
| `queue_backend` | `TaskAdapter \| None` | `None` | Task backend (required before calling `run()`). Any object with a `create_adapter()` method also works. |
| `agent_adapters` | `dict[str, AgentAdapter] \| None` | `None` | Custom agent adapters by command name |
| `enable_queue_management` | `bool` | `False` | Enable queue CRUD HTTP endpoints |

### Route decorator

The `@dispatcher.route()` decorator registers a queue-to-command mapping. The decorated function receives `(task_id, task_name)` and returns a prompt string appended to the command arguments.

```python
@dispatcher.route(
    queue_name="My Project.My Board.Todo",          # required: queue to poll
    in_progress_queue="My Project.My Board.WIP",    # default: "In Progress"
    args=["--agent", "engineer", "-p"],              # extra args before prompt
    timeout=1800,                                    # route-specific timeout (optional)
    poll_interval=10,                                # route-specific poll interval (optional)
    priority=1,                                      # dispatch priority (optional)
)
def my_agent(task_id: str, task_name: str) -> str:
    return f"Work on task {task_id}: {task_name}."
```

### Agent timeouts

You can configure timeouts to automatically terminate agent processes that run too long:

- **`agent_timeout`** (constructor): Sets a global timeout in seconds for all agents. If not specified, agents run indefinitely.
- **`timeout`** (per-route): Sets a route-specific timeout in seconds. Overrides `agent_timeout` for that route.

When an agent times out:
1. The process receives a TERM signal and has 5 seconds to exit gracefully
2. If it doesn't exit, it receives a KILL signal
3. A comment is added to the task noting the timeout

```python
dispatcher = AgentDispatcher(
    command="my-agent",
    agent_timeout=3600,  # 1 hour default for all agents
    queue_backend=my_backend,
)

@dispatcher.route(
    queue_name="Quick Tasks",
    timeout=300,  # 5 minutes for quick tasks (overrides default)
)
def quick(task_id, task_name):
    return f"Handle {task_id}"

@dispatcher.route(queue_name="Long Tasks")
# No timeout — uses default of 3600 seconds
def long_running(task_id, task_name):
    return f"Handle {task_id}"
```

### Route priority

When `max_concurrent_agents` is limited, routes with lower `priority` values are dispatched first. This lets you ensure downstream queues (closer to completion) are serviced before upstream ones, so a task flows all the way through a pipeline before new work begins.

Routes without an explicit `priority` use their registration order as a tiebreaker.

```python
@dispatcher.route(queue_name="QA",          priority=1)  # serviced first
def qa(task_id, task_name):
    return f"Review {task_id}"

@dispatcher.route(queue_name="Engineering", priority=2)
def eng(task_id, task_name):
    return f"Implement {task_id}"

@dispatcher.route(queue_name="Todo",        priority=3)  # serviced last
def todo(task_id, task_name):
    return f"Handle {task_id}"
```

### Per-queue poll intervals

By default, all queues are polled at the global `poll_interval` rate. You can override this per-route to poll high-priority queues more frequently or low-priority queues less often:

The router's internal tick rate automatically adjusts to the shortest configured interval, so no queue is ever starved.

```python
dispatcher = AgentDispatcher(
    command="my-agent",
    poll_interval=60,  # default for all queues
    queue_backend=my_backend,
)

@dispatcher.route(queue_name="High Priority", poll_interval=10)
def urgent(task_id, task_name):
    return f"Handle {task_id}"

@dispatcher.route(queue_name="Background", poll_interval=1800)
def background(task_id, task_name):
    return f"Handle {task_id}"

@dispatcher.route(queue_name="Normal")
# No poll_interval — uses global default of 60s
def normal(task_id, task_name):
    return f"Handle {task_id}"
```

## HTTP API

| Method | Endpoint | Description |
|---|---|---|
| `GET` | `/tasks/{task_id}` | Full task info: description, labels, assignees, comments |
| `POST` | `/tasks/{task_id}/comments` | Post a comment on a task (`{"comment": "text"}`) |
| `POST` | `/tasks/{task_id}/move` | Move a task to a different queue (`{"target_queue": "name"}`) |
| `PATCH` | `/tasks/{task_id}` | Update task fields (`{"name": "...", "description": "...", "labels": [...], "assignees": [...]}`) |
| `POST` | `/tasks` | Create a new task (`{"queue_name": "...", "name": "...", "description": "..."}`) |
| `GET` | `/queues` | List all queues with task counts |
| `GET` | `/queues/{queue_name}` | Get details for a specific queue |
| `POST` | `/queues` | Create a new queue (`{"name": "..."}`) |
| `PATCH` | `/queues/{queue_name}` | Update/rename a queue (`{"name": "..."}`) |
| `DELETE` | `/queues/{queue_name}` | Delete an empty queue |
| `GET` | `/status` | Router status: active agents, available slots |

## Task lifecycle

1. Task sits in a watched queue (e.g. `Todo`)
2. Router picks it up, moves it to the in-progress queue, and assigns the authenticated user
3. Router spawns the configured command as a subprocess
4. The agent uses the HTTP API to read task details, add comments, etc.
5. When finished, the agent calls the move endpoint to move the task to a done queue

## Backends

### Planka

The Planka backend is provided as a user-land file (`planka_backend.py` at the repo root), not as part of the library. It requires `plankapy>=2.3.0` to be installed separately:

```sh
uv pip install plankapy>=2.3.0
```

Uses dot-notation for queue naming: `Project.Board.List`.

```python
from artificer import AgentDispatcher
from planka_backend import PlankaBackend

dispatcher = AgentDispatcher(
    command="my-agent",
    queue_backend=PlankaBackend(url="http://localhost:1337"),
)

@dispatcher.route(
    queue_name="My Project.My Board.Todo",
    in_progress_queue="My Project.My Board.In Progress",
    args=["-p"],
)
def handle(task_id: str, task_name: str) -> str:
    return f"Work on task {task_id}: {task_name}"
```

#### Planka authentication

Credentials can be passed directly as kwargs or resolved from environment variables:

```python
# Option 1: API token (kwarg)
PlankaBackend(url="http://localhost:3000", token="your-token-here")

# Option 2: Username + password (kwargs)
PlankaBackend(url="http://localhost:3000", username="admin", password="secret")

# Option 3: Environment variables (default when no kwargs are given)
# PLANKA_TOKEN=your-token-here
# — or —
# PLANKA_USER=admin  +  PLANKA_PASSWORD=secret
PlankaBackend(url="http://localhost:3000")
```

Credentials are resolved at `dispatcher.run()` time, not at import time. If you use `.env` files, call `dotenv.load_dotenv()` in your script before `dispatcher.run()`.

### JSON file

For development/testing or lightweight use without external services.

```python
from artificer import AgentDispatcher, JsonFileAdapter

dispatcher = AgentDispatcher(
    command="my-agent",
    queue_backend=JsonFileAdapter("/tmp/board.json"),
)
```

The JSON file structure:

```json
{
  "queues": {
    "Todo": [
      {"id": "1", "name": "Fix crash", "description": "...", "labels": [], "assignees": [], "comments": [], "tasks": []}
    ],
    "In Progress": [],
    "Done": []
  }
}
```

### Custom

Implement the `TaskAdapter` protocol (11 methods) in `artificer/adapters/base.py`:

- `get_ready_tasks(queue_names)` — Return tasks from the given queues
- `get_task(task_id)` — Return a single task by ID
- `move_task(task_id, target_queue)` — Move a task between queues
- `add_comment(task_id, text)` — Add a comment to a task
- `update_task(task_id, *, assignees, name, description, labels)` — Update task fields
- `create_task(queue_name, name, description)` — Create a new task
- `list_queues()` — List all queues with task counts
- `get_queue(queue_name)` — Get a single queue's info
- `create_queue(queue_name)` — Create a new empty queue
- `update_queue(queue_name, *, new_name)` — Rename a queue
- `delete_queue(queue_name)` — Delete an empty queue

Pass your custom adapter directly to the constructor:

```python
dispatcher = AgentDispatcher(
    command="my-agent",
    queue_backend=MyCustomAdapter(),
)
```

## Development

```sh
uv pip install -e ".[dev]"   # pip install -e ".[dev]" also works
pytest
```
