Metadata-Version: 2.3
Name: a2a-distributed
Version: 0.0.0
Summary: Distributed execution for A2A agents using Celery, BullMQ, or RQ workers
Author: Tomas Pilar
Author-email: Tomas Pilar <thomas7pilar@gmail.com>
Requires-Dist: a2a-redis>=0.2.1
Requires-Dist: a2a-sdk>=0.3.24
Requires-Dist: bullmq>=2.19.5
Requires-Dist: celery>=5.6.2
Requires-Dist: rq>=2.7.0
Requires-Python: >=3.13
Description-Content-Type: text/markdown

# a2a-distributed

`a2a-distributed` provides distributed execution for A2A agents. It offloads agent execution to background workers through one of three task queues: **Celery**, **BullMQ**, or **RQ (Redis Queue)**.

## Architecture

The package follows a Client/Worker architecture:

1.  **DistributedAgentExecutor (Client Side):** Used by the A2A server to enqueue agent execution tasks into a distributed queue.
2.  **DistributedAgentWorker (Worker Side):** A background process that listens to the queue, reconstructs the execution context, and runs the agent using a local `AgentExecutor`.
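The split between the two roles can be illustrated with an in-memory stand-in. This is a minimal sketch, not the package's implementation: `ExecutionRequest`, `SketchExecutor`, `SketchWorker`, and `echo_agent` are hypothetical names, and an `asyncio.Queue` takes the place of a real Celery, BullMQ, or RQ backend. The point it shows is that only a serialized payload crosses the queue, and the worker rebuilds the request before running the agent.

```python
import asyncio
import json
from dataclasses import asdict, dataclass


@dataclass
class ExecutionRequest:
    """Hypothetical, serializable stand-in for the agent execution context."""

    task_id: str
    message: str


class SketchExecutor:
    """Client side: serializes a request and enqueues it."""

    def __init__(self, queue: asyncio.Queue) -> None:
        self._queue = queue

    async def execute(self, request: ExecutionRequest) -> None:
        # Only the serialized payload crosses the queue boundary.
        await self._queue.put(json.dumps(asdict(request)))


class SketchWorker:
    """Worker side: dequeues a payload, rebuilds the request, runs the agent."""

    def __init__(self, queue: asyncio.Queue, agent) -> None:
        self._queue = queue
        self._agent = agent
        self.results: dict[str, str] = {}

    async def process_one(self) -> None:
        payload = await self._queue.get()
        request = ExecutionRequest(**json.loads(payload))
        self.results[request.task_id] = await self._agent(request)
        self._queue.task_done()


async def echo_agent(request: ExecutionRequest) -> str:
    """Hypothetical agent logic that the worker runs locally."""
    return f"echo: {request.message}"


async def demo() -> dict[str, str]:
    queue: asyncio.Queue = asyncio.Queue()
    executor = SketchExecutor(queue)
    worker = SketchWorker(queue, echo_agent)
    await executor.execute(ExecutionRequest(task_id="t1", message="hello"))
    await worker.process_one()
    return worker.results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a real deployment the executor and the worker run in separate processes, so everything the agent needs must be recoverable from the enqueued payload.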

## Exports

### Base Classes
- `DistributedAgentExecutor`: Abstract base class for all distributed executors.
- `DistributedAgentWorker`: Abstract base class for all distributed workers.

### Celery Implementation
- `CeleryAgentExecutor`: Enqueues tasks to Celery.
- `CeleryAgentWorker`: Handles Celery tasks.

### BullMQ Implementation
- `BullMQAgentExecutor`: Enqueues jobs to BullMQ.
- `BullMQAgentWorker`: Processes BullMQ jobs.

### RQ Implementation
- `RQAgentExecutor`: Enqueues jobs to Redis Queue.
- `RQAgentWorker`: Processes RQ jobs.

### Utilities
- `run_worker`: A utility function to run any `DistributedAgentWorker` with built-in signal handling for graceful shutdown.

## Usage Example

### 1. Setting up the Executor (Client Side)

```python
from a2a_distributed import BullMQAgentExecutor

# Initialize the executor with connection options
executor = BullMQAgentExecutor(
    queue_name="agent-tasks",
    redis_opts={"host": "localhost", "port": 6379}
)

# Use it in your A2A server configuration
# server = A2AServer(executor=executor, ...)
```

### 2. Setting up the Worker (Worker Side)

```python
import asyncio
from a2a_distributed import BullMQAgentWorker, run_worker
from a2a.server.agent_execution import AgentExecutor
from a2a_redis import RedisEventQueue

async def main():
    # 1. Local executor that actually runs the agent logic:
    #    an instance of your own concrete AgentExecutor subclass
    local_executor: AgentExecutor = ...

    # 2. Event queue for reporting status/results back
    event_queue = RedisEventQueue(...)

    # 3. The Distributed Worker
    worker = BullMQAgentWorker(
        agent_executor=local_executor,
        event_queue=event_queue,
        queue_name="agent-tasks",
        redis_opts={"host": "localhost", "port": 6379}
    )

    # 4. Run the worker using the utility
    await run_worker(worker)

if __name__ == "__main__":
    asyncio.run(main())
```

## Running Workers

To run a worker, call `await worker.run()` directly, or pass it to the `run_worker(worker)` utility, which also handles `SIGINT` and `SIGTERM` so the worker can shut down gracefully.
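For readers who call `worker.run()` themselves, the following sketch shows one common way to wire signal handling into an asyncio loop. It is not the source of `run_worker`: `run_until_signalled`, `polling_worker`, and `demo` are hypothetical names, and the approach assumes a Unix event loop, since `loop.add_signal_handler` is not available on Windows.

```python
import asyncio
import signal
from collections.abc import Awaitable, Callable


async def run_until_signalled(
    work: Callable[[asyncio.Event], Awaitable[None]],
    signals: tuple[signal.Signals, ...] = (signal.SIGINT, signal.SIGTERM),
) -> None:
    """Run `work` until it returns, setting a stop flag when a signal arrives."""
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    for sig in signals:
        # The handler only sets a flag; the worker decides when to exit.
        loop.add_signal_handler(sig, stop.set)
    try:
        await work(stop)
    finally:
        # Restore default signal behaviour once the worker has exited.
        for sig in signals:
            loop.remove_signal_handler(sig)


async def polling_worker(stop: asyncio.Event) -> None:
    """Hypothetical worker loop: finish the current job, then check the flag."""
    while not stop.is_set():
        await asyncio.sleep(0.01)  # stand-in for processing one job


async def demo() -> bool:
    loop = asyncio.get_running_loop()
    # Simulate an operator sending SIGTERM shortly after startup.
    loop.call_later(0.05, signal.raise_signal, signal.SIGTERM)
    await asyncio.wait_for(run_until_signalled(polling_worker), timeout=2)
    return True


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Setting a flag instead of cancelling the task lets the job in progress finish before the process exits, which is usually what "graceful" means for a queue worker.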

Celery and RQ workers are often started through their own CLI tools. The `DistributedAgentWorker` classes provided here instead give you programmatic control and let you integrate the worker into your own async loops.
