Metadata-Version: 2.4
Name: agentexec
Version: 0.1.6
Summary: Production-ready orchestration for OpenAI Agents with Redis-backed coordination, activity tracking, and workflow management
Project-URL: Homepage, https://github.com/Agent-CI/agentexec
Project-URL: Documentation, https://github.com/Agent-CI/agentexec#readme
Project-URL: Repository, https://github.com/Agent-CI/agentexec
Project-URL: Issues, https://github.com/Agent-CI/agentexec/issues
Author-email: Agent CI <hello@agent-ci.com>, Travis Dent <root@a10k.co>
License: MIT
Keywords: agents,async,background-workers,openai,orchestration,redis
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Requires-Python: >=3.12
Requires-Dist: openai-agents>=0.1.0
Requires-Dist: pydantic-settings>=2.5.0
Requires-Dist: pydantic>=2.12.0
Requires-Dist: redis>=7.0.1
Requires-Dist: sqlalchemy>=2.0.44
Description-Content-Type: text/markdown

# `agentexec`

[![PyPI](https://img.shields.io/pypi/v/agentexec?color=blue)](https://pypi.org/project/agentexec/)
[![Python](https://img.shields.io/badge/python-3.12+-blue)](https://www.python.org/)
[![License](https://img.shields.io/badge/license-MIT-green)](LICENSE)
[![Type Checked](https://img.shields.io/badge/type%20checked-ty-blue)](https://github.com/astral-sh/ty)
[![Code Style](https://img.shields.io/badge/code%20style-ruff-orange)](https://github.com/astral-sh/ruff)

**Production-ready orchestration for OpenAI Agents SDK** with Redis-backed task queues, SQLAlchemy activity tracking, and multiprocessing worker pools.

---

## The Problem

You've built an AI agent. It works great in development. Now you need to run it in production:

**Customer Support Automation** - A user submits a ticket. Your agent needs to research their account, check previous interactions, and draft a response. This takes 2-3 minutes. You can't block the HTTP request.

**Document Processing Pipeline** - Users upload contracts for analysis. Each document needs OCR, entity extraction, clause identification, and risk scoring. You need to process dozens concurrently while tracking progress.

**Research & Reporting** - Your agent researches companies, gathers data from multiple sources, and generates reports. Users need to see "Gathering financials... 40%" not just a spinning loader.

**Multi-Agent Workflows** - One agent discovers leads, fans out to research each one, then a final agent aggregates results. You need coordination, not chaos.

Running AI agents in production requires:

- **Background execution** - Agents take minutes; users shouldn't wait
- **Progress tracking** - Know what your agents are doing in real-time
- **Fault tolerance** - Handle failures gracefully with full error traces
- **Scalability** - Process multiple tasks across worker processes
- **Observability** - Complete audit trail of agent activities
- **User interfaces** - Components to build status dashboards and CLI monitors

`agentexec` provides all of this out of the box.

---

## Installation

```bash
uv add agentexec
```

**Requirements:**
- Python 3.12+
- Redis
- SQLAlchemy-compatible database (PostgreSQL, MySQL, SQLite)

---

## Quick Start

A typical agentexec application has a few files working together. Here's a complete example, part by part:

### 1. Database Setup

```python
# db.py
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
import agentexec as ax

ax.CONF.redis_url = "redis://localhost:6379/0"

engine = create_engine("sqlite:///agents.db")
ax.Base.metadata.create_all(engine)  # Creates activity tracking tables

def get_db():
    with Session(engine) as db:
        yield db
```

### 2. Define Your Task

```python
# worker.py
from uuid import UUID
from pydantic import BaseModel
from agents import Agent
import agentexec as ax
from .db import engine

class ResearchContext(BaseModel):
    company: str

pool = ax.Pool(engine=engine)

@pool.task("research_company")
async def research_company(agent_id: UUID, context: ResearchContext) -> str:
    runner = ax.OpenAIRunner(agent_id)

    agent = Agent(
        name="Research Agent",
        instructions=f"Research {context.company}. {runner.prompts.report_status}",
        tools=[runner.tools.report_status],  # Agent can report its own progress
        model="gpt-4o",
    )

    result = await runner.run(agent, input="Begin research")
    return result.final_output

if __name__ == "__main__":
    pool.run()
```

### 3. Queue Tasks and Track Progress

```python
# views.py
from uuid import UUID
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
import agentexec as ax
from .worker import ResearchContext
from .db import get_db

router = APIRouter()

@router.post("/research")
async def start_research(company: str) -> dict:
    task = await ax.enqueue("research_company", ResearchContext(company=company))
    return {"agent_id": str(task.agent_id), "status": "queued"}  # Return agent_id for status polling

@router.get("/research/{agent_id}")
def get_status(agent_id: UUID, db: Session = Depends(get_db)) -> ax.activity.ActivityDetailSchema:
    return ax.activity.detail(db, agent_id=agent_id)  # Query by agent_id
```

### 4. Run Workers

Assuming the files above live in a package (here called `app/`), run the worker as a module so the relative imports resolve:

```bash
python -m app.worker
```

That's it. Tasks are queued to Redis, workers process them in parallel, progress is tracked in your database, and your API stays responsive.
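
On the client side, you can poll the status endpoint until the task reaches a terminal state. A minimal sketch — the `fetch_status` callable stands in for an HTTP GET to `/research/{agent_id}` and is an illustration, not part of the library:

```python
import time
from typing import Callable

TERMINAL = {"complete", "error", "canceled"}

def poll_until_done(
    fetch_status: Callable[[], dict],
    interval: float = 2.0,
    max_polls: int = 100,
) -> dict:
    """Poll a status-returning callable until the task reaches a terminal state."""
    for _ in range(max_polls):
        status = fetch_status()  # e.g. GET /research/{agent_id}, parsed to a dict
        if status["status"] in TERMINAL:
            return status
        time.sleep(interval)
    raise TimeoutError("task did not finish in time")

# Example with a stubbed fetch that completes on the third poll:
responses = iter([
    {"status": "queued", "percentage": 0},
    {"status": "running", "percentage": 40},
    {"status": "complete", "percentage": 100},
])
final = poll_until_done(lambda: next(responses), interval=0)
print(final["status"])  # complete
```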

---

## Supported Patterns

### Activity Metadata (Multi-Tenancy)

Attach arbitrary metadata when enqueueing tasks for filtering and tenant isolation:

```python
task = await ax.enqueue(
    "process_document",
    context,
    metadata={"organization_id": "org-123", "user_id": "user-456"}
)

# Filter activities by metadata
activities = ax.activity.list(db, metadata_filter={"organization_id": "org-123"})
detail = ax.activity.detail(db, agent_id, metadata_filter={"organization_id": "org-123"})

# Access metadata programmatically (excluded from API serialization by default)
org_id = detail.metadata["organization_id"]
```
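
Conceptually, `metadata_filter` matches activities whose metadata contains every given key/value pair. A sketch of that matching rule (plain dicts standing in for activity records; not the library's actual query code):

```python
def matches(metadata: dict, metadata_filter: dict) -> bool:
    """True if every filter key is present in metadata with the same value."""
    return all(metadata.get(k) == v for k, v in metadata_filter.items())

activities = [
    {"agent_type": "process_document", "metadata": {"organization_id": "org-123", "user_id": "user-456"}},
    {"agent_type": "process_document", "metadata": {"organization_id": "org-999", "user_id": "user-001"}},
]
mine = [a for a in activities if matches(a["metadata"], {"organization_id": "org-123"})]
print(len(mine))  # 1
```

In the real API the filtering happens in the database query, so non-matching tenants' activities are never loaded into your process.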

### Automatic Activity Tracking

Every task gets full lifecycle tracking without manual updates:

```python
runner = ax.OpenAIRunner(agent_id=agent_id)
result = await runner.run(agent, input="...")

# Activity automatically transitions:
# QUEUED → RUNNING → COMPLETE (or ERROR on failure)
```

### Agent Self-Reporting

Agents can report their own progress:

```python
agent = Agent(
    instructions=f"Do research. {runner.prompts.report_status}",
    tools=[runner.tools.report_status],
)
# Agent calls: report_status("Analyzing financials", 60)
```

### Manual Progress Updates

Update progress explicitly from your task:

```python
ax.activity.update(agent_id, "Processing batch 3 of 10", percentage=30)
```

### Task Locking

When multiple tasks of the same type are queued for the same user, they may need to run sequentially because each task reads and writes shared state. Use `lock_key` to ensure only one task with the same evaluated key runs at a time:

```python
@pool.task("associate_observation", lock_key="user:{user_id}")
async def associate(agent_id: UUID, context: ObservationContext):
    ...

# Or with add_task()
pool.add_task("associate_observation", handler, lock_key="user:{user_id}")
```

The `lock_key` is a string template evaluated against the task context fields. When a worker dequeues a task whose lock is held, it puts the task back at the end of the queue and moves on. The lock is released automatically when the task completes or errors.

The lock TTL (`AGENTEXEC_LOCK_TTL`, default 1800s) is a safety net for worker process death — locks are always explicitly released on task completion or error. Set this higher than your longest expected task duration.

**Note:** When a task is requeued due to a held lock, it goes to the back of the queue. This means strict FIFO ordering is not guaranteed between tasks sharing the same lock key — if tasks T2 and T3 are both waiting on T1's lock, either could run next after T1 completes.
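
The template evaluation is roughly Python's `str.format` applied to the context's fields — a sketch under that assumption, not the library's internal code:

```python
def evaluate_lock_key(template: str, context_fields: dict) -> str:
    """Fill the lock_key template from the task context's fields."""
    return template.format(**context_fields)

key_a = evaluate_lock_key("user:{user_id}", {"user_id": "user-123", "note": "x"})
key_b = evaluate_lock_key("user:{user_id}", {"user_id": "user-123", "note": "y"})
key_c = evaluate_lock_key("user:{user_id}", {"user_id": "user-456"})

print(key_a)           # user:user-123
print(key_a == key_b)  # True  -> these two tasks run sequentially
print(key_a == key_c)  # False -> this task can run in parallel
```

Two tasks serialize only when their evaluated keys are equal, so a template like `user:{user_id}` gives you per-user serialization while still letting different users' tasks run concurrently.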

### Priority Queue

Control task execution order:

```python
await ax.enqueue("urgent_task", context, priority=ax.Priority.HIGH)  # Front of queue
await ax.enqueue("batch_job", context, priority=ax.Priority.LOW)     # Back of queue
```
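
Since the queue is a Redis list, the two priorities map naturally onto pushing at either end. The effect, sketched with a `deque` standing in for the Redis list (workers pop from the left; this is a conceptual model, not the library's implementation):

```python
from collections import deque

queue: deque[str] = deque()

def enqueue(task: str, high_priority: bool = False) -> None:
    # HIGH priority goes to the front of the queue, LOW to the back.
    if high_priority:
        queue.appendleft(task)
    else:
        queue.append(task)

enqueue("batch_job")                        # LOW  -> back
enqueue("another_batch")                    # LOW  -> back
enqueue("urgent_task", high_priority=True)  # HIGH -> front

order = [queue.popleft() for _ in range(len(queue))]
print(order)  # ['urgent_task', 'batch_job', 'another_batch']
```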

### Max Turns Recovery

Gracefully handle conversation limits:

```python
runner = ax.OpenAIRunner(
    agent_id=agent_id,
    max_turns_recovery=True,
    wrap_up_prompt="Please summarize your findings.",
)
result = await runner.run(agent, max_turns=15)
# If agent hits max turns, automatically continues with wrap-up
```

### Multi-Step Pipelines

Orchestrate complex workflows with parallel execution:

```python
import asyncio

pipeline = ax.Pipeline(pool)

class ResearchPipeline(pipeline.Base):
    @pipeline.step(0, "parallel research")
    async def gather_data(self, context: InputContext) -> tuple[BrandResult, MarketResult]:
        brand, market = await asyncio.gather(
            research_brand(context),
            research_market(context),
        )
        return brand, market

    @pipeline.step(1, "analysis")
    async def analyze(self, brand: BrandResult, market: MarketResult) -> FinalReport:
        return await analyze_results(brand, market)

# Queue pipeline
task = await pipeline.enqueue(context=InputContext(company="Anthropic"))
```

### Dynamic Fan-Out with Tracker

Coordinate dynamically-queued tasks:

```python
tracker = ax.Tracker("research", batch_id)

@function_tool
async def queue_research(company: str) -> None:
    """Discovery agent calls this for each company found."""
    tracker.incr()
    await ax.enqueue("research", ResearchContext(company=company, batch_id=batch_id))

@function_tool
async def save_result(result: ResearchResult) -> None:
    """Research agent calls this when done."""
    save_to_database(result)
    tracker.decr()
    if tracker.complete:
        await ax.enqueue("aggregate", AggregateContext(batch_id=batch_id))
```

---

## Integration

### Running Alongside Your Application

If you have an existing FastAPI/Flask/Django backend, run the worker pool in a separate process:

```python
# main.py - Your API server
from fastapi import FastAPI
import agentexec as ax

app = FastAPI()

@app.post("/process")
async def process(data: str) -> dict:
    task = await ax.enqueue("process_data", ProcessContext(data=data))
    return {"agent_id": str(task.agent_id)}
```

```python
# worker.py - Run separately
from .tasks import pool

if __name__ == "__main__":
    pool.run()
```

**Terminal 1:** Start your API server

```bash
uvicorn main:app
```

**Terminal 2:** Start the workers

```bash
python worker.py
```

### As a Standalone Worker Service

```python
# worker.py
import os
from uuid import UUID
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
import agentexec as ax

engine = create_engine(os.environ["DATABASE_URL"])
pool = ax.Pool(engine=engine)

@pool.task("my_task")
async def my_task(agent_id: UUID, context: MyContext) -> None:
    # Your task implementation
    pass

if __name__ == "__main__":
    try:
        pool.run()
    except KeyboardInterrupt:
        with Session(engine) as db:
            ax.activity.cancel_pending(db)
```

### Docker Deployment

**1. Create your worker Dockerfile:**

```dockerfile
# Dockerfile.worker
FROM ghcr.io/agent-ci/agentexec-worker:latest

COPY ./src /app/src
ENV AGENTEXEC_WORKER_MODULE=src.worker
```

**2. Create your worker module:**

```python
# src/worker.py
import atexit
import os
from uuid import UUID
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
import agentexec as ax

engine = create_engine(os.environ["DATABASE_URL"])
pool = ax.Pool(engine=engine)

def cleanup() -> None:
    with Session(engine) as db:
        ax.activity.cancel_pending(db)

atexit.register(cleanup)

@pool.task("my_task")
async def my_task(agent_id: UUID, context: MyContext) -> None:
    pass
```

**3. Build and run:**

```bash
docker build -f Dockerfile.worker -t my-worker .
```

```bash
docker run -e DATABASE_URL=... -e REDIS_URL=... -e OPENAI_API_KEY=... my-worker
```

---

## Backend Architecture

### Redis

agentexec uses Redis for task queuing, result storage, real-time log streaming, and coordination between workers. We chose Redis because it provides exactly the primitives we need (lists, pubsub, atomic counters) with minimal operational overhead.

**AWS Compatible:** Since we use standard Redis features, AWS ElastiCache works out of the box.

```bash
AGENTEXEC_REDIS_URL=redis://localhost:6379/0
# or
AGENTEXEC_REDIS_URL=redis://my-cluster.abc123.use1.cache.amazonaws.com:6379
```

### Extensible State Backend

The state backend is pluggable. We're adding support for additional backends (DynamoDB, PostgreSQL, in-memory for testing). You can also implement your own:

```bash
AGENTEXEC_STATE_BACKEND=agentexec.state.redis_backend  # Default
AGENTEXEC_STATE_BACKEND=myapp.state.dynamodb_backend   # Custom
```

### Database

Activity tracking uses SQLAlchemy with two tables:

**`agentexec_activity`** - Main activity records
- `agent_id` - Unique identifier (UUID)
- `agent_type` - Task name
- `metadata` - JSON field for custom data (e.g., tenant info)
- `created_at`, `updated_at` - Timestamps

**`agentexec_activity_log`** - Status and progress
- `activity_id` - Foreign key
- `message` - Log message
- `status` - QUEUED, RUNNING, COMPLETE, ERROR, CANCELED
- `percentage` - Progress (0-100)
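
Because status lives in the log table, an activity's current status and progress are simply its most recent log entry. A sketch of that derivation (plain dicts standing in for ORM rows, assumed ordered oldest-first):

```python
def current_state(logs: list[dict]) -> dict:
    """The latest log entry wins; logs are assumed ordered oldest-first."""
    return logs[-1]

logs = [
    {"status": "QUEUED", "message": "Waiting to start", "percentage": 0},
    {"status": "RUNNING", "message": "Gathering data", "percentage": 30},
    {"status": "COMPLETE", "message": "Done", "percentage": 100},
]
state = current_state(logs)
print(state["status"], state["percentage"])  # COMPLETE 100
```

This design also means the full history is preserved for free: the log table is an append-only audit trail, and the schemas in the next section expose it directly.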

---

## Building Your Own Interface

### Data Structures

The activity tracking module exposes Pydantic schemas for building APIs:

```python
from agentexec.activity.schemas import (
    ActivityListSchema,      # Paginated list response
    ActivityListItemSchema,  # Single item in list (lightweight)
    ActivityDetailSchema,    # Full activity with log history
    ActivityLogSchema,       # Single log entry
)
```

**List activities:**

```python
with Session(engine) as db:
    result = ax.activity.list(db, page=1, page_size=20)
    # Returns ActivityListSchema:
    # {
    #   "items": [...],      # List of ActivityListItemSchema
    #   "total": 150,
    #   "page": 1,
    #   "page_size": 20,
    #   "total_pages": 8
    # }
```

**Get activity detail:**

```python
activity = ax.activity.detail(db, agent_id=agent_id)
# Returns ActivityDetailSchema:
# {
#   "id": "...",
#   "agent_id": "...",
#   "agent_type": "research_company",
#   "created_at": "2024-01-15T10:30:00Z",
#   "updated_at": "2024-01-15T10:32:45Z",
#   "logs": [
#     {"status": "queued", "message": "Waiting to start", "percentage": 0, ...},
#     {"status": "running", "message": "Gathering data", "percentage": 30, ...},
#     {"status": "complete", "message": "Done", "percentage": 100, ...}
#   ]
# }
```

**Count active agents:**

```python
count = ax.activity.active_count(db)
# Returns number of agents with status QUEUED or RUNNING
```

### Building a CLI Monitor

```python
# cli_monitor.py
import time

from rich.live import Live
from rich.table import Table
from sqlalchemy import Engine
from sqlalchemy.orm import Session
import agentexec as ax

def build_table(db: Session) -> Table:
    table = Table(title=f"Active Agents: {ax.activity.active_count(db)}")
    table.add_column("Status")
    table.add_column("Task")
    table.add_column("Message")
    table.add_column("Progress")

    for item in ax.activity.list(db, page=1, page_size=10).items:
        table.add_row(
            item.status,
            item.agent_type,
            item.latest_log_message or "",
            f"{item.percentage}%",
        )
    return table

def monitor(engine: Engine) -> None:
    with Live(refresh_per_second=1) as live:
        while True:
            with Session(engine) as db:
                live.update(build_table(db))
            time.sleep(1)  # refresh once per second instead of hammering the database

if __name__ == "__main__":
    from .db import engine
    monitor(engine)
```

---

## UI Components

The `agentexec-ui` package provides React components for building monitoring interfaces:

```bash
npm install agentexec-ui
```

### Components

```tsx
import {
  TaskList,
  TaskDetail,
  ActiveAgentsBadge,
  StatusBadge,
  ProgressBar,
} from 'agentexec-ui';

// Display paginated task list
<TaskList
  items={activities.items}
  loading={isLoading}
  onTaskClick={(agentId) => setSelected(agentId)}
  selectedAgentId={selectedId}
/>

// Full activity detail view
<TaskDetail
  activity={activityDetail}
  loading={isDetailLoading}
  error={error}
  onClose={() => setSelected(null)}
/>

// Active count badge
<ActiveAgentsBadge count={activeCount} loading={isLoading} />

// Individual status indicators
<StatusBadge status="running" />
<ProgressBar percentage={65} status="running" />
```

### TypeScript Types

```typescript
import type {
  Status,           // 'queued' | 'running' | 'complete' | 'error' | 'canceled'
  ActivityLog,
  ActivityDetail,
  ActivityListItem,
  ActivityList,
} from 'agentexec-ui';
```

These types mirror the Python API schemas (`ActivityDetailSchema`, `ActivityListSchema`, etc.), so your API responses integrate directly with the components.

The components are headless (no built-in styling) and work with any CSS framework. See `examples/openai-agents-fastapi/ui/` for a complete React app with TanStack Query integration.

---

## Module Reference

### Task Queue

```python
import agentexec as ax

task = await ax.enqueue(task_name, context, priority=ax.Priority.LOW)
result = await ax.get_result(task, timeout=300)
results = await ax.gather(task1, task2, task3)
```

### Worker Pool

```python
import agentexec as ax

pool = ax.Pool(engine=engine)
pool = ax.Pool(database_url="postgresql://...")

@pool.task("name")
async def handler(agent_id: UUID, context: MyContext) -> None: ...

@pool.task("name", lock_key="user:{user_id}")  # Sequential per user
async def locked(agent_id: UUID, context: MyContext) -> None: ...

pool.run()       # Blocking - runs workers
pool.start()     # Non-blocking - starts workers in background
pool.shutdown()  # Graceful shutdown
```

### Activity Tracking

```python
import agentexec as ax

# Create activity (returns agent_id for tracking)
agent_id = ax.activity.create(task_name, message="Starting...")

# Update progress
ax.activity.update(agent_id, message, percentage=50)
ax.activity.complete(agent_id, message="Done")
ax.activity.error(agent_id, error="Failed: ...")

# Query activities
activities = ax.activity.list(db, page=1, page_size=20)
activity = ax.activity.detail(db, agent_id=agent_id)
count = ax.activity.active_count(db)

# Cleanup
canceled = ax.activity.cancel_pending(db)
```

### Runners

```python
import agentexec as ax

runner = ax.OpenAIRunner(
    agent_id=agent_id,
    max_turns_recovery=True,
    wrap_up_prompt="Summarize...",
)

runner.prompts.report_status  # Instruction text for agents
runner.tools.report_status    # Pre-bound function tool

result = await runner.run(agent, input="...", max_turns=15)
result = await runner.run_streamed(agent, input="...", max_turns=15)

# Base class for custom runners
class MyRunner(ax.BaseAgentRunner):
    async def run(self, agent: Agent, input: str) -> RunResult: ...
```

### Pipelines

```python
import agentexec as ax

pipeline = ax.Pipeline(pool)

class MyPipeline(pipeline.Base):
    @pipeline.step(0, "description")
    async def step_one(self, context): ...
```

### Tracker

```python
import agentexec as ax

tracker = ax.Tracker("name", batch_id)
tracker.incr()
if tracker.complete: ...  # All tasks done
```
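
The Tracker is essentially an atomic counter keyed by name and batch: `incr` on fan-out, `decr` on completion, `complete` once the count returns to zero. An in-memory sketch of that contract (the real Tracker is backed by Redis atomic counters, so it works across worker processes):

```python
class InMemoryTracker:
    """Counter-based completion tracking; mirrors the incr/decr/complete contract."""

    def __init__(self, name: str, batch_id: str) -> None:
        self.key = f"{name}:{batch_id}"
        self.count = 0
        self.started = False

    def incr(self) -> None:
        self.count += 1
        self.started = True

    def decr(self) -> None:
        self.count -= 1

    @property
    def complete(self) -> bool:
        # Complete only after work was queued and all of it finished.
        return self.started and self.count == 0

tracker = InMemoryTracker("research", "batch-1")
tracker.incr()   # first company discovered
tracker.incr()   # second company discovered
tracker.decr()   # first research task done
print(tracker.complete)  # False
tracker.decr()   # second research task done
print(tracker.complete)  # True
```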

### Database

```python
import agentexec as ax

ax.Base  # SQLAlchemy declarative base for activity tables
```

---

## Configuration

All settings via environment variables:

```bash
# Redis (required)
AGENTEXEC_REDIS_URL=redis://localhost:6379/0

# Workers
AGENTEXEC_NUM_WORKERS=4
AGENTEXEC_QUEUE_NAME=agentexec_tasks
AGENTEXEC_GRACEFUL_SHUTDOWN_TIMEOUT=300

# Database
AGENTEXEC_TABLE_PREFIX=agentexec_

# Results
AGENTEXEC_RESULT_TTL=3600

# Task locking
AGENTEXEC_LOCK_TTL=1800

# State backend
AGENTEXEC_STATE_BACKEND=agentexec.state.redis_backend
AGENTEXEC_KEY_PREFIX=agentexec

# Activity messages (customizable)
AGENTEXEC_ACTIVITY_MESSAGE_CREATE="Waiting to start."
AGENTEXEC_ACTIVITY_MESSAGE_STARTED="Task started."
AGENTEXEC_ACTIVITY_MESSAGE_COMPLETE="Task completed successfully."
AGENTEXEC_ACTIVITY_MESSAGE_ERROR="Task failed with error: {error}"
```

---

## Development

```bash
# Clone repository
git clone https://github.com/Agent-CI/agentexec
cd agentexec

# Install dependencies
uv sync

# Run tests
uv run pytest

# Type checking
uv run ty check

# Linting
uv run ruff check src/

# Formatting
uv run ruff format src/
```

### Contributing

1. Fork the repository
2. Create a feature branch
3. Make your changes with tests
4. Run `uv run pytest` and `uv run ty check`
5. Submit a pull request

---

## License

MIT License - see [LICENSE](LICENSE) for details.

---

## Links

- **PyPI**: [agentexec](https://pypi.org/project/agentexec/)
- **npm**: [agentexec-ui](https://www.npmjs.com/package/agentexec-ui)
- **Documentation**: [docs/](docs/)
- **Example App**: [examples/openai-agents-fastapi/](examples/openai-agents-fastapi/)
- **Multi-Tenancy Example**: [examples/multi-tenancy/](examples/multi-tenancy/)
- **Issues**: [GitHub Issues](https://github.com/Agent-CI/agentexec/issues)
