Metadata-Version: 2.4
Name: coordination
Version: 0.1.0
Summary: Queue/event agent communication layer for multi-agent systems
Author: Autonomous Ventures
License-Expression: MIT
Project-URL: Homepage, https://github.com/AutonomousVentures/coordination
Project-URL: Source, https://github.com/AutonomousVentures/coordination
Project-URL: BugTracker, https://github.com/AutonomousVentures/coordination/issues
Keywords: agent,communication,event-stream,task-queue,multi-agent,fastapi
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: Scientific/Engineering :: Artificial Intelligence
Requires-Python: >=3.12
Description-Content-Type: text/markdown
Requires-Dist: fastapi>=0.115.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: uvicorn[standard]>=0.30.0
Provides-Extra: dev
Requires-Dist: pytest>=8.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.21.0; extra == "dev"
Requires-Dist: httpx>=0.27.0; extra == "dev"
Requires-Dist: asgi-lifespan>=2.0.0; extra == "dev"

# Coordination

Queue/event agent communication layer for multi-agent systems.

## Overview

Coordination provides the messaging and task infrastructure for agent-to-agent
communication in multi-agent systems. It runs as an async FastAPI server with
SQLite-backed persistence, supporting:

- **Task queue** — priority-ordered task dispatch with claim/dead-letter/work-steal semantics
- **Event stream** — publish/subscribe event bus for inter-agent signals
- **Agent registry** — service discovery for agent capabilities and routing
- **Pre-execution memory** — strategic context recall before task assignment

The package is **lightweight by design** (~300 KB, ~1,800 LOC across 10 source files).
It uses FastAPI and Uvicorn for the async server, Pydantic for message validation,
and aiosqlite for persistence, so it needs no external infrastructure such as a
message broker or a database server.

## Components

| Module | Responsibility |
|--------|---------------|
| `task_queue.py` | Priority-ordered task queue with claim, release, dead-letter, and re-enqueue |
| `event_stream.py` | Pub/sub event bus for inter-agent communication |
| `agent_registry.py` | Agent capability discovery and health tracking |
| `pre_execution_memory.py` | Strategic context retrieval from prior task executions |
| `message.py` | Pydantic models for all wire formats |
| `storage.py` | Abstract storage interface |
| `sqlite_storage.py` | SQLite-backed persistence with aiosqlite |
| `config.py` | Server configuration from YAML/env |
| `server.py` | FastAPI application with all route handlers |
| `__main__.py` | CLI entry point: `python -m coordination` |

## Installation

```bash
pip install coordination
```

Or from source:

```bash
git clone https://github.com/AutonomousVentures/coordination
cd coordination
pip install -e .
```

## Usage

Start the coordination server:

```bash
coordination
# or
python -m coordination
```

To override the host and port, set the corresponding environment variables:

```bash
COORDINATION_HOST=0.0.0.0 COORDINATION_PORT=8080 coordination
```

The server exposes the following endpoints:

- `POST /tasks` — enqueue a new task
- `GET /tasks/claim/{agent_id}` — claim next available task for an agent
- `POST /tasks/{task_id}/complete` — mark task as done
- `POST /tasks/{task_id}/fail` — mark task as failed
- `POST /events` — publish an event to the event bus
- `GET /events/{agent_id}` — poll events for an agent
- `GET /agents` — list registered agents
- `GET /health` — health check endpoint
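
As a rough illustration of calling these endpoints from a client, the sketch below builds the HTTP requests with the standard library only. The JSON body for `POST /tasks` is an assumption that mirrors the `Task` fields used in the Python API section (`queue`, `payload`, `priority`). The helper names, and the idea that responses are JSON, are likewise hypothetical.

```python
import json
import urllib.request

# Default host and port from the Configuration table.
BASE_URL = "http://127.0.0.1:8000"


def build_enqueue_request(
    payload: dict, priority: int = 5, queue: str = "default"
) -> urllib.request.Request:
    """Build (but do not send) a POST /tasks request; body shape is assumed."""
    body = json.dumps(
        {"queue": queue, "payload": payload, "priority": priority}
    ).encode("utf-8")
    return urllib.request.Request(
        f"{BASE_URL}/tasks",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_claim_request(agent_id: str) -> urllib.request.Request:
    """Build a GET /tasks/claim/{agent_id} request."""
    return urllib.request.Request(
        f"{BASE_URL}/tasks/claim/{agent_id}", method="GET"
    )


def send(request: urllib.request.Request) -> object:
    """Send a request and decode the response, assuming it is JSON."""
    with urllib.request.urlopen(request, timeout=5) as response:
        return json.loads(response.read().decode("utf-8"))
```

With a server running locally, `send(build_enqueue_request({"action": "research"}))` would submit a task. The exact response schema should be checked against the running server's OpenAPI docs.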

## Python API

### Task Queue

```python
from coordination.task_queue import TaskQueue
from coordination.message import Task

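# `storage=...` is a placeholder: pass a storage backend instance here.
# The `await` calls below must run inside an async function.
queue = TaskQueue(storage=...)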

# Enqueue a task with priority (0 = highest)
task = Task(queue="default", payload={"action": "research"}, priority=5)
await queue.enqueue(task)

# Claim the next available task for an agent
claimed = await queue.claim("agent-1")
```

### Event Stream

```python
from coordination.event_stream import EventStream

# `storage=...` is a placeholder for a storage backend instance.
stream = EventStream(storage=...)

# Subscribe to event types
await stream.subscribe("agent-1", ["task.assigned", "agent.updated"])

# Publish an event (delivered to all matching subscribers)
await stream.publish({
    "type": "task.assigned",
    "source": "coordinator",
    "payload": {"task_id": "...", "assignee": "agent-1"}
})

# Poll for new events
events = await stream.poll("agent-1")
```
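
For readers who want to see the subscribe, publish, and poll flow without a running server, here is a small in-memory sketch. `InMemoryEventStream` is a hypothetical stand-in, not the package's `EventStream`. In particular, draining the inbox on `poll` and returning a delivery count from `publish` are assumptions about the semantics.

```python
import asyncio
from collections import defaultdict, deque


class InMemoryEventStream:
    """Hypothetical pub/sub bus with one inbox per subscribed agent."""

    def __init__(self) -> None:
        self._subscriptions: dict[str, set[str]] = defaultdict(set)
        self._inboxes: dict[str, deque[dict]] = defaultdict(deque)

    async def subscribe(self, agent_id: str, event_types: list[str]) -> None:
        self._subscriptions[agent_id].update(event_types)

    async def publish(self, event: dict) -> int:
        """Copy the event to every matching inbox; return the delivery count."""
        delivered = 0
        for agent_id, types in self._subscriptions.items():
            if event["type"] in types:
                self._inboxes[agent_id].append(event)
                delivered += 1
        return delivered

    async def poll(self, agent_id: str) -> list[dict]:
        """Return and clear all pending events for the agent."""
        inbox = self._inboxes[agent_id]
        events = list(inbox)
        inbox.clear()
        return events


async def demo() -> tuple[int, list[dict], list[dict], list[dict]]:
    stream = InMemoryEventStream()
    await stream.subscribe("agent-1", ["task.assigned", "agent.updated"])
    await stream.subscribe("agent-2", ["agent.updated"])
    delivered = await stream.publish(
        {"type": "task.assigned", "source": "coordinator", "payload": {}}
    )
    first = await stream.poll("agent-1")
    second = await stream.poll("agent-1")   # inbox was drained by the first poll
    other = await stream.poll("agent-2")    # not subscribed to task.assigned
    return delivered, first, second, other
```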

### Agent Registry

```python
from coordination.agent_registry import AgentRegistry

# `storage=...` is a placeholder for a storage backend instance.
registry = AgentRegistry(storage=...)

# Register an agent with capabilities
await registry.register("agent-1", capabilities=["research", "summarize"])

# Find agents by capability
agents = await registry.find_by_capability("research")
```
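
Capability discovery combined with health tracking can be pictured as a lookup that filters out agents that have not been seen recently. The sketch below is an in-memory approximation under that assumption. `InMemoryRegistry`, `heartbeat`, and `stale_after` are hypothetical names and do not describe the package's real `AgentRegistry` interface.

```python
import asyncio
import time
from dataclasses import dataclass, field


@dataclass
class AgentRecord:
    agent_id: str
    capabilities: frozenset[str]
    last_seen: float = field(default_factory=time.monotonic)


class InMemoryRegistry:
    """Hypothetical registry: capability lookup filtered by liveness."""

    def __init__(self, stale_after: float = 30.0) -> None:
        self._agents: dict[str, AgentRecord] = {}
        self.stale_after = stale_after  # seconds without a heartbeat

    async def register(self, agent_id: str, capabilities: list[str]) -> None:
        self._agents[agent_id] = AgentRecord(agent_id, frozenset(capabilities))

    async def heartbeat(self, agent_id: str) -> None:
        self._agents[agent_id].last_seen = time.monotonic()

    async def find_by_capability(self, capability: str) -> list[str]:
        """Return sorted ids of live agents that advertise the capability."""
        now = time.monotonic()
        return sorted(
            record.agent_id
            for record in self._agents.values()
            if capability in record.capabilities
            and now - record.last_seen <= self.stale_after
        )


async def demo(stale_after: float = 30.0) -> tuple[list[str], list[str]]:
    registry = InMemoryRegistry(stale_after=stale_after)
    await registry.register("agent-1", ["research", "summarize"])
    await registry.register("agent-2", ["summarize"])
    research = await registry.find_by_capability("research")
    summarize = await registry.find_by_capability("summarize")
    return research, summarize
```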

## Configuration

Configure via environment variables:

| Variable | Default | Description |
|----------|---------|-------------|
| `COORDINATION_HOST` | `127.0.0.1` | Server bind address |
| `COORDINATION_PORT` | `8000` | Server port |
| `COORDINATION_DB_PATH` | `coordination.db` | SQLite database path |
| `COORDINATION_LOG_LEVEL` | `info` | Logging level |
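
One way to read these variables, shown here as a sketch and not as the contents of `config.py`, is a frozen dataclass whose defaults match the table. The `Settings` class and `load_settings` function are hypothetical names.

```python
import os
from collections.abc import Mapping
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    # Defaults mirror the Configuration table above.
    host: str = "127.0.0.1"
    port: int = 8000
    db_path: str = "coordination.db"
    log_level: str = "info"


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from COORDINATION_* variables, falling back to defaults."""
    defaults = Settings()
    return Settings(
        host=env.get("COORDINATION_HOST", defaults.host),
        port=int(env.get("COORDINATION_PORT", defaults.port)),
        db_path=env.get("COORDINATION_DB_PATH", defaults.db_path),
        log_level=env.get("COORDINATION_LOG_LEVEL", defaults.log_level).lower(),
    )
```

Passing a plain dictionary instead of `os.environ` makes the loader easy to exercise in tests, for example `load_settings({"COORDINATION_PORT": "8080"})`.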

## Architecture

```
┌──────────────────────────────────────────────┐
│              Coordination Server              │
│  ┌──────────┐ ┌──────────┐ ┌──────────────┐  │
│  │Task Queue│ │   Event  │ │Agent Registry│  │
│  │Priority  │ │   Stream │ │Capability    │  │
│  │+ Claim   │ │ Pub/Sub  │ │Discovery     │  │
│  └──────────┘ └──────────┘ └──────────────┘  │
│  ┌────────────────────────────────────────┐   │
│  │        SQLite Storage (aiosqlite)      │   │
│  └────────────────────────────────────────┘   │
│  ┌────────────────────────────────────────┐   │
│  │  Pre-Execution Memory (strategic ctx)  │   │
│  └────────────────────────────────────────┘   │
└──────────────────────────────────────────────┘
```

## Development

```bash
pip install -e ".[dev]"
pytest tests/ -v
```

All 103 tests pass. They cover the task queue, event stream, agent registry,
SQLite storage, and end-to-end integration.

## License

MIT — see [LICENSE](/LICENSE).

## Maintainer

Published by [Autonomous Ventures](https://github.com/AutonomousVentures).
Initial release: v0.1.0 (May 2026). Beta — `Development Status :: 4 - Beta`.
