Metadata-Version: 2.4
Name: agentexec
Version: 0.2.0
Summary: Production-ready orchestration for OpenAI Agents with Redis-backed coordination, activity tracking, and workflow management
Project-URL: Homepage, https://github.com/Agent-CI/agentexec
Project-URL: Documentation, https://github.com/Agent-CI/agentexec#readme
Project-URL: Repository, https://github.com/Agent-CI/agentexec
Project-URL: Issues, https://github.com/Agent-CI/agentexec/issues
Author-email: Agent CI <hello@agent-ci.com>, Travis Dent <root@a10k.co>
License: MIT
Keywords: agents,async,background-workers,openai,orchestration,redis
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Requires-Python: >=3.12
Requires-Dist: croniter>=6.0.0
Requires-Dist: openai-agents>=0.1.0
Requires-Dist: pydantic-settings>=2.5.0
Requires-Dist: pydantic>=2.12.0
Requires-Dist: redis>=7.0.1
Requires-Dist: sqlalchemy[asyncio]>=2.0.44
Provides-Extra: kafka
Requires-Dist: aiokafka>=0.11.0; extra == 'kafka'
Description-Content-Type: text/markdown

# `agentexec`

[![PyPI](https://img.shields.io/pypi/v/agentexec?color=blue)](https://pypi.org/project/agentexec/)
[![Python](https://img.shields.io/badge/python-3.12+-blue)](https://www.python.org/)
[![License](https://img.shields.io/badge/license-MIT-green)](LICENSE)
[![Type Checked](https://img.shields.io/badge/type%20checked-ty-blue)](https://github.com/astral-sh/ty)
[![Code Style](https://img.shields.io/badge/code%20style-ruff-orange)](https://github.com/astral-sh/ruff)

**Production-ready orchestration for OpenAI Agents SDK** with Redis-backed task queues, SQLAlchemy activity tracking, and multiprocessing worker pools.

---

## The Problem

You've built an AI agent. It works great in development. Now you need to run it in production:

**Customer Support Automation** - A user submits a ticket. Your agent needs to research their account, check previous interactions, and draft a response. This takes 2-3 minutes. You can't block the HTTP request.

**Document Processing Pipeline** - Users upload contracts for analysis. Each document needs OCR, entity extraction, clause identification, and risk scoring. You need to process dozens concurrently while tracking progress.

**Research & Reporting** - Your agent researches companies, gathers data from multiple sources, and generates reports. Users need to see "Gathering financials... 40%" not just a spinning loader.

**Multi-Agent Workflows** - One agent discovers leads, fans out to research each one, then a final agent aggregates results. You need coordination, not chaos.

Running AI agents in production requires:

- **Background execution** - Agents take minutes; users shouldn't wait
- **Progress tracking** - Know what your agents are doing in real-time
- **Fault tolerance** - Handle failures gracefully with full error traces
- **Scalability** - Process multiple tasks across worker processes
- **Observability** - Complete audit trail of agent activities
- **User interfaces** - Components to build status dashboards and CLI monitors

`agentexec` provides all of this out of the box.

---

## Installation

```bash
uv add agentexec
```

**Requirements:**
- Python 3.12+
- Redis
- SQLAlchemy-compatible database (PostgreSQL, MySQL, SQLite)

---

## Quick Start

A typical agentexec application has a few files working together. Here's a complete working example showing each part:

### 1. Database Setup

```python
# db.py
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
import agentexec as ax

ax.CONF.redis_url = "redis://localhost:6379/0"

engine = create_engine("sqlite:///agents.db")
ax.Base.metadata.create_all(engine)  # Creates activity tracking tables

def get_db():
    with Session(engine) as db:
        yield db
```

### 2. Define Your Task

```python
# worker.py
from uuid import UUID
from pydantic import BaseModel
from agents import Agent
import agentexec as ax
from .db import engine

class ResearchContext(BaseModel):
    company: str

pool = ax.Pool(engine=engine)

@pool.task("research_company")
async def research_company(agent_id: UUID, context: ResearchContext) -> str:
    runner = ax.OpenAIRunner(agent_id)

    agent = Agent(
        name="Research Agent",
        instructions=f"Research {context.company}. {runner.prompts.report_status}",
        tools=[runner.tools.report_status],  # Agent can report its own progress
        model="gpt-4o",
    )

    result = await runner.run(agent, input="Begin research")
    return result.final_output

if __name__ == "__main__":
    pool.run()
```

### 3. Queue Tasks and Track Progress

```python
# views.py
from uuid import UUID
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
import agentexec as ax
from .worker import ResearchContext
from .db import get_db

router = APIRouter()

@router.post("/research")
async def start_research(company: str) -> dict:
    task = await ax.enqueue("research_company", ResearchContext(company=company))
    return {"agent_id": str(task.agent_id), "status": "queued"}  # Return agent_id for status polling

@router.get("/research/{agent_id}")
async def get_status(agent_id: UUID) -> ax.activity.ActivityDetailSchema:
    return await ax.activity.detail(agent_id=agent_id)
```

### 4. Run Workers

```bash
python worker.py
```

That's it. Tasks are queued to Redis, workers process them in parallel, progress is tracked in your database, and your API stays responsive.
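On the client side, a caller typically polls the status endpoint until the task reaches a terminal state. A minimal sketch of such a loop (the `fetch_status` callable is a hypothetical stand-in for an HTTP GET against `/research/{agent_id}` — swap in `httpx.get(...).json()` or similar in a real client):

```python
import time
from typing import Callable

TERMINAL = {"complete", "error", "canceled"}

def poll_until_done(
    fetch_status: Callable[[], dict],
    interval: float = 2.0,
    timeout: float = 300.0,
) -> dict:
    """Poll a status-returning callable until a terminal status or timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = fetch_status()  # e.g. GET /research/{agent_id} -> JSON dict
        if status.get("status") in TERMINAL:
            return status
        time.sleep(interval)
    raise TimeoutError("task did not finish in time")
```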

---

## Supported Patterns

### Activity Metadata (Multi-Tenancy)

Attach arbitrary metadata when enqueueing tasks for filtering and tenant isolation:

```python
task = await ax.enqueue(
    "process_document",
    context,
    metadata={"organization_id": "org-123", "user_id": "user-456"}
)

# Filter activities by metadata
activities = await ax.activity.list(metadata_filter={"organization_id": "org-123"})
detail = await ax.activity.detail(agent_id=agent_id, metadata_filter={"organization_id": "org-123"})

# Access metadata programmatically (excluded from API serialization by default)
org_id = detail.metadata["organization_id"]
```

### Automatic Activity Tracking

Every task gets full lifecycle tracking without manual updates:

```python
runner = ax.OpenAIRunner(agent_id=agent_id)
result = await runner.run(agent, input="...")

# Activity automatically transitions:
# QUEUED → RUNNING → COMPLETE (or ERROR on failure)
```

### Agent Self-Reporting

Agents can report their own progress:

```python
agent = Agent(
    instructions=f"Do research. {runner.prompts.report_status}",
    tools=[runner.tools.report_status],
)
# Agent calls: report_status("Analyzing financials", 60)
```

### Manual Progress Updates

Update progress explicitly from your task:

```python
await ax.activity.update(agent_id, "Processing batch 3 of 10", percentage=30)
```

### Task Locking

When multiple tasks of the same type are queued for the same user, they may need to run sequentially because each task reads and writes shared state. Use `lock_key` to ensure only one task with the same evaluated key runs at a time:

```python
@pool.task("associate_observation", lock_key="user:{user_id}")
async def associate(agent_id: UUID, context: ObservationContext):
    ...

# Or with add_task()
pool.add_task("associate_observation", handler, lock_key="user:{user_id}")
```

The `lock_key` is a string template evaluated against the task context fields. Tasks with the same evaluated lock key are routed to a dedicated partition queue (`{prefix}:{lock_key}`) where they execute one at a time. Workers skip locked partitions and move on to the next available one — no requeuing, no wasted cycles.
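To make the routing concrete, here is a toy illustration of how a lock-key template might be evaluated against context fields and turned into a partition queue name (an assumption about the mechanism for illustration only, not `agentexec`'s actual code):

```python
# Toy illustration: evaluate "user:{user_id}" against context fields,
# then derive the partition queue name from the queue prefix.
def evaluate_lock_key(template: str, context_fields: dict) -> str:
    return template.format(**context_fields)

def partition_queue(prefix: str, lock_key: str) -> str:
    return f"{prefix}:{lock_key}"

key = evaluate_lock_key("user:{user_id}", {"user_id": "u-42", "company": "ignored"})
print(partition_queue("agentexec_tasks", key))  # → agentexec_tasks:user:u-42
```

Two contexts with the same `user_id` evaluate to the same key, so their tasks land in the same partition queue and run one at a time.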

The lock is released automatically when a task completes or errors. The lock TTL (`AGENTEXEC_LOCK_TTL`, default 1800s) is a safety net for worker process death (OOM, SIGKILL) — under normal operation, locks are always explicitly released. Set this higher than your longest expected task duration.

### Scheduled Tasks

Run tasks on a recurring interval using cron expressions:

```python
# Decorator — registers the task and schedules it in one step
@pool.schedule("refresh_cache", "*/5 * * * *")
async def refresh(agent_id: UUID, context: RefreshContext):
    ...

# With context and repeat limit
@pool.schedule("sync_users", "0 * * * *", context=SyncContext(full=True), repeat=3)
async def sync(agent_id: UUID, context: SyncContext):
    ...
```

For tasks registered separately, use `pool.add_schedule()`:

```python
pool.add_schedule("refresh_cache", "*/5 * * * *", RefreshContext(scope="all"))
pool.add_schedule("refresh_cache", "0 * * * *", RefreshContext(scope="users"), repeat=3)
```

The scheduler runs automatically inside `pool.run()`. Cron expressions are evaluated in the configured timezone (`AGENTEXEC_SCHEDULER_TIMEZONE`, default UTC) so schedules read naturally regardless of server timezone. Next-run times are computed from the intended anchor time, not wall clock, to prevent cumulative drift.
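The drift-avoidance point can be seen with plain datetime arithmetic: advancing from the intended anchor time, rather than from the moment the previous run actually fired, keeps runs on the grid even when a run starts late. A simplified fixed-interval sketch (not the library's cron logic):

```python
from datetime import datetime, timedelta

def next_run(anchor: datetime, interval: timedelta, now: datetime) -> datetime:
    """Next run strictly after `now`, computed from the anchor, not the wall clock."""
    periods = (now - anchor) // interval + 1  # whole intervals elapsed, plus one
    return anchor + periods * interval

anchor = datetime(2024, 1, 1, 0, 0)
every_5 = timedelta(minutes=5)
# A run that fired 90 seconds late still anchors the next run to :10, not :11:30.
print(next_run(anchor, every_5, now=datetime(2024, 1, 1, 0, 6, 30)))  # → 2024-01-01 00:10:00
```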

### Priority Queue

Control task execution order:

```python
await ax.enqueue("urgent_task", context, priority=ax.Priority.HIGH)  # Front of queue
await ax.enqueue("batch_job", context, priority=ax.Priority.LOW)     # Back of queue
```

### Max Turns Recovery

Gracefully handle conversation limits:

```python
runner = ax.OpenAIRunner(
    agent_id=agent_id,
    max_turns_recovery=True,
    wrap_up_prompt="Please summarize your findings.",
)
result = await runner.run(agent, max_turns=15)
# If agent hits max turns, automatically continues with wrap-up
```

### Multi-Step Pipelines

Orchestrate complex workflows with parallel execution:

```python
import asyncio

pipeline = ax.Pipeline(pool)

class ResearchPipeline(pipeline.Base):
    @pipeline.step(0, "parallel research")
    async def gather_data(self, context: InputContext) -> tuple[BrandResult, MarketResult]:
        brand, market = await asyncio.gather(
            research_brand(context),
            research_market(context),
        )
        return brand, market

    @pipeline.step(1, "analysis")
    async def analyze(self, brand: BrandResult, market: MarketResult) -> FinalReport:
        return await analyze_results(brand, market)

# Queue pipeline
task = await pipeline.enqueue(context=InputContext(company="Anthropic"))
```

### Dynamic Fan-Out with Tracker

Coordinate dynamically-queued tasks:

```python
tracker = ax.Tracker("research", batch_id)

@function_tool
async def queue_research(company: str) -> None:
    """Discovery agent calls this for each company found."""
    tracker.incr()
    await ax.enqueue("research", ResearchContext(company=company, batch_id=batch_id))

@function_tool
async def save_result(result: ResearchResult) -> None:
    """Research agent calls this when done."""
    save_to_database(result)
    tracker.decr()
    if tracker.complete:
        await ax.enqueue("aggregate", AggregateContext(batch_id=batch_id))
```
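The counter behind this pattern is simple: it goes up when work is queued, down when work finishes, and `complete` becomes true once it returns to zero. A toy in-memory version showing the semantics (the real `Tracker` is backed by the state backend so it works across worker processes; this sketch is for illustration only):

```python
import threading

class ToyTracker:
    """In-memory illustration of Tracker's incr/decr/complete semantics."""

    def __init__(self) -> None:
        self._count = 0
        self._started = False
        self._lock = threading.Lock()

    def incr(self) -> None:
        with self._lock:
            self._count += 1
            self._started = True

    def decr(self) -> None:
        with self._lock:
            self._count -= 1

    @property
    def complete(self) -> bool:
        # Complete only after at least one task was queued and all have finished
        with self._lock:
            return self._started and self._count == 0
```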

---

## Integration

### Running Alongside Your Application

If you have an existing FastAPI/Flask/Django backend, run the worker pool in a separate process:

```python
# main.py - Your API server
from fastapi import FastAPI
import agentexec as ax

app = FastAPI()

@app.post("/process")
async def process(data: str) -> dict:
    task = await ax.enqueue("process_data", ProcessContext(data=data))
    return {"agent_id": str(task.agent_id)}
```

```python
# worker.py - Run separately
from .tasks import pool

if __name__ == "__main__":
    pool.run()
```

**Terminal 1:** Start your API server

```bash
uvicorn main:app
```

**Terminal 2:** Start the workers

```bash
python worker.py
```

### As a Standalone Worker Service

```python
# worker.py
import asyncio
import os
from uuid import UUID
from sqlalchemy import create_engine
import agentexec as ax

engine = create_engine(os.environ["DATABASE_URL"])
pool = ax.Pool(engine=engine)

@pool.task("my_task")
async def my_task(agent_id: UUID, context: MyContext) -> None:
    # Your task implementation
    pass

if __name__ == "__main__":
    try:
        pool.run()
    except KeyboardInterrupt:
        asyncio.run(ax.activity.cancel_pending())
```

### Docker Deployment

**1. Create your worker Dockerfile:**

```dockerfile
# Dockerfile.worker
FROM ghcr.io/agent-ci/agentexec-worker:latest

COPY ./src /app/src
ENV AGENTEXEC_WORKER_MODULE=src.worker
```

**2. Create your worker module:**

```python
# src/worker.py
import asyncio
import atexit
import os
from uuid import UUID
from sqlalchemy import create_engine
import agentexec as ax

engine = create_engine(os.environ["DATABASE_URL"])
pool = ax.Pool(engine=engine)

async def cleanup() -> None:
    await ax.activity.cancel_pending()

atexit.register(lambda: asyncio.run(cleanup()))

@pool.task("my_task")
async def my_task(agent_id: UUID, context: MyContext) -> None:
    pass
```

**3. Build and run:**

```bash
docker build -f Dockerfile.worker -t my-worker .
```

```bash
docker run -e DATABASE_URL=... -e REDIS_URL=... -e OPENAI_API_KEY=... my-worker
```

---

## Backend Architecture

### Redis (Default)

agentexec uses Redis for task queuing, result storage, and coordination between workers. The queue uses a partitioned design where tasks with a `lock_key` go to dedicated partition queues (`{prefix}:{lock_key}`) and are serialized by a lock, while tasks without a lock key go to the default queue for concurrent processing.

Workers dequeue using Redis `SCAN`, which iterates keys in hash-table order — effectively random. This provides fair distribution across partitions without explicit round-robin. See `examples/queue-fairness/` for benchmarks showing uniform distribution at 1000+ partitions.

**AWS Compatible:** Standard Redis features only — AWS ElastiCache works out of the box.

```bash
AGENTEXEC_REDIS_URL=redis://localhost:6379/0
# or
AGENTEXEC_REDIS_URL=redis://my-cluster.abc123.use1.cache.amazonaws.com:6379
```

### Kafka (Experimental)

Kafka can be used as an alternative backend for task queuing and schedule storage. Activity tracking always uses your SQLAlchemy database regardless of backend — Kafka is not a KV store, so state operations (`get`/`set`, counters) are not supported and will raise `NotImplementedError`.

```bash
pip install "agentexec[kafka]"

AGENTEXEC_STATE_BACKEND=agentexec.state.kafka
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
```

Kafka uses consumer groups for work distribution instead of Redis's scan-based dequeue. Topics are auto-created on first use. Schedule storage uses a compacted topic that is replayed on each poll.

**When to consider Kafka:**
- You already run Kafka and want to avoid adding Redis
- You need durable, replayable task queues with built-in replication
- You want partition-level ordering guarantees (tasks with the same key go to the same partition)

**Limitations:**
- No KV state — `backend.state.get/set/delete` and counters raise `NotImplementedError`
- No partition-level locking (Kafka partition assignment handles isolation instead)
- Schedule `get_due()` replays the entire compacted topic on every poll
- `lock_key` is used as a Kafka partition key (routing), not as a mutex

See [Kafka configuration](#kafka-settings) below for all available settings.

### Extensible State Backend

The state backend is pluggable. Implement `BaseBackend` with `state`, `queue`, and `schedule` sub-backends:

```bash
AGENTEXEC_STATE_BACKEND=agentexec.state.redis   # Default
AGENTEXEC_STATE_BACKEND=agentexec.state.kafka    # Experimental
AGENTEXEC_STATE_BACKEND=myapp.state.custom       # Custom (must export Backend class)
```

### Database

Activity tracking uses SQLAlchemy with two tables (any SQLAlchemy-supported database, independent of the state backend):

**`agentexec_activity`** - Main activity records
- `agent_id` - Unique identifier (UUID)
- `agent_type` - Task name
- `metadata` - JSON field for custom data (e.g., tenant info)
- `created_at`, `updated_at` - Timestamps

**`agentexec_activity_log`** - Status and progress
- `activity_id` - Foreign key
- `message` - Log message
- `status` - QUEUED, RUNNING, COMPLETE, ERROR, CANCELED
- `percentage` - Progress (0-100)
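A simplified version of these two tables, and the "latest log per activity" lookup that a status endpoint performs, can be sketched with stdlib `sqlite3` (column set reduced for illustration; the real tables carry more fields):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE agentexec_activity (
        id INTEGER PRIMARY KEY,
        agent_id TEXT UNIQUE,
        agent_type TEXT
    );
    CREATE TABLE agentexec_activity_log (
        id INTEGER PRIMARY KEY,
        activity_id INTEGER REFERENCES agentexec_activity(id),
        message TEXT,
        status TEXT,
        percentage INTEGER
    );
""")
conn.execute("INSERT INTO agentexec_activity VALUES (1, 'abc-123', 'research_company')")
conn.executemany(
    "INSERT INTO agentexec_activity_log (activity_id, message, status, percentage)"
    " VALUES (?, ?, ?, ?)",
    [(1, "Waiting to start", "queued", 0), (1, "Gathering data", "running", 30)],
)

# Latest log entry per activity: the log row with the highest id wins.
row = conn.execute("""
    SELECT a.agent_type, l.status, l.percentage
    FROM agentexec_activity a
    JOIN agentexec_activity_log l ON l.activity_id = a.id
    WHERE l.id = (SELECT MAX(id) FROM agentexec_activity_log WHERE activity_id = a.id)
""").fetchone()
print(row)  # → ('research_company', 'running', 30)
```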

---

## Building Your Own Interface

### Data Structures

The activity tracking module exposes Pydantic schemas for building APIs:

```python
from agentexec.activity.schemas import (
    ActivityListSchema,      # Paginated list response
    ActivityListItemSchema,  # Single item in list (lightweight)
    ActivityDetailSchema,    # Full activity with log history
    ActivityLogSchema,       # Single log entry
)
```

**List activities:**

```python
result = await ax.activity.list(page=1, page_size=20)
# Returns ActivityListSchema:
# {
#   "items": [...],      # List of ActivityListItemSchema
#   "total": 150,
#   "page": 1,
#   "page_size": 20,
#   "total_pages": 8
# }
```

**Get activity detail:**

```python
activity = await ax.activity.detail(agent_id=agent_id)
# Returns ActivityDetailSchema:
# {
#   "agent_id": "...",
#   "agent_type": "research_company",
#   "created_at": "2024-01-15T10:30:00Z",
#   "updated_at": "2024-01-15T10:32:45Z",
#   "logs": [
#     {"status": "queued", "message": "Waiting to start", "percentage": 0, ...},
#     {"status": "running", "message": "Gathering data", "percentage": 30, ...},
#     {"status": "complete", "message": "Done", "percentage": 100, ...}
#   ]
# }
```

**Count active agents:**

```python
count = await ax.activity.count_active()
# Returns number of agents with status QUEUED or RUNNING
```

### Building a CLI Monitor

```python
# cli_monitor.py
import asyncio
import time

from rich.live import Live
from rich.table import Table
import agentexec as ax

def build_table() -> Table:
    count = asyncio.run(ax.activity.count_active())
    table = Table(title=f"Active Agents: {count}")
    table.add_column("Status")
    table.add_column("Task")
    table.add_column("Message")
    table.add_column("Progress")

    activities = asyncio.run(ax.activity.list(page=1, page_size=10))
    for item in activities.items:
        table.add_row(
            item.status,
            item.agent_type,
            item.latest_log_message or "",
            f"{item.percentage}%",
        )
    return table

def monitor() -> None:
    with Live(refresh_per_second=1) as live:
        while True:
            live.update(build_table())
            time.sleep(1)  # Refresh once per second

if __name__ == "__main__":
    monitor()
```

---

## UI Components

The `agentexec-ui` package provides React components for building monitoring interfaces:

```bash
npm install agentexec-ui
```

### Components

```tsx
import {
  TaskList,
  TaskDetail,
  ActiveAgentsBadge,
  StatusBadge,
  ProgressBar,
} from 'agentexec-ui';

// Display paginated task list
<TaskList
  items={activities.items}
  loading={isLoading}
  onTaskClick={(agentId) => setSelected(agentId)}
  selectedAgentId={selectedId}
/>

// Full activity detail view
<TaskDetail
  activity={activityDetail}
  loading={isDetailLoading}
  error={error}
  onClose={() => setSelected(null)}
/>

// Active count badge
<ActiveAgentsBadge count={activeCount} loading={isLoading} />

// Individual status indicators
<StatusBadge status="running" />
<ProgressBar percentage={65} status="running" />
```

### TypeScript Types

```typescript
import type {
  Status,           // 'queued' | 'running' | 'complete' | 'error' | 'canceled'
  ActivityLog,
  ActivityDetail,
  ActivityListItem,
  ActivityList,
} from 'agentexec-ui';
```

These types mirror the Python API schemas (`ActivityDetailSchema`, `ActivityListSchema`, etc.), so your API responses integrate directly with the components.

The components are headless (no built-in styling) and work with any CSS framework. See `examples/openai-agents-fastapi/ui/` for a complete React app with TanStack Query integration.

---

## Module Reference

### Task Queue

```python
import agentexec as ax

task = await ax.enqueue(task_name, context, priority=ax.Priority.LOW)
result = await ax.get_result(task, timeout=300)
results = await ax.gather(task1, task2, task3)
```

### Worker Pool

```python
import agentexec as ax

pool = ax.Pool(engine=engine)
pool = ax.Pool(database_url="postgresql://...")

@pool.task("name")
async def handler(agent_id: UUID, context: MyContext) -> None: ...

@pool.task("name", lock_key="user:{user_id}")  # Sequential per user
async def locked(agent_id: UUID, context: MyContext) -> None: ...

@pool.schedule("name", "*/5 * * * *")  # Register + schedule in one step
async def scheduled(agent_id: UUID, context: MyContext) -> None: ...

pool.add_schedule("name", "0 * * * *", MyContext(), repeat=3)  # Schedule separately

pool.run()       # Blocking - runs workers + scheduler + retry handling
pool.start()     # Non-blocking - starts workers in background
pool.shutdown()  # Graceful shutdown
```

### Activity Tracking

```python
import agentexec as ax

# Create activity (returns agent_id for tracking)
agent_id = await ax.activity.create(task_name, message="Starting...")

# Update progress
await ax.activity.update(agent_id, message, percentage=50)
await ax.activity.complete(agent_id, message="Done")
await ax.activity.error(agent_id, message="Failed: ...")

# Query activities (uses database session)
activities = await ax.activity.list(page=1, page_size=20)
activity = await ax.activity.detail(agent_id=agent_id)
count = await ax.activity.count_active()

# Cleanup
canceled = await ax.activity.cancel_pending()
```

### Runners

```python
import agentexec as ax

runner = ax.OpenAIRunner(
    agent_id=agent_id,
    max_turns_recovery=True,
    wrap_up_prompt="Summarize...",
)

runner.prompts.report_status  # Instruction text for agents
runner.tools.report_status    # Pre-bound function tool

result = await runner.run(agent, input="...", max_turns=15)
result = await runner.run_streamed(agent, input="...", max_turns=15)

# Base class for custom runners
class MyRunner(ax.BaseAgentRunner):
    async def run(self, agent: Agent, input: str) -> RunResult: ...
```

### Pipelines

```python
import agentexec as ax

pipeline = ax.Pipeline(pool)

class MyPipeline(pipeline.Base):
    @pipeline.step(0, "description")
    async def step_one(self, context): ...
```

### Tracker

```python
import agentexec as ax

tracker = ax.Tracker("name", batch_id)
tracker.incr()
if tracker.complete: ...  # All tasks done
```

### Database

```python
import agentexec as ax

ax.Base  # SQLAlchemy declarative base for activity tables
```

---

## Configuration

All settings via environment variables:

```bash
# Redis
AGENTEXEC_REDIS_URL=redis://localhost:6379/0    # Also accepts REDIS_URL
AGENTEXEC_REDIS_POOL_SIZE=10
AGENTEXEC_REDIS_POOL_TIMEOUT=5

# Workers
AGENTEXEC_NUM_WORKERS=4
AGENTEXEC_QUEUE_PREFIX=agentexec_tasks          # Also accepts AGENTEXEC_QUEUE_NAME
AGENTEXEC_GRACEFUL_SHUTDOWN_TIMEOUT=300
AGENTEXEC_MAX_TASK_RETRIES=3                    # 0 to disable retries

# Database
AGENTEXEC_TABLE_PREFIX=agentexec_

# Results
AGENTEXEC_RESULT_TTL=3600

# Task locking (Redis backend only)
AGENTEXEC_LOCK_TTL=1800

# Scheduling
AGENTEXEC_SCHEDULER_TIMEZONE=UTC
AGENTEXEC_SCHEDULER_POLL_INTERVAL=10

# State backend
AGENTEXEC_STATE_BACKEND=agentexec.state.redis   # or agentexec.state.kafka
AGENTEXEC_KEY_PREFIX=agentexec

# Activity messages (customizable)
AGENTEXEC_ACTIVITY_MESSAGE_CREATE="Waiting to start."
AGENTEXEC_ACTIVITY_MESSAGE_STARTED="Task started."
AGENTEXEC_ACTIVITY_MESSAGE_COMPLETE="Task completed successfully."
AGENTEXEC_ACTIVITY_MESSAGE_ERROR="Task failed with error: {error}"
```

### Kafka Settings

These settings only apply when using the Kafka state backend (`AGENTEXEC_STATE_BACKEND=agentexec.state.kafka`):

```bash
KAFKA_BOOTSTRAP_SERVERS=localhost:9092          # Also accepts AGENTEXEC_KAFKA_BOOTSTRAP_SERVERS
AGENTEXEC_KAFKA_DEFAULT_PARTITIONS=6            # Partitions for auto-created topics
AGENTEXEC_KAFKA_REPLICATION_FACTOR=1            # Replication factor for auto-created topics
AGENTEXEC_KAFKA_MAX_BATCH_SIZE=16384            # Producer max batch size (bytes)
AGENTEXEC_KAFKA_LINGER_MS=5                     # Producer linger time (ms)
AGENTEXEC_KAFKA_RETENTION_MS=-1                 # Retention for compacted topics (-1 = forever)
```

For single-node development, set `KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1` on your broker, or consumer groups will hang.

---

## Development

```bash
# Clone repository
git clone https://github.com/Agent-CI/agentexec
cd agentexec

# Install dependencies
uv sync

# Run tests
uv run pytest

# Type checking
uv run ty check

# Linting
uv run ruff check src/

# Formatting
uv run ruff format src/
```

### Contributing

1. Fork the repository
2. Create a feature branch
3. Make your changes with tests
4. Run `uv run pytest` and `uv run ty check`
5. Submit a pull request

---

## License

MIT License - see [LICENSE](LICENSE) for details.

---

## Links

- **PyPI**: [agentexec](https://pypi.org/project/agentexec/)
- **npm**: [agentexec-ui](https://www.npmjs.com/package/agentexec-ui)
- **Documentation**: [docs/](docs/)
- **Example App**: [examples/openai-agents-fastapi/](examples/openai-agents-fastapi/)
- **Multi-Tenancy Example**: [examples/multi-tenancy/](examples/multi-tenancy/)
- **Queue Fairness Benchmark**: [examples/queue-fairness/](examples/queue-fairness/)
- **Issues**: [GitHub Issues](https://github.com/Agent-CI/agentexec/issues)
