# eggai-clutch

> Multi-strategy agent orchestration library for Python. Define agent pipelines with decorators, control flow with exceptions, run locally or distributed. Install with `pip install eggai-clutch`.

eggai-clutch orchestrates async Python functions (agents) into pipelines. It supports four execution strategies: SEQUENTIAL (agents execute in registration order), ROUND_ROBIN (cycle through agents until termination), GRAPH (follow explicit edges between agents), and SELECTOR (dynamic routing via a classifier function). Control flow uses exceptions: `Terminate(result)` stops execution and returns the result, and `Handover(agent_name, data)` transfers control to a specific agent.

Agents receive data and return transformed data; Pydantic models add type safety. The `@clutch.agent()` decorator registers functions as agents, and all agents must be `async`. The final agent should raise `Terminate(result)` to return the result.

Key exports: `from eggai_clutch import Clutch, ClutchTask, StepEvent, Strategy, Terminate, Handover`. Access execution context with `from eggai_clutch.message import get_context`.

## Core Concepts

### Creating a Pipeline
```python
from eggai_clutch import Clutch, Strategy, Terminate

# Default is SEQUENTIAL strategy
clutch = Clutch("pipeline-name")

# Or specify strategy explicitly
clutch = Clutch("pipeline-name", Strategy.SEQUENTIAL)
clutch = Clutch("router", Strategy.SELECTOR)
clutch = Clutch("workflow", Strategy.GRAPH)
clutch = Clutch("iterative", Strategy.ROUND_ROBIN, max_turns=10)
```

### Registering Agents
```python
@clutch.agent()
async def my_agent(data):
    # Receive data, transform it, and return it for the next agent
    data["processed"] = True
    return data

# With Pydantic models for type safety
from pydantic import BaseModel

class MyData(BaseModel):
    input: str
    result: str = ""

@clutch.agent()
async def typed_agent(data: MyData) -> MyData:
    data.result = process(data.input)
    return data
```
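The registration rules above (order of registration matters for SEQUENTIAL, and every agent must be async) can be illustrated with a small, library-free registry. This is a hypothetical sketch, not eggai-clutch's decorator; `AgentRegistry` is an illustrative name.

```python
import inspect
from typing import Awaitable, Callable

AgentFn = Callable[..., Awaitable]

class AgentRegistry:
    """Hypothetical stand-in for a decorator-based agent registry."""

    def __init__(self) -> None:
        # dicts preserve insertion order, so this doubles as the execution order
        self.agents: dict = {}

    def agent(self):
        def decorator(fn: AgentFn) -> AgentFn:
            # Reject plain functions: a runner would await every agent
            if not inspect.iscoroutinefunction(fn):
                raise TypeError(f"agent {fn.__name__!r} must be async")
            self.agents[fn.__name__] = fn
            return fn  # return the function unchanged so it stays callable
        return decorator

registry = AgentRegistry()

@registry.agent()
async def first(data):
    return data

@registry.agent()
async def second(data):
    return data

print(list(registry.agents))  # ['first', 'second']
```

Returning the original function from the decorator keeps agents directly callable (and awaitable) in unit tests.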

### Running the Pipeline
```python
# Simple blocking run
result = await clutch.run({"input": "value"})

# With timeout
result = await clutch.run(data, timeout=30.0)

# Async task handle for more control
task = await clutch.submit(data)
print(task.id)           # Unique task ID
print(task.done)         # Check completion (non-blocking)
result = await task      # Wait for result
result = await task.result(timeout=10.0)  # Or wait with a timeout
task.cancel()            # Cancel execution

# Stream step-by-step events
async for event in clutch.stream(data):
    print(f"Step: {event.step}, Final: {event.final}")
    if event.final:
        result = event.data
```
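The handle returned by `submit()` exposes `id`, `done`, `result(timeout=...)`, and `cancel()`. To show those semantics (non-blocking completion check, timeout-bounded wait, cancellation), here is a hypothetical handle built on `asyncio.Task`. It is a sketch, not `ClutchTask` itself; `TaskHandle` is an illustrative name, and treating `done` as a property mirrors the usage above.

```python
import asyncio
import uuid
from typing import Any, Optional

class TaskHandle:
    """Hypothetical task handle wrapping an asyncio.Task."""

    def __init__(self, coro) -> None:
        self.id = str(uuid.uuid4())             # unique task ID
        self._task = asyncio.create_task(coro)  # scheduled to run right away

    @property
    def done(self) -> bool:
        # Non-blocking completion check
        return self._task.done()

    async def result(self, timeout: Optional[float] = None) -> Any:
        # shield() keeps a timeout from cancelling the underlying task
        return await asyncio.wait_for(asyncio.shield(self._task), timeout)

    def cancel(self) -> None:
        self._task.cancel()

    def __await__(self):
        # Makes `await handle` equivalent to awaiting the task
        return self._task.__await__()

async def demo() -> tuple:
    async def work():
        await asyncio.sleep(0.01)
        return {"ok": True}

    handle = TaskHandle(work())
    before = handle.done
    result = await handle.result(timeout=1.0)
    return before, handle.done, result

print(asyncio.run(demo()))  # (False, True, {'ok': True})
```

Shielding the task means a timed-out `result()` call leaves the work running, so the caller can decide whether to keep waiting or call `cancel()`.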

## Strategy Guide

### SEQUENTIAL - Linear Pipeline
Agents execute in the order they were registered. Output of each agent becomes input to the next.

```python
from eggai_clutch import Clutch, Terminate

clutch = Clutch("pipeline")

@clutch.agent()
async def step_1(data):
    data["step1"] = True
    return data

@clutch.agent()
async def step_2(data):
    data["step2"] = True
    return data

@clutch.agent()
async def step_3(data):
    data["step3"] = True
    raise Terminate(data)  # Return final result

result = await clutch.run({"initial": True})
# result = {"initial": True, "step1": True, "step2": True, "step3": True}
```
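To make the SEQUENTIAL semantics concrete, here is a library-free sketch of the loop such a strategy implies: each agent's return value feeds the next, and a `Terminate` exception short-circuits with its payload. The local `Terminate` class and `run_sequential` are illustrative stand-ins, not eggai-clutch code, and returning the last output when no agent terminates is an assumption.

```python
import asyncio
from typing import Any, Awaitable, Callable

class Terminate(Exception):
    """Local stand-in: carries the final result out of the pipeline."""
    def __init__(self, result: Any) -> None:
        super().__init__()
        self.result = result

async def run_sequential(agents: list, data: Any) -> Any:
    for agent in agents:  # registration order
        try:
            data = await agent(data)  # output becomes the next input
        except Terminate as t:
            return t.result  # stop early with the terminating payload
    return data  # assumption: fall back to the last output

async def step_1(data):
    return {**data, "step1": True}

async def step_2(data):
    raise Terminate({**data, "step2": True})

async def never_runs(data):
    return {**data, "step3": True}

result = asyncio.run(run_sequential([step_1, step_2, never_runs], {"initial": True}))
print(result)  # {'initial': True, 'step1': True, 'step2': True}
```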

### SELECTOR - Dynamic Routing
A selector function determines which agent handles each request. Only one agent executes per run.

```python
from eggai_clutch import Clutch, Strategy, Terminate

clutch = Clutch("router", Strategy.SELECTOR)

@clutch.selector()
async def route(data) -> str:
    """Return the name of the agent to handle this request."""
    if data.get("type") == "billing":
        return "billing_handler"
    elif data.get("type") == "technical":
        return "tech_handler"
    return "general_handler"

@clutch.agent()
async def billing_handler(data):
    data["handled_by"] = "billing"
    raise Terminate(data)

@clutch.agent()
async def tech_handler(data):
    data["handled_by"] = "technical"
    raise Terminate(data)

@clutch.agent()
async def general_handler(data):
    data["handled_by"] = "general"
    raise Terminate(data)

result = await clutch.run({"type": "billing", "query": "refund"})
# result = {"type": "billing", "query": "refund", "handled_by": "billing"}
```
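A selector-based run reduces to a lookup: call the classifier, then dispatch to the agent it names. The sketch below is library-free and simplified (agents return instead of raising `Terminate`). Raising `KeyError` for an unknown agent name is an assumed failure mode; the library's actual behavior is not documented here.

```python
import asyncio
from typing import Any

async def run_selector(selector, agents: dict, data: Any) -> Any:
    name = await selector(data)  # classifier returns an agent name
    if name not in agents:
        # Assumed behavior: fail loudly rather than silently picking a default
        raise KeyError(f"selector returned unknown agent {name!r}")
    return await agents[name](data)  # exactly one agent handles the request

async def route(data):
    return "billing_handler" if data.get("type") == "billing" else "general_handler"

async def billing_handler(data):
    return {**data, "handled_by": "billing"}

async def general_handler(data):
    return {**data, "handled_by": "general"}

agents = {"billing_handler": billing_handler, "general_handler": general_handler}
print(asyncio.run(run_selector(route, agents, {"type": "billing"})))
# {'type': 'billing', 'handled_by': 'billing'}
```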

### GRAPH - Explicit Workflow
Define explicit edges between agents. Useful for complex workflows with branching.

```python
from eggai_clutch import Clutch, Strategy, Terminate

clutch = Clutch("workflow", Strategy.GRAPH)

@clutch.agent(edges=["validate"])  # After parse, go to validate
async def parse(data):
    data["parsed"] = True
    return data

@clutch.agent(edges=["enrich", "store"])  # Can go to either
async def validate(data):
    data["valid"] = True
    return data

@clutch.agent(edges=["store"])
async def enrich(data):
    data["enriched"] = True
    return data

@clutch.agent()  # No edges = terminal node
async def store(data):
    data["stored"] = True
    raise Terminate(data)
```
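Because GRAPH edges are plain agent names, a typo only surfaces at runtime. A small, library-free check over the edge map can catch dangling edges, list terminal nodes (agents with no outgoing edges), and find agents unreachable from the entry point. The `edges` dict mirrors the decorators above, treating `parse` as the entry is an assumption, and `check_graph` is a hypothetical helper that says nothing about how the library chooses between multiple outgoing edges.

```python
from collections import deque

def check_graph(edges: dict, entry: str) -> dict:
    """Report dangling edges, terminal nodes, and nodes unreachable from entry."""
    dangling = [f"{src}->{dst}" for src, dsts in edges.items() for dst in dsts if dst not in edges]
    terminals = [name for name, dsts in edges.items() if not dsts]

    # Breadth-first search from the entry agent
    seen = {entry}
    queue = deque([entry])
    while queue:
        node = queue.popleft()
        for nxt in edges.get(node, []):
            if nxt in edges and nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    unreachable = [name for name in edges if name not in seen]
    return {"dangling": dangling, "terminals": terminals, "unreachable": unreachable}

edges = {
    "parse": ["validate"],
    "validate": ["enrich", "store"],
    "enrich": ["store"],
    "store": [],
}
print(check_graph(edges, entry="parse"))
# {'dangling': [], 'terminals': ['store'], 'unreachable': []}
```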

### ROUND_ROBIN - Iterative Processing
Cycles through agents repeatedly until `Terminate` is raised or `max_turns` is reached.

```python
from eggai_clutch import Clutch, Strategy, Terminate

clutch = Clutch("iterator", Strategy.ROUND_ROBIN, max_turns=10)

@clutch.agent()
async def refine(data):
    data["iterations"] = data.get("iterations", 0) + 1
    if data["iterations"] >= 3:
        raise Terminate(data)
    return data

@clutch.agent()
async def evaluate(data):
    data["score"] = calculate_score(data)
    return data
```
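The ROUND_ROBIN loop can be sketched without the library as cycling through agents under a turn budget. This is an illustration only: whether the real `max_turns` counts individual agent calls or full cycles, and what happens when the budget runs out, are assumptions here (this sketch counts agent calls and raises `RuntimeError`). `calculate_score` is replaced by a placeholder.

```python
import asyncio
from itertools import cycle

class Terminate(Exception):
    """Local stand-in for the library's Terminate exception."""
    def __init__(self, result):
        super().__init__()
        self.result = result

async def run_round_robin(agents, data, max_turns: int):
    turns = 0
    for agent in cycle(agents):  # refine, evaluate, refine, evaluate, ...
        if turns >= max_turns:
            # Assumed behavior when the budget is exhausted without Terminate
            raise RuntimeError(f"no Terminate after {max_turns} turns")
        turns += 1
        try:
            data = await agent(data)
        except Terminate as t:
            return t.result

async def refine(data):
    data["iterations"] = data.get("iterations", 0) + 1
    if data["iterations"] >= 3:
        raise Terminate(data)
    return data

async def evaluate(data):
    data["score"] = data["iterations"] * 10  # placeholder for calculate_score()
    return data

print(asyncio.run(run_round_robin([refine, evaluate], {}, max_turns=10)))
# {'iterations': 3, 'score': 20}
```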

## Control Flow

### Terminate - Stop and Return
```python
from eggai_clutch import Terminate

@clutch.agent()
async def final_step(data):
    # Stop pipeline and return this result
    raise Terminate({"status": "complete", "data": data})
```

### Handover - Transfer to Specific Agent
```python
from eggai_clutch import Handover, Terminate

@clutch.agent()
async def router(data):
    if data.get("needs_review"):
        # Jump directly to reviewer agent
        raise Handover("reviewer", data)
    return data

@clutch.agent()
async def reviewer(data):
    data["reviewed"] = True
    raise Terminate(data)
```
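Exceptions suit this kind of control flow because they unwind out of an agent no matter how deeply the decision is nested. Below is a library-free sketch of how a runner might honor `Handover` in a sequential pipeline: jump to the named agent, then continue in registration order from there. That continuation rule is an assumption for illustration, and `publisher` is a hypothetical extra agent; the library may resume differently.

```python
import asyncio

class Terminate(Exception):
    def __init__(self, result):
        super().__init__()
        self.result = result

class Handover(Exception):
    def __init__(self, agent_name, data):
        super().__init__(agent_name)
        self.agent_name = agent_name
        self.data = data

async def run_with_handover(agents: dict, data):
    names = list(agents)  # registration order
    i = 0
    while i < len(names):
        try:
            data = await agents[names[i]](data)
            i += 1  # normal flow: next registered agent
        except Handover as h:
            data = h.data
            i = names.index(h.agent_name)  # jump; ValueError if the name is unknown
        except Terminate as t:
            return t.result
    return data

async def router(data):
    if data.get("needs_review"):
        raise Handover("reviewer", data)
    return data

async def publisher(data):
    return {**data, "published": True}

async def reviewer(data):
    raise Terminate({**data, "reviewed": True})

agents = {"router": router, "publisher": publisher, "reviewer": reviewer}
print(asyncio.run(run_with_handover(agents, {"needs_review": True})))
# {'needs_review': True, 'reviewed': True}
```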

## Context Access

Access execution context within agents:

```python
from eggai_clutch.message import get_context

@clutch.agent()
async def context_aware_agent(data):
    ctx = get_context()

    # Available context fields:
    print(ctx.data)      # Current data being processed
    print(ctx.source)    # Name of previous agent (or "_start")
    print(ctx.state)     # ClutchState object
    print(ctx.metadata)  # Custom metadata dict

    # ClutchState fields:
    print(ctx.state.turn)     # Current turn number
    print(ctx.state.history)  # List of previous steps
    print(ctx.state.members)  # List of agent names

    return data
```
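A common way to expose per-run context to nested code without passing it through every call is Python's `contextvars`. The sketch below shows that technique with hypothetical `Context`, `get_context`, and `call_agent` definitions; it is not a claim about how `eggai_clutch.message.get_context` is implemented. Because each asyncio task runs in a copy of the current context, concurrent runs don't see each other's values.

```python
import asyncio
import contextvars
from dataclasses import dataclass, field
from typing import Any

@dataclass
class Context:
    """Hypothetical per-step context, loosely mirroring the fields above."""
    data: Any
    source: str
    turn: int
    metadata: dict = field(default_factory=dict)

_current = contextvars.ContextVar("current_context")

def get_context() -> Context:
    # Raises LookupError if called outside a running step
    return _current.get()

async def call_agent(agent, data, source: str, turn: int):
    token = _current.set(Context(data=data, source=source, turn=turn))
    try:
        return await agent(data)
    finally:
        _current.reset(token)  # restore the previous context after the step

async def context_aware_agent(data):
    ctx = get_context()
    return {**data, "came_from": ctx.source, "turn": ctx.turn}

async def main():
    # Two concurrent calls, each seeing its own context
    return await asyncio.gather(
        call_agent(context_aware_agent, {"id": 1}, source="_start", turn=0),
        call_agent(context_aware_agent, {"id": 2}, source="loader", turn=1),
    )

print(asyncio.run(main()))
```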

## Hooks

Monitor and intercept pipeline execution:

```python
async def on_request_handler(data):
    print(f"Pipeline starting with: {data}")
    return data  # Can modify input

async def on_response_handler(result):
    print(f"Pipeline completed with: {result}")
    return result  # Can modify output

async def on_step_handler(step_name, data):
    print(f"Step {step_name} completed")

clutch = Clutch(
    "pipeline",
    on_request=on_request_handler,
    on_response=on_response_handler,
    on_step=on_step_handler,
)
```
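The hook roles implied above (the request hook may rewrite input, the step hook observes, the response hook may rewrite output) can be sketched as a thin wrapper around a sequential loop. `with_hooks` is a hypothetical helper, and calling the step hook with `(step_name, data)` after each agent is an assumption based on the handler signatures shown.

```python
import asyncio

async def with_hooks(agents, data, on_request=None, on_step=None, on_response=None):
    """Run agents in order, calling optional async hooks around each phase."""
    if on_request:
        data = await on_request(data)  # may replace the input
    for agent in agents:
        data = await agent(data)
        if on_step:
            await on_step(agent.__name__, data)  # observe only; return value ignored
    if on_response:
        data = await on_response(data)  # may replace the output
    return data

seen_steps = []

async def on_request_handler(data):
    return {**data, "requested": True}

async def on_step_handler(step_name, data):
    seen_steps.append(step_name)

async def on_response_handler(result):
    return {**result, "responded": True}

async def double(data):
    return {**data, "x": data["x"] * 2}

result = asyncio.run(with_hooks([double], {"x": 21},
                                on_request=on_request_handler,
                                on_step=on_step_handler,
                                on_response=on_response_handler))
print(result, seen_steps)  # {'x': 42, 'requested': True, 'responded': True} ['double']
```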

## Complete Examples

### Example 1: RAG Document Pipeline
Sequential pipeline for document processing with chunking, embedding, and summarization.

```python
import asyncio
from pydantic import BaseModel
from eggai_clutch import Clutch, Terminate

class Document(BaseModel):
    path: str
    content: str = ""
    chunks: list[str] = []
    embeddings: list[list[float]] = []
    summary: str = ""

clutch = Clutch("rag-indexer")

@clutch.agent()
async def loader(doc: Document) -> Document:
    """Load document content."""
    doc.content = await read_file(doc.path)
    return doc

@clutch.agent()
async def chunker(doc: Document) -> Document:
    """Split into chunks."""
    chunk_size = 500
    doc.chunks = [doc.content[i:i+chunk_size]
                  for i in range(0, len(doc.content), chunk_size)]
    return doc

@clutch.agent()
async def embedder(doc: Document) -> Document:
    """Generate embeddings for each chunk."""
    doc.embeddings = await embedding_api(doc.chunks)
    return doc

@clutch.agent()
async def summarizer(doc: Document) -> Document:
    """Generate document summary."""
    doc.summary = await llm_summarize(doc.content[:2000])
    return doc

@clutch.agent()
async def indexer(doc: Document) -> Document:
    """Store in vector database."""
    await vector_db.upsert(doc)
    raise Terminate(doc)

async def main():
    result = await clutch.run(Document(path="/docs/report.pdf"))
    print(f"Indexed {len(result['chunks'])} chunks")

asyncio.run(main())
```
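The example leaves `read_file`, `embedding_api`, `llm_summarize`, and `vector_db` undefined. For a dry run without external services, hypothetical in-memory stubs like these can stand in. Their names match the example, but their behavior is invented purely for local testing (fixed text, fake 3-dimensional embeddings, a first-sentence "summary").

```python
import asyncio

async def read_file(path: str) -> str:
    # Stub: return fixed text instead of reading (and parsing) the file
    return "First sentence. " + "lorem ipsum " * 100

async def embedding_api(chunks: list) -> list:
    # Stub: deterministic fake embeddings, one vector per chunk
    return [[float(len(c)), float(c.count(" ")), float(i)] for i, c in enumerate(chunks)]

async def llm_summarize(text: str) -> str:
    # Stub: treat the first sentence as the summary
    return text.split(". ")[0] + "."

class InMemoryVectorDB:
    def __init__(self) -> None:
        self.items: dict = {}

    async def upsert(self, doc) -> None:
        self.items[doc.path] = doc  # keyed by path, so re-indexing overwrites

vector_db = InMemoryVectorDB()
```

With these defined above the pipeline code, the 1,216-character stub text splits into three chunks at `chunk_size = 500`.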

### Example 2: Support Ticket Triage
Selector strategy for intent-based routing.

```python
import asyncio
from pydantic import BaseModel
from eggai_clutch import Clutch, Strategy, Terminate

class Ticket(BaseModel):
    query: str
    user_id: str
    category: str = ""
    response: str = ""

clutch = Clutch("support", Strategy.SELECTOR)

@clutch.selector()
async def classify(ticket: Ticket) -> str:
    """Route based on query content."""
    query = ticket.query.lower()
    if any(w in query for w in ["refund", "charge", "bill"]):
        return "billing"
    elif any(w in query for w in ["bug", "error", "crash"]):
        return "technical"
    elif any(w in query for w in ["cancel", "delete account"]):
        return "retention"
    return "general"

@clutch.agent()
async def billing(ticket: Ticket) -> Ticket:
    account = await db.get_account(ticket.user_id)
    ticket.response = await llm(f"Billing help: {ticket.query}\nAccount: {account}")
    ticket.category = "billing"
    raise Terminate(ticket)

@clutch.agent()
async def technical(ticket: Ticket) -> Ticket:
    issues = await db.search_known_issues(ticket.query)
    ticket.response = await llm(f"Tech help: {ticket.query}\nKnown issues: {issues}")
    ticket.category = "technical"
    raise Terminate(ticket)

@clutch.agent()
async def retention(ticket: Ticket) -> Ticket:
    await db.create_high_priority_ticket(ticket.query)
    ticket.response = "Escalated to retention team."
    ticket.category = "retention"
    raise Terminate(ticket)

@clutch.agent()
async def general(ticket: Ticket) -> Ticket:
    ticket.response = await llm(f"Help: {ticket.query}")
    ticket.category = "general"
    raise Terminate(ticket)

async def main():
    result = await clutch.run(Ticket(
        query="I was charged twice",
        user_id="user123"
    ))
    print(f"Category: {result['category']}")
    print(f"Response: {result['response']}")

asyncio.run(main())
```
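The `classify` selector is keyword matching in priority order. Moving the rules into a data table keeps that priority explicit and makes routing testable without running the pipeline. `ROUTES` and `classify_query` are illustrative names; note that plain substring matching means "charged" matches "charge".

```python
ROUTES = [
    # Checked top to bottom: the first matching rule wins
    ("billing", ("refund", "charge", "bill")),
    ("technical", ("bug", "error", "crash")),
    ("retention", ("cancel", "delete account")),
]

def classify_query(query: str, default: str = "general") -> str:
    text = query.lower()
    for agent_name, keywords in ROUTES:
        if any(k in text for k in keywords):
            return agent_name
    return default

print(classify_query("I was charged twice"))  # billing
```

Inside the pipeline, the async `classify` selector could then simply `return classify_query(ticket.query)`.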

### Example 3: Code Review Pipeline
Sequential pipeline with multiple analysis stages.

```python
import ast
import asyncio
from pydantic import BaseModel
from eggai_clutch import Clutch, Terminate

class CodeReview(BaseModel):
    code: str
    functions: list[str] = []
    lint_issues: list[str] = []
    security_issues: list[str] = []
    ai_review: str = ""

clutch = Clutch("code-review")

@clutch.agent()
async def parser(review: CodeReview) -> CodeReview:
    """Extract code metadata."""
    tree = ast.parse(review.code)
    review.functions = [
        node.name for node in ast.walk(tree)
        if isinstance(node, ast.FunctionDef)
    ]
    return review

@clutch.agent()
async def linter(review: CodeReview) -> CodeReview:
    """Run static analysis."""
    review.lint_issues = await run_pylint(review.code)
    return review

@clutch.agent()
async def security_scanner(review: CodeReview) -> CodeReview:
    """Check for vulnerabilities."""
    review.security_issues = await run_bandit(review.code)
    return review

@clutch.agent()
async def ai_reviewer(review: CodeReview) -> CodeReview:
    """Generate AI review with full context."""
    context = f"""
    Functions: {review.functions}
    Lint issues: {review.lint_issues}
    Security issues: {review.security_issues}
    """
    review.ai_review = await llm(f"Review this code:\n{context}")
    raise Terminate(review)

async def main():
    # Source must start at column 0: ast.parse rejects an unexpected indent
    code = '''
def calculate_total(items):
    return sum(item["price"] * item["qty"] for item in items)
'''
    result = await clutch.run(CodeReview(code=code))
    print(result["ai_review"])

asyncio.run(main())
```
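The `parser` agent only collects `ast.FunctionDef` nodes, so `async def` functions (which parse as `ast.AsyncFunctionDef`) are skipped. A standalone variant that handles both, and dedents the source first so indented snippets still parse, might look like this; `extract_functions` is a hypothetical helper.

```python
import ast
import textwrap

def extract_functions(source: str) -> list:
    # dedent() strips common leading whitespace; ast.parse rejects an indented first statement
    tree = ast.parse(textwrap.dedent(source))
    return [
        node.name
        for node in ast.walk(tree)
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
    ]

snippet = '''
    def calculate_total(items):
        return sum(item["price"] * item["qty"] for item in items)

    async def fetch_items():
        return []
'''
print(extract_functions(snippet))  # ['calculate_total', 'fetch_items']
```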

### Example 4: Concurrent Task Processing
Process multiple items concurrently using `submit()`.

```python
import asyncio
from eggai_clutch import Clutch, Terminate

clutch = Clutch("processor")

@clutch.agent()
async def process(data):
    result = await heavy_computation(data["item"])
    raise Terminate({"item": data["item"], "result": result})

async def main():
    items = [{"item": f"item_{i}"} for i in range(10)]

    # Submit every item; each submit() returns a task handle while the pipelines run
    tasks = [await clutch.submit(item) for item in items]

    # Wait for all results
    results = await asyncio.gather(*tasks)

    for r in results:
        print(f"{r['item']}: {r['result']}")

asyncio.run(main())
```
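Submitting every item at once works for ten items, but larger batches may need a cap on how many run at a time. A standard asyncio pattern is an `asyncio.Semaphore` around each call. This sketch uses a hypothetical `fake_run` coroutine in place of `clutch.run`, so it runs without the library; substituting the real call is an assumption about usage.

```python
import asyncio

async def fake_run(item: dict) -> dict:
    # Stand-in for `await clutch.run(item)`; simulates some I/O
    await asyncio.sleep(0.01)
    return {"item": item["item"], "result": len(item["item"])}

async def run_bounded(items: list, limit: int) -> tuple:
    sem = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def one(item: dict) -> dict:
        nonlocal active, peak
        async with sem:  # at most `limit` bodies run concurrently
            active += 1
            peak = max(peak, active)
            try:
                return await fake_run(item)
            finally:
                active -= 1

    # gather() returns results in input order
    results = await asyncio.gather(*(one(i) for i in items))
    return results, peak

results, peak = asyncio.run(run_bounded([{"item": f"item_{i}"} for i in range(10)], limit=3))
print(peak)  # 3
```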

### Example 5: Streaming Progress
Monitor pipeline progress in real-time.

```python
import asyncio
from eggai_clutch import Clutch, Terminate

clutch = Clutch("pipeline")

@clutch.agent()
async def step_1(data):
    await asyncio.sleep(1)
    return {**data, "step": 1}  # spread first so the new step value wins

@clutch.agent()
async def step_2(data):
    await asyncio.sleep(1)
    return {**data, "step": 2}

@clutch.agent()
async def step_3(data):
    await asyncio.sleep(1)
    raise Terminate({**data, "step": 3})

async def main():
    async for event in clutch.stream({"start": True}):
        if event.final:
            print(f"Complete: {event.data}")
        else:
            print(f"Progress: {event.step} finished")

asyncio.run(main())
```
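Step-by-step streaming maps naturally onto an async generator that yields one event per finished agent and a final event carrying the result. `Event` and `stream_sequential` below are hypothetical stand-ins for `StepEvent` and `clutch.stream()`, keeping only the `step`, `data`, and `final` fields used above.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator

@dataclass
class Event:
    step: str
    data: Any
    final: bool = False

class Terminate(Exception):
    def __init__(self, result: Any) -> None:
        super().__init__()
        self.result = result

async def stream_sequential(agents, data) -> AsyncIterator[Event]:
    for agent in agents:
        try:
            data = await agent(data)
        except Terminate as t:
            yield Event(step=agent.__name__, data=t.result, final=True)
            return  # stop the generator after the final event
        yield Event(step=agent.__name__, data=data)  # intermediate progress

async def step_1(data):
    return {**data, "step": 1}

async def step_2(data):
    raise Terminate({**data, "step": 2})

async def collect():
    return [(e.step, e.final) async for e in stream_sequential([step_1, step_2], {"start": True})]

print(asyncio.run(collect()))  # [('step_1', False), ('step_2', True)]
```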

## Distributed Mode

Run agents across multiple processes or machines:

```python
from eggai import RedisTransport
from eggai_clutch import Clutch, Terminate

# Create transport
transport = RedisTransport(url="redis://localhost:6379")

# Create clutch with transport
clutch = Clutch("distributed-pipeline", transport=transport)

@clutch.agent()
async def worker(data):
    # This runs on distributed workers
    result = await process(data)
    raise Terminate(result)

async def main():
    # Submit work (routes to workers)
    task = await clutch.submit({"work": "data"})
    result = await task.result(timeout=60.0)

    # Cleanup
    await clutch.stop()
```

Supported transports: InMemory (default), Redis Streams, Kafka.

## API Reference

- [Source: clutch.py](https://github.com/eggai-tech/eggai-clutch/blob/main/eggai_clutch/clutch.py): Clutch, ClutchTask, StepEvent, agent decorator
- [Source: strategy.py](https://github.com/eggai-tech/eggai-clutch/blob/main/eggai_clutch/strategy.py): Strategy enum
- [Source: exceptions.py](https://github.com/eggai-tech/eggai-clutch/blob/main/eggai_clutch/exceptions.py): Terminate, Handover
- [Source: message.py](https://github.com/eggai-tech/eggai-clutch/blob/main/eggai_clutch/message.py): ClutchContext, ClutchState, get_context()

## Documentation

- [README](https://github.com/eggai-tech/eggai-clutch/blob/main/README.md): Full documentation
- [PyPI](https://pypi.org/project/eggai-clutch/): Package info

## Examples

- [rag_pipeline.py](https://github.com/eggai-tech/eggai-clutch/blob/main/examples/rag_pipeline.py): Document processing
- [support_triage.py](https://github.com/eggai-tech/eggai-clutch/blob/main/examples/support_triage.py): Intent routing
- [code_review.py](https://github.com/eggai-tech/eggai-clutch/blob/main/examples/code_review.py): Multi-stage analysis

## Optional

- [Tests](https://github.com/eggai-tech/eggai-clutch/tree/main/tests): Test suite
- [EggAI SDK](https://github.com/eggai-tech/eggai): Distributed transports
- [Demo App](https://github.com/eggai-tech/eggai-clutch-demo): Full RAG app with UI
