Metadata-Version: 2.4
Name: basion-agent
Version: 0.6.0
Summary: Python SDK for Basion AI Agent framework - handles agent registration, message consumption, and streaming responses
Author-email: Basion AI <support@basion.ai>
License: MIT
Project-URL: Homepage, https://github.com/basion-us/basion-agent
Project-URL: Repository, https://github.com/basion-us/basion-agent
Project-URL: Documentation, https://github.com/basion-us/basion-agent#readme
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: grpcio>=1.70.0
Requires-Dist: grpcio-tools>=1.70.0
Requires-Dist: protobuf>=5.29.0
Requires-Dist: requests>=2.31.0
Requires-Dist: aiohttp>=3.9.0
Provides-Extra: dev
Requires-Dist: pytest>=7.4.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.21.0; extra == "dev"
Requires-Dist: pytest-cov>=4.1.0; extra == "dev"
Requires-Dist: pytest-mock>=3.11.0; extra == "dev"
Requires-Dist: respx>=0.20.0; extra == "dev"
Provides-Extra: langgraph
Requires-Dist: langgraph>=1.0.0; extra == "langgraph"
Requires-Dist: langchain-core>=0.3.0; extra == "langgraph"
Provides-Extra: pydantic
Requires-Dist: pydantic-ai>=1.0.0; extra == "pydantic"
Dynamic: license-file

# Basion Agent SDK

Python SDK for building AI agents on the Basion AI platform. It provides agent registration, message handling over Kafka (through the Agent Gateway), streaming responses, and integrations with LangGraph and Pydantic AI.

## Overview

The Basion Agent SDK (`basion-agent`) enables developers to build AI agents that integrate with the Basion AI Management platform. Agents register themselves, receive messages through Kafka topics, and stream responses back to users.

### Key Features

- **Agent Registration**: Automatic registration with AI Inventory
- **Message Handling**: Decorator-based handlers with sender filtering
- **Response Streaming**: Chunked responses via Kafka/Centrifugo
- **Structural Streaming**: Rich UI components (Artifacts, Surfaces, TextBlocks, Steppers)
- **Conversation History**: Access to message history via Conversation Store
- **Memory V2 (mem0)**: Long-term memory via mem0.ai Cloud — ingest and semantic search
- **Memory** *(deprecated)*: Semantic search over long-term user and conversation memory
- **Attachments**: Download and process file attachments (images, PDFs, etc.)
- **Knowledge Graph**: Query biomedical knowledge graphs (diseases, proteins, phenotypes)
- **Remote Logging**: Send logs to Loki via the gateway for centralized monitoring
- **LangGraph Integration**: HTTP-based checkpoint saver for LangGraph graphs
- **Pydantic AI Integration**: Persistent message history for Pydantic AI agents
- **CLI**: Run agents with `basion-agent run main:app`
- **Error Handling**: Automatic error responses to users on handler failures

## Architecture

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                           Your Agent Application                            │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│    BasionAgentApp                                                           │
│    ├── register_me() → Agent                                                │
│    │   ├── @on_message decorator                                            │
│    │   ├── streamer() → Streamer                                            │
│    │   │   └── stream_by() → Structural (Artifact, Surface, TextBlock, etc) │
│    │   └── tools → Tools                                                    │
│    │       └── knowledge_graph → KnowledgeGraphTool                         │
│    └── run() → Start consume loop                                           │
│                                                                             │
│    Message Context:                                                         │
│    ├── message.conversation → Conversation (history, metadata)              │
│    ├── message.memory_v2 → MemoryV2 (mem0 ingest + search)                  │
│    ├── message.memory → Memory (semantic search) [deprecated]               │
│    └── message.attachments → List[AttachmentInfo] (file downloads)          │
│                                                                             │
│    Extensions:                                                              │
│    ├── HTTPCheckpointSaver (LangGraph)                                      │
│    └── PydanticAIMessageStore (Pydantic AI)                                 │
│                                                                             │
└──────────────────────────────────┬──────────────────────────────────────────┘
                                   │ gRPC (Kafka) + HTTP (APIs)
                                   ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                           Agent Gateway                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│  gRPC: AgentStream (bidirectional)      HTTP: /s/{service}/* proxy          │
│  - Auth                                 - /s/ai-inventory/*                 │
│  - Subscribe/Unsubscribe                - /s/conversation-store/*           │
│  - Produce/Consume messages             - /s/ai-memory/*                    │
│                                         - /s/attachment/*                   │
│                                         - /s/memory/* (mem0)                │
│                                         - /s/knowledge-graph/*              │
│                                         - /loki/api/v1/push (logging)       │
└─────────────────┬───────────────────────────────────┬───────────────────────┘
                  │                                   │
                  ▼                                   ▼
           ┌──────────────┐                  ┌────────────────────┐
           │    Kafka     │                  │   AI Inventory /   │
           │  {agent}.inbox │                │ Conversation Store │
           └──────────────┘                  │ AI Memory / KG     │
                                             └────────────────────┘
```

### Message Flow

```mermaid
sequenceDiagram
    participant User
    participant Provider
    participant Router
    participant Gateway as Agent Gateway
    participant Agent as Your Agent
    participant ConvStore as Conversation Store

    User->>Provider: Send message
    Provider->>Router: Kafka: router.inbox
    Router->>Gateway: Kafka: {agent}.inbox
    Gateway->>Agent: gRPC stream: message

    Agent->>ConvStore: Get conversation history
    ConvStore-->>Agent: Message history

    loop Streaming Response
        Agent->>Gateway: gRPC stream: content chunk
        Gateway->>Router: Kafka: router.inbox
        Router->>Provider: Kafka: user.inbox
        Provider->>User: WebSocket: chunk
    end

    Agent->>Gateway: gRPC stream: done=true
```

## Installation

```bash
# Basic installation
pip install basion-agent

# With LangGraph support
pip install basion-agent[langgraph]

# With Pydantic AI support
pip install basion-agent[pydantic]

# Development installation
pip install -e ".[dev]"
```

## Quick Start

```python
from basion_agent import BasionAgentApp

# Initialize the app
app = BasionAgentApp(
    gateway_url="agent-gateway:8080",
    api_key="your-api-key"
)

# Register an agent
agent = app.register_me(
    name="my-assistant",
    about="A helpful AI assistant",
    document="Answers general questions and provides helpful information.",
    representation_name="My Assistant"
)

# Handle messages
@agent.on_message
async def handle_message(message, sender):
    # Access conversation history
    history = await message.conversation.get_history(limit=10)

    # Stream response
    async with agent.streamer(message) as s:
        s.stream("Hello! ")
        s.stream("How can I help you today?")

# Run the agent
app.run()
```

## CLI

Run agents using uvicorn-style import strings:

```bash
# Run 'app' from main.py
basion-agent run main:app

# Run 'app' from main.py (defaults to :app)
basion-agent run main

# Run 'application' from myagent.py
basion-agent run myagent:application

# Show version
basion-agent version
```
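
As a rough illustration of how a `module:attribute` string like the ones above can be resolved, here is a minimal sketch. It is not the CLI's actual code: `resolve_target` and `load_app` are hypothetical names, and the only behavior taken from this README is that a bare module name falls back to `:app`.

```python
import importlib


def resolve_target(target: str, default_attr: str = "app") -> tuple[str, str]:
    """Split 'module:attr' into its parts, falling back to ':app'."""
    module_name, _, attr = target.partition(":")
    if not module_name:
        raise ValueError(f"missing module name in {target!r}")
    return module_name, attr or default_attr


def load_app(target: str):
    """Import the module and return the named attribute (hypothetical helper)."""
    module_name, attr = resolve_target(target)
    module = importlib.import_module(module_name)
    return getattr(module, attr)
```

With this sketch, `resolve_target("main")` and `resolve_target("main:app")` give the same result, which matches the two equivalent commands shown above.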

Your agent file should define a `BasionAgentApp` instance:

```python
# main.py
from basion_agent import BasionAgentApp

app = BasionAgentApp(gateway_url="...", api_key="...")
agent = app.register_me(name="my-agent", ...)

@agent.on_message
async def handle(message, sender):
    ...
```

## Configuration

### Environment Variables

| Variable | Description | Default |
|----------|-------------|---------|
| `GATEWAY_URL` | Agent Gateway endpoint | None (required) |
| `GATEWAY_API_KEY` | API key for authentication | None (required) |
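
This README does not say whether the SDK reads these variables on its own, so the sketch below takes the conservative route of reading them explicitly and failing early if one is missing. `load_gateway_settings` is a hypothetical helper, not part of the SDK.

```python
import os
from typing import Mapping


def load_gateway_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Collect the two required variables or fail with a clear message."""
    required = ("GATEWAY_URL", "GATEWAY_API_KEY")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return {"gateway_url": env["GATEWAY_URL"], "api_key": env["GATEWAY_API_KEY"]}


# Usage (assumes both variables are exported in the shell):
# app = BasionAgentApp(**load_gateway_settings())
```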

### BasionAgentApp Options

```python
import logging

from basion_agent import BasionAgentApp

app = BasionAgentApp(
    gateway_url="agent-gateway:8080",    # Gateway endpoint
    api_key="key",                        # Authentication key
    heartbeat_interval=60,                # Heartbeat frequency (seconds)
    max_concurrent_tasks=100,             # Max concurrent message handlers
    error_message_template="...",         # Error message sent to users
    secure=False,                         # Use TLS for gRPC and HTTPS for HTTP
    enable_remote_logging=False,          # Send logs to Loki via gateway
    remote_log_level=logging.INFO,        # Min log level for remote logging
    remote_log_batch_size=100,            # Logs per batch
    remote_log_flush_interval=5.0,        # Seconds between flushes
)
```
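
To make the `max_concurrent_tasks` option more concrete, here is a generic sketch of a concurrency cap built on `asyncio.Semaphore`. It illustrates the general technique only; how the SDK enforces the limit internally is not documented here, and `run_bounded` and `fake_handler` are hypothetical names.

```python
import asyncio


async def run_bounded(handlers, max_concurrent_tasks: int) -> int:
    """Run coroutine functions with a concurrency cap; return the observed peak."""
    semaphore = asyncio.Semaphore(max_concurrent_tasks)
    running = 0
    peak = 0

    async def guarded(handler):
        nonlocal running, peak
        async with semaphore:  # callers wait here once the cap is reached
            running += 1
            peak = max(peak, running)
            try:
                await handler()
            finally:
                running -= 1

    await asyncio.gather(*(guarded(h) for h in handlers))
    return peak


async def fake_handler():
    await asyncio.sleep(0.01)  # stands in for real message handling
```

Running ten `fake_handler` calls with a cap of three never has more than three in flight at once; the rest wait for a free slot.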

## API Reference

### BasionAgentApp

Main application class for initializing and running agents.

```python
app = BasionAgentApp(gateway_url, api_key)

# Register an agent
agent = app.register_me(
    name="agent-name",           # Unique identifier (used for routing)
    about="Short description",   # Brief description for agent selection
    document="Full docs...",     # Detailed documentation
    representation_name="Name",  # Display name (optional)
    metadata={"key": "value"},   # Additional metadata (optional)
    category_name="my-category", # Category in kebab-case (optional, auto-created)
    tag_names=["tag-1", "tag-2"],# Tags in kebab-case (optional, auto-created)
    example_prompts=["Ask me anything"],  # Example prompts for users (optional)
    is_experimental=False,       # Mark as experimental (optional)
    force_update=False,          # Bypass content hash check (optional)
    base_url="http://...",       # Base URL for agent's frontend service (optional)
    related_pages=[              # Related pages (optional)
        {"name": "Docs", "endpoint": "/docs"}
    ],
)

# Start consuming messages
app.run()  # Blocks until shutdown
```

### Agent

Handles message registration and response streaming.

```python
# Register message handler (all senders)
@agent.on_message
async def handle(message, sender):
    pass

# Filter by sender
@agent.on_message(senders=["user"])
async def handle_user(message, sender):
    pass

# Exclude sender
@agent.on_message(senders=["~other-agent"])
async def handle_not_other(message, sender):
    pass
```
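
The filter syntax above can be read as an include list plus `~`-prefixed exclusions. The sketch below shows one plausible way to evaluate such a list. It is an assumption about the semantics, not the SDK's implementation, and `sender_allowed` is a hypothetical name.

```python
from typing import Optional, Sequence


def sender_allowed(sender: str, senders: Optional[Sequence[str]] = None) -> bool:
    """Return True if `sender` passes an include/exclude filter list."""
    if not senders:
        return True  # no filter: every sender is accepted
    excluded = {s[1:] for s in senders if s.startswith("~")}
    included = {s for s in senders if not s.startswith("~")}
    if sender in excluded:
        return False
    # With no explicit includes, anything not excluded is accepted.
    return not included or sender in included
```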

### Message

Represents an incoming message with conversation context, memory, and attachments.

```python
@agent.on_message
async def handle(message, sender):
    message.content             # Message content
    message.conversation_id     # Conversation ID
    message.user_id             # User ID
    message.metadata            # Optional message metadata (dict)
    message.schema              # Optional message schema (dict)

    # Conversation history
    history = await message.conversation.get_history(limit=10)

    # Memory V2 (mem0) — ingest and search long-term memory
    if message.memory_v2:
        await message.memory_v2.ingest(role="user", content=message.content)
        results = await message.memory_v2.search(query="diagnosis")

    # Memory (deprecated — use memory_v2 instead)
    results = await message.memory.query_about_user("diagnosis", limit=5)

    # Attachments
    if message.has_attachments():
        count = message.get_attachment_count()
        attachments = message.get_attachments()

        # Download first attachment
        data = await message.get_attachment_bytes()
        base64_str = await message.get_attachment_base64()
        buffer = await message.get_attachment_buffer()

        # Download specific attachment by index
        data = await message.get_attachment_bytes_at(1)

        # Download all attachments at once
        all_bytes = await message.get_all_attachment_bytes()
        all_base64 = await message.get_all_attachment_base64()

        # Inspect attachment metadata
        for att in attachments:
            att.filename        # "document.pdf"
            att.content_type    # "application/pdf"
            att.size            # bytes
            att.url             # download URL
            att.file_extension  # "pdf"
            att.file_type       # "pdf"
            att.is_image()      # True/False
            att.is_pdf()        # True/False
```
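
A common next step after downloading an attachment is to pass it to a model that accepts inline data. The helper below is a small sketch using only the standard library; `to_data_url` is a hypothetical name and not part of the SDK.

```python
import base64


def to_data_url(data: bytes, content_type: str) -> str:
    """Encode raw bytes as a data: URL, e.g. for an inline image input."""
    encoded = base64.b64encode(data).decode("ascii")
    return f"data:{content_type};base64,{encoded}"
```

Inside a handler this could be combined with the calls shown above, for example `to_data_url(await message.get_attachment_bytes(), attachments[0].content_type)`, assuming the first entry of `get_attachments()` is the same file that `get_attachment_bytes()` downloads.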

### Conversation

Access conversation history and metadata.

```python
@agent.on_message
async def handle(message, sender):
    conv = message.conversation

    # Get message history
    history = await conv.get_history(limit=10)
    history = await conv.get_history(role="user", limit=20, offset=0)

    # Get conversation metadata
    metadata = await conv.get_metadata()

    # Get messages where this agent was sender or recipient
    agent_history = await conv.get_agent_history(limit=50)
    agent_history = await conv.get_agent_history(agent_name="other-agent")
```

### Memory V2 (mem0)

Long-term memory powered by [mem0.ai](https://mem0.ai). Agents ingest messages with `ingest()` and run semantic search across a user's history with `search()`. Accessed via `message.memory_v2`.

```python
@agent.on_message
async def handle(message, sender):
    mem = message.memory_v2
    if not mem:
        return  # Skip memory operations when Memory V2 is not available

    # Ingest the user's message into mem0
    await mem.ingest(role="user", content=message.content)

    # Search for relevant memories
    results = await mem.search(query="previous diagnosis")
    for r in results:
        r.get("memory")  # Extracted memory text

    # Ingest assistant response too
    response = "Here is my answer..."
    await mem.ingest(role="assistant", content=response)
```

**Methods:**

| Method | Description |
|--------|-------------|
| `ingest(role, content)` | Ingest a message into mem0. Role is `'user'`, `'assistant'`, or `'system'`. |
| `search(query)` | Semantic search across the user's mem0 memories. Returns a list of results. |

**How it works:** The `MemoryV2Client` sends HTTP requests to the memory service (`/s/memory/*` via the gateway proxy), which wraps the mem0.ai Cloud API. Messages are ingested per-user and per-conversation. Search returns semantically relevant memories extracted by mem0.
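
Search results are typically folded into the prompt sent to the model. The sketch below assumes only what the example above shows, namely that each result is a dict with a `"memory"` key. `build_memory_context` and the heading text are hypothetical.

```python
from typing import Any, Dict, Iterable


def build_memory_context(results: Iterable[Dict[str, Any]], max_items: int = 5) -> str:
    """Format search results as a bulleted block for a system prompt."""
    lines = []
    for result in results:
        text = (result.get("memory") or "").strip()
        if text and text not in lines:  # skip blanks and exact duplicates
            lines.append(text)
        if len(lines) == max_items:
            break
    if not lines:
        return ""
    return "Known facts about the user:\n" + "\n".join(f"- {t}" for t in lines)
```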

### Memory *(Deprecated)*

> **Note:** `message.memory` is deprecated. Use `message.memory_v2` (mem0) instead.

Semantic search over long-term user and conversation memory. Accessed via `message.memory`.

```python
@agent.on_message
async def handle(message, sender):
    mem = message.memory

    # Search user's long-term memory
    results = await mem.query_about_user(
        query="previous diagnosis",
        limit=10,           # Max results (1-100)
        threshold=70,       # Similarity threshold 0-100
        context_messages=2, # Surrounding messages to include (0-20)
    )
    for r in results:
        r.message.content   # Matched message content
        r.score             # Similarity score
        r.context           # List of surrounding MemoryMessage objects

    # Search conversation memory
    results = await mem.query_about_conversation(
        query="what was discussed",
        limit=5,
    )

    # Get user summary (aggregated across all conversations)
    summary = await mem.get_user_summary()
    if summary:
        summary.text           # Summary text
        summary.message_count  # Total messages
        summary.last_updated   # Timestamp
```

### Streamer

Streams response chunks back to the user (or another agent).

```python
@agent.on_message
async def handle(message, sender):
    # Basic streaming (auto-finishes on exit)
    async with agent.streamer(message) as s:
        s.stream("Chunk 1...")
        s.stream("Chunk 2...")

    # Streaming with options
    async with agent.streamer(
        message,
        send_to="user",      # or another agent name
        awaiting=True,        # Set awaiting_route to this agent
    ) as s:
        # Non-persisted content (not saved to DB)
        s.stream("Thinking...", persist=False, event_type="thinking")

        # Persisted content
        s.stream("Here's my response...")

        # write() is an alias for stream()
        s.write("More content...")

        # Set metadata on the message
        s.set_message_metadata({"source": "search"})

        # Set response schema for forms
        s.set_response_schema({
            "type": "object",
            "properties": {
                "name": {"type": "string"}
            }
        })

    # Manual streaming (without context manager)
    s = agent.streamer(message)
    s.stream("Hello...")
    await s.finish()
```
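
When the text to send is already complete (for example, a cached answer), it can still be streamed in small pieces so the user sees it appear progressively. This is a sketch of one way to split it; `chunk_words` is a hypothetical helper and the chunk size is arbitrary.

```python
from typing import Iterator


def chunk_words(text: str, words_per_chunk: int = 3) -> Iterator[str]:
    """Yield the text in small word groups, joined by single spaces."""
    words = text.split()
    for start in range(0, len(words), words_per_chunk):
        chunk = " ".join(words[start : start + words_per_chunk])
        # Add a trailing space to every chunk except the last one.
        is_last = start + words_per_chunk >= len(words)
        yield chunk if is_last else chunk + " "
```

Inside `async with agent.streamer(message) as s:` each piece would be passed to `s.stream(chunk)`.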

### Structural Streaming

Rich UI components streamed alongside text content. Use `s.stream_by()` to bind a structural component to the streamer.

#### Artifact

Artifacts represent files, images, or embeds that are generated and displayed. Artifact data is persisted to the database.

```python
from basion_agent import Artifact

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        artifact = Artifact()

        # Show progress
        s.stream_by(artifact).generating("Creating chart...", progress=0.5)

        # Complete with result
        s.stream_by(artifact).done(
            url="https://example.com/chart.png",
            type="image",        # image, iframe, document, video, audio, code, link, file
            title="Sales Chart",
            description="Q4 sales data",
            metadata={"width": 800, "height": 600}
        )

        # Or signal an error
        # s.stream_by(artifact).error("Failed to generate chart")

        s.stream("Here's your chart!")
```

#### Surface

Surfaces are interactive embedded components (iframes, widgets). The API is similar to Artifact's.

```python
from basion_agent import Surface

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        surface = Surface()
        s.stream_by(surface).generating("Loading widget...")
        s.stream_by(surface).done(
            url="https://example.com/calendar",
            type="iframe",
            title="Calendar Widget",
        )
```

#### TextBlock

Collapsible text blocks with streaming title/body and visual variants. TextBlock events are not persisted.

```python
from basion_agent import TextBlock

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        block = TextBlock()

        # Set visual variant: thinking, note, warning, error, success
        s.stream_by(block).set_variant("thinking")

        # Stream title (appends)
        s.stream_by(block).stream_title("Deep ")
        s.stream_by(block).stream_title("Analysis...")

        # Stream body (appends)
        s.stream_by(block).stream_body("Step 1: Checking patterns\n")
        s.stream_by(block).stream_body("Step 2: Validating\n")

        # Replace title/body entirely
        s.stream_by(block).update_title("Analysis Complete")
        s.stream_by(block).update_body("All checks passed.")

        # Mark as done
        s.stream_by(block).done()

        s.stream("Analysis finished!")
```

#### Stepper

Multi-step progress indicators. Stepper events are not persisted.

```python
from basion_agent import Stepper

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        stepper = Stepper(steps=["Fetch", "Process", "Report"])

        s.stream_by(stepper).start_step(0)
        # ... do work ...
        s.stream_by(stepper).complete_step(0)

        s.stream_by(stepper).start_step(1)
        # ... do work ...
        s.stream_by(stepper).complete_step(1)

        # Add a step dynamically
        s.stream_by(stepper).add_step("Verify")

        s.stream_by(stepper).start_step(2)
        s.stream_by(stepper).complete_step(2)

        s.stream_by(stepper).start_step(3)
        # Update label mid-step
        s.stream_by(stepper).update_step_label(3, "Verify (Final)")
        s.stream_by(stepper).complete_step(3)

        # Or signal failure
        # s.stream_by(stepper).fail_step(1, error="Timeout")

        s.stream_by(stepper).done()
        s.stream("All steps complete!")
```
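
The start/complete/fail pattern above can be wrapped in a small loop so each unit of work reports its own status. The sketch below uses a recording stand-in so it runs without the SDK. `run_steps` and `RecordingStepper` are hypothetical, and passing `s.stream_by(stepper)` as `ui` assumes the returned object can be held and reused, which this README does not confirm.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple


class RecordingStepper:
    """Stand-in that records calls instead of emitting UI events."""

    def __init__(self) -> None:
        self.events: List[Tuple] = []

    def start_step(self, index: int) -> None:
        self.events.append(("start", index))

    def complete_step(self, index: int) -> None:
        self.events.append(("complete", index))

    def fail_step(self, index: int, error: str) -> None:
        self.events.append(("fail", index, error))

    def done(self) -> None:
        self.events.append(("done",))


async def run_steps(ui, tasks: List[Callable[[], Awaitable[None]]]) -> bool:
    """Run tasks in order; stop at the first failure. Returns True on success."""
    for index, task in enumerate(tasks):
        ui.start_step(index)
        try:
            await task()
        except Exception as exc:
            ui.fail_step(index, error=str(exc))
            return False
        ui.complete_step(index)
    ui.done()
    return True


async def ok() -> None:
    await asyncio.sleep(0)


async def boom() -> None:
    raise TimeoutError("Timeout")
```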

### Knowledge Graph (Tools)

Query biomedical knowledge graphs for diseases, proteins, phenotypes, drugs, and pathways. Accessed via `agent.tools.knowledge_graph`.

```python
@agent.on_message
async def handle(message, sender):
    kg = agent.tools.knowledge_graph

    # Search diseases
    diseases = await kg.search_diseases(name="Huntington", limit=5)
    disease = await kg.get_disease(disease_id=123)

    # Search proteins/genes
    proteins = await kg.search_proteins(symbol="BRCA1", limit=10)

    # Search phenotypes (HPO terms)
    phenotypes = await kg.search_phenotypes(name="seizure", hpo_id="HP:0001250")

    # Search drugs
    drugs = await kg.search_drugs(name="aspirin")

    # Search pathways
    pathways = await kg.search_pathways(name="apoptosis")

    # Find similar diseases (by shared phenotypes)
    similar = await kg.find_similar_diseases("Huntington Disease", limit=10)
    for s in similar:
        s.disease_name       # Disease name
        s.similarity_score   # 0.0 - 1.0
        s.shared_count       # Number of shared phenotypes

    # Find similar diseases (by shared genes)
    similar = await kg.find_similar_diseases_by_genes("Huntington Disease")

    # Get entity connections
    edges = await kg.get_entity_network("BRCA1", "protein")
    for e in edges:
        e.source_id, e.source_type
        e.target_id, e.target_type
        e.relation_type

    # k-hop graph traversal
    subgraph = await kg.k_hop_traversal("BRCA1", "protein", k=2, limit_edges=100)

    # Shortest path between entities
    path = await kg.find_shortest_path(
        start_name="BRCA1", start_type="protein",
        end_name="Breast Cancer", end_type="disease",
        max_hops=5
    )
    for step in path:
        step.node_id, step.node_name, step.node_type, step.relation
```
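
For readers unfamiliar with hop-limited path search, the following sketch shows what a `max_hops` bound means on a toy graph, using breadth-first search. It is a conceptual illustration only: the knowledge graph service's actual algorithm is not described here, and the graph below uses made-up node names rather than real biomedical data.

```python
from collections import deque
from typing import Dict, List, Optional


def shortest_path(
    graph: Dict[str, List[str]], start: str, end: str, max_hops: int = 5
) -> Optional[List[str]]:
    """Breadth-first search; returns the node list or None if out of range."""
    queue = deque([[start]])
    visited = {start}
    while queue:
        path = queue.popleft()
        node = path[-1]
        if node == end:
            return path
        if len(path) - 1 >= max_hops:  # hop budget used up on this branch
            continue
        for neighbor in graph.get(node, []):
            if neighbor not in visited:
                visited.add(neighbor)
                queue.append(path + [neighbor])
    return None


# Toy, made-up edges for illustration only.
TOY_GRAPH = {
    "A": ["B", "C"],
    "B": ["D"],
    "C": ["D"],
    "D": ["E"],
}
```

Here `E` is three hops from `A`, so the search finds a path with the default budget but returns `None` when `max_hops=2`.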

### Remote Logging (Loki)

Send agent logs to Loki via the gateway for centralized monitoring. Logs are batched and sent in the background.

```python
import logging

from basion_agent import BasionAgentApp

app = BasionAgentApp(
    gateway_url="agent-gateway:8080",
    api_key="key",
    enable_remote_logging=True,       # Enable Loki logging
    remote_log_level=logging.INFO,    # Min level (default: INFO)
    remote_log_batch_size=100,        # Logs per batch (default: 100)
    remote_log_flush_interval=5.0,    # Flush every N seconds (default: 5.0)
)

# Then use standard Python logging - it will be sent to Loki automatically
logger = logging.getLogger(__name__)
logger.info("Agent started", extra={"custom_field": "value"})
```
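
To show what batching means in practice, here is a minimal sketch of a `logging.Handler` that buffers records and emits them in groups, shaped like the JSON body accepted by Loki's push endpoint. It is not the SDK's handler: `BatchingHandler`, the `send` callback, and the `job` label are hypothetical, and the time-based flush controlled by `remote_log_flush_interval` is left out for brevity.

```python
import json
import logging
from typing import Callable, List


class BatchingHandler(logging.Handler):
    """Buffer formatted records and hand them to `send` in batches."""

    def __init__(self, send: Callable[[str], None], batch_size: int = 100) -> None:
        super().__init__()
        self._send = send
        self._batch_size = batch_size
        self._buffer: List[List[str]] = []

    def emit(self, record: logging.LogRecord) -> None:
        timestamp_ns = str(int(record.created * 1_000_000_000))
        self._buffer.append([timestamp_ns, self.format(record)])
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        if not self._buffer:
            return
        values, self._buffer = self._buffer, []
        # Loki push body: streams -> labels + [timestamp_ns, line] pairs.
        payload = {"streams": [{"stream": {"job": "example-agent"}, "values": values}]}
        self._send(json.dumps(payload))
```

In a real handler, `send` would POST the payload in the background so logging calls never block message handling.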

## Extensions

### LangGraph Integration

Use `HTTPCheckpointSaver` to persist LangGraph state via the Conversation Store checkpoint API.

```python
from basion_agent import BasionAgentApp
from basion_agent.extensions.langgraph import HTTPCheckpointSaver
from langgraph.graph import StateGraph

app = BasionAgentApp(gateway_url="...", api_key="...")
checkpointer = HTTPCheckpointSaver(app=app)

# Define your LangGraph
graph = StateGraph(MyState)
# ... add nodes and edges ...
compiled = graph.compile(checkpointer=checkpointer)

agent = app.register_me(name="langgraph-agent", ...)

@agent.on_message
async def handle(message, sender):
    config = {"configurable": {"thread_id": message.conversation_id}}

    async with agent.streamer(message) as s:
        # Graph state persists across messages via checkpointer
        result = await compiled.ainvoke(
            {"messages": [message.content]},
            config
        )
        s.stream(result["messages"][-1])

app.run()
```

### Pydantic AI Integration

Use `PydanticAIMessageStore` to persist Pydantic AI message history.

```python
from basion_agent import BasionAgentApp
from basion_agent.extensions.pydantic_ai import PydanticAIMessageStore
from pydantic_ai import Agent as PydanticAgent

app = BasionAgentApp(gateway_url="...", api_key="...")
store = PydanticAIMessageStore(app=app)

my_llm = PydanticAgent('openai:gpt-4o', system_prompt="You are helpful.")

agent = app.register_me(name="pydantic-agent", ...)

@agent.on_message
async def handle(message, sender):
    # Load previous messages
    history = await store.load(message.conversation_id)

    async with agent.streamer(message) as s:
        async with my_llm.run_stream(
            message.content,
            message_history=history
        ) as result:
            async for chunk in result.stream_text():
                s.stream(chunk)

        # Save updated history
        await store.save(message.conversation_id, result.all_messages())

app.run()
```

## Advanced Usage

### Agent-Initiated (Proactive) Conversations

Agents can proactively start new conversations with users — without waiting for the user to message first. This enables outreach scenarios like health check-ins, reminders, onboarding follow-ups, and scheduled notifications.

#### How It Works

```mermaid
sequenceDiagram
    participant Agent as Your Agent
    participant ConvStore as Conversation Store
    participant Gateway as Agent Gateway
    participant Router
    participant Provider
    participant User

    Agent->>ConvStore: POST /conversations/ (is_new=true, locked_by=agent)
    ConvStore-->>Agent: conversation_id

    loop Streaming First Message
        Agent->>Gateway: gRPC stream: content chunk
        Gateway->>Router: Kafka: router.inbox
        Router->>Provider: Kafka: user.inbox
        Provider->>User: WebSocket: chunk
    end

    Agent->>Gateway: gRPC stream: done=true
    Provider->>ConvStore: Persist message, unlock conversation

    Note over User: User sees new conversation in sidebar (bold)
    User->>Provider: Reply
    Provider->>Router: Kafka: router.inbox
    Router->>Gateway: Kafka: {agent}.inbox (via awaiting_route)
    Gateway->>Agent: gRPC stream: message
    Note over Agent: Normal @on_message handler runs
```

1. **Agent creates conversation** via the conversation store API with `is_new=True`, `current_route`, and `locked_by` set atomically.
2. **Agent streams the first message** through the normal Kafka pipeline (router → user.inbox → provider → Centrifugo → user).
3. **Provider persists** the assistant message and unlocks the conversation on `done=true`.
4. **User sees a new bold conversation** in their sidebar (the `is_new` flag triggers bold rendering in the frontend).
5. **User replies** — if `awaiting=True` was set, the reply is routed to the agent's `@on_message` handler via `awaiting_route`.

#### Basic Usage

```python
agent = app.register_me(name="outreach-agent", ...)

@agent.on_message
async def handle_reply(message, sender):
    # User replied to our proactive conversation
    async with agent.streamer(message) as s:
        s.stream("Thanks for responding to my check-in!")

# Start a proactive conversation (e.g., from a scheduler, webhook, or API endpoint)
async def send_check_in(user_id: str):
    conv_id, streamer = await agent.start_conversation(
        user_id=user_id,
        title="Weekly Health Check-in",
        awaiting=True,
    )
    async with streamer as s:
        s.stream("Hi! It's time for your weekly check-in. How are you feeling today?")
    # When the user replies, handle_reply() runs
```

#### API Reference

```python
async def start_conversation(
    user_id: str,
    title: str = "Agent-initiated conversation",
    awaiting: bool = False,
    response_schema: Optional[Dict[str, Any]] = None,
    message_metadata: Optional[Dict[str, Any]] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> Tuple[str, Streamer]:
```

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `user_id` | `str` | Required | Target user UUID |
| `title` | `str` | `"Agent-initiated conversation"` | Conversation title shown in sidebar |
| `awaiting` | `bool` | `False` | If True, sets `awaiting_route` so user reply comes back to this agent |
| `response_schema` | `dict` | `None` | JSON Schema for structured user response (renders a form) |
| `message_metadata` | `dict` | `None` | Metadata attached to the streamed message |
| `metadata` | `dict` | `None` | Metadata stored on the conversation itself |

**Returns:** `Tuple[str, Streamer]` — `(conversation_id, streamer)`

#### With Response Schema (Form)

```python
async def request_symptom_report(user_id: str):
    conv_id, streamer = await agent.start_conversation(
        user_id=user_id,
        title="Symptom Report",
        response_schema={
            "type": "object",
            "title": "How are you feeling?",
            "properties": {
                "pain_level": {"type": "integer", "minimum": 0, "maximum": 10, "title": "Pain Level"},
                "notes": {"type": "string", "title": "Additional Notes"},
            },
            "required": ["pain_level"],
        },
    )
    async with streamer as s:
        s.stream("Please fill out this quick symptom report:")
    # User sees a form; their submission arrives in @on_message as JSON
```

#### With Message Metadata

```python
async def send_reminder(user_id: str):
    conv_id, streamer = await agent.start_conversation(
        user_id=user_id,
        title="Appointment Reminder",
        message_metadata={"reminder_type": "appointment", "appointment_id": "abc-123"},
        metadata={"source": "scheduler", "scheduled_at": "2024-01-15T09:00:00Z"},
    )
    async with streamer as s:
        s.stream("Reminder: You have an appointment tomorrow at 10 AM.")

#### Key Behaviors

- **`is_new=True`** is always set on agent-initiated conversations, causing them to appear bold in the user's sidebar until opened.
- **`awaiting`** defaults to `False`. Set `awaiting=True` explicitly if you want the user's reply to route back to this agent's `@on_message` handler.
- **`locked_by`** is set at creation to prevent race conditions, then automatically cleared by the provider when `done=true` is received.
- **`current_route`** is preserved by the router when it detects an agent-to-user response (the router's `isAgentToUserResponse` check).
- The streamer returned by `start_conversation()` works identically to `agent.streamer(message)` — supports `stream()`, `write()`, `set_response_schema()`, `set_message_metadata()`, `stream_by()` for structural elements, and the `async with` context manager.

### Inter-Agent Communication

Agents can send messages to other agents using the `send_to` parameter.

```python
@agent.on_message(senders=["user"])
async def handle_user(message, sender):
    # Forward to specialist agent
    async with agent.streamer(message, send_to="specialist-agent") as s:
        s.stream("Forwarding your question to the specialist...")

@agent.on_message(senders=["specialist-agent"])
async def handle_specialist(message, sender):
    # Respond to user with specialist's answer
    async with agent.streamer(message, send_to="user") as s:
        s.stream(f"The specialist says: {message.content}")
```

### Dynamic Forms with Response Schema

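Request structured input by attaching a JSON Schema with `set_response_schema()`. The user sees a form, and the submission arrives as JSON in `message.content`.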

```python
@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message, awaiting=True) as s:
        s.stream("Please fill out this form:")
        s.set_response_schema({
            "type": "object",
            "title": "Contact Information",
            "properties": {
                "name": {"type": "string", "title": "Full Name"},
                "email": {"type": "string", "format": "email"},
                "message": {"type": "string", "title": "Message"}
            },
            "required": ["name", "email"]
        })

# When the user submits the form, message.content contains the JSON-encoded fields
@agent.on_message
async def handle_form(message, sender):
    import json
    data = json.loads(message.content)
    name = data.get("name")
    # Process form data...
```
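
Because a form submission arrives as a JSON string, it can help to validate it before use. The helper below is a minimal sketch and not part of the SDK: `parse_form_submission` is a hypothetical name, and it assumes the submitted object uses the property names declared in the schema above.

```python
import json
from typing import Any, Optional


def parse_form_submission(content: str, required: list[str]) -> Optional[dict[str, Any]]:
    """Return the submitted fields, or None if content is not a valid submission.

    Hypothetical helper for illustration; not provided by basion-agent.
    """
    try:
        data = json.loads(content)
    except (json.JSONDecodeError, TypeError):
        return None  # Plain-text message (or non-string), not a form submission
    if not isinstance(data, dict):
        return None  # Valid JSON, but not an object (e.g. "42" or "[1, 2]")
    if any(field not in data for field in required):
        return None  # At least one required field is missing
    return data
```

Inside a handler, `parse_form_submission(message.content, ["name", "email"])` returns a dict for a completed form and `None` for ordinary chat text, so a single handler can tell the two cases apart.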

### Error Handling

Set a default error message on the app, or disable automatic error responses and handle exceptions yourself:

```python
app = BasionAgentApp(
    gateway_url="...",
    api_key="...",
    error_message_template="Sorry, something went wrong. Please try again."
)

agent = app.register_me(...)

# Disable automatic error responses
agent.send_error_responses = False

@agent.on_message
async def handle(message, sender):
    try:
        # Your logic
        pass
    except Exception as e:
        # Custom error handling
        async with agent.streamer(message) as s:
            s.stream(f"I encountered an issue: {e}")
```
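
Sending the raw exception text to the user can expose internal details. One option, sketched below under assumptions, is to log the full exception and show only a generic message with a short reference ID. `user_safe_error`, the logger name, and the reference-ID format are hypothetical and not provided by the SDK.

```python
import logging
import uuid

logger = logging.getLogger("my_agent")  # Hypothetical logger name


def user_safe_error(
    exc: Exception,
    template: str = "Sorry, something went wrong (ref: {ref}).",
) -> str:
    """Log the full exception and return a message that is safe to show the user."""
    ref = uuid.uuid4().hex[:8]  # Short ID to match a user report with the log entry
    logger.error("Unhandled error [ref=%s]", ref, exc_info=exc)
    return template.format(ref=ref)
```

In the `except` block above, this would replace the f-string with `s.stream(user_safe_error(e))`.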

## Project Structure

```
ai-framework/
├── src/
│   └── basion_agent/
│       ├── __init__.py              # Package exports
│       ├── app.py                   # BasionAgentApp
│       ├── agent.py                 # Agent class
│       ├── message.py               # Message class (attachments, memory)
│       ├── streamer.py              # Streamer class (stream_by, structural)
│       ├── conversation.py          # Conversation helper (history, metadata)
│       ├── conversation_client.py   # HTTP client for Conversation Store
│       ├── conversation_message.py  # ConversationMessage dataclass
│       ├── memory_v2.py             # Memory V2 context (mem0 ingest + search)
│       ├── memory_v2_client.py      # HTTP client for mem0 memory service
│       ├── memory.py                # Memory context (deprecated)
│       ├── memory_client.py         # HTTP client for AI Memory (deprecated)
│       ├── attachment_client.py     # HTTP client for attachments (download)
│       ├── checkpoint_client.py     # HTTP client for checkpoints
│       ├── agent_state_client.py    # HTTP client for agent state
│       ├── gateway_client.py        # gRPC client for Agent Gateway
│       ├── gateway_pb2.py           # Generated protobuf
│       ├── gateway_pb2_grpc.py      # Generated gRPC stubs
│       ├── heartbeat.py             # Heartbeat manager
│       ├── loki_handler.py          # Loki remote log handler
│       ├── cli.py                   # CLI (basion-agent run)
│       ├── exceptions.py            # Custom exceptions
│       ├── structural/
│       │   ├── __init__.py
│       │   ├── base.py              # StructuralStreamer base class
│       │   ├── artifact.py          # Artifact (image, file, iframe)
│       │   ├── surface.py           # Surface (interactive embeds)
│       │   ├── text_block.py        # TextBlock (collapsible text)
│       │   └── stepper.py           # Stepper (multi-step progress)
│       ├── tools/
│       │   ├── __init__.py
│       │   ├── container.py         # Tools container (lazy init)
│       │   └── knowledge_graph.py   # Knowledge Graph client
│       └── extensions/
│           ├── __init__.py
│           ├── langgraph.py         # LangGraph HTTPCheckpointSaver
│           └── pydantic_ai.py       # Pydantic AI MessageStore
├── pyproject.toml
└── README.md
```

## Development

### Running Tests

```bash
# Install dev dependencies
pip install -e ".[dev]"

# Run the full test suite (coverage is reported only if pytest-cov options are configured, e.g. in pyproject.toml)
pytest

# Run specific test file
pytest tests/test_agent.py -v
```

### Regenerating Protobuf

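If `gateway.proto` changes, regenerate the Python stubs. The relative paths below assume the command is run from the SDK root, with the `agent-gateway` directory two levels up: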

```bash
python -m grpc_tools.protoc \
    -I../../agent-gateway/proto \
    --python_out=src/basion_agent \
    --grpc_python_out=src/basion_agent \
    ../../agent-gateway/proto/gateway.proto
```
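
The same regeneration can be scripted from Python, which may be convenient in CI. This is a sketch under assumptions: `build_protoc_args` and `regenerate` are hypothetical helpers, the default paths mirror the command above, and `grpc_tools.protoc.main` is the entry point that `python -m grpc_tools.protoc` calls (it expects the program name as the first list item).

```python
def build_protoc_args(proto_dir: str, out_dir: str, proto_name: str = "gateway.proto") -> list[str]:
    """Build the argument list for grpc_tools.protoc (first item is the program name)."""
    return [
        "grpc_tools.protoc",
        f"-I{proto_dir}",
        f"--python_out={out_dir}",
        f"--grpc_python_out={out_dir}",
        f"{proto_dir}/{proto_name}",
    ]


def regenerate(
    proto_dir: str = "../../agent-gateway/proto",
    out_dir: str = "src/basion_agent",
) -> int:
    """Run protoc and return its exit code (0 on success)."""
    from grpc_tools import protoc  # Imported lazily; requires grpcio-tools

    return protoc.main(build_protoc_args(proto_dir, out_dir))
```

Calling `regenerate()` from the SDK root should write `gateway_pb2.py` and `gateway_pb2_grpc.py` into `src/basion_agent`, the same files the shell command produces.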

## Dependencies

| Package | Purpose |
|---------|---------|
| grpcio | gRPC communication with Agent Gateway |
| grpcio-tools | Protobuf compilation |
| protobuf | Message serialization |
| requests | Sync HTTP for registration |
| aiohttp | Async HTTP for runtime operations |

### Optional Dependencies

| Package | Install Command | Purpose |
|---------|-----------------|---------|
| langgraph | `pip install basion-agent[langgraph]` | LangGraph checkpoint integration |
| pydantic-ai | `pip install basion-agent[pydantic]` | Pydantic AI message history |
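
To check at runtime which optional integrations are importable, something like the following sketch could be used. `installed_extras` is a hypothetical helper, not an SDK function; the module names assume the usual import names for these packages (`langgraph` and `pydantic_ai`).

```python
from importlib.util import find_spec

# Extra name -> top-level module it provides.
# The import name can differ from the PyPI name: pydantic-ai is imported as pydantic_ai.
EXTRA_MODULES = {"langgraph": "langgraph", "pydantic": "pydantic_ai"}


def installed_extras() -> dict[str, bool]:
    """Map each extra name to whether its top-level module can be found."""
    return {
        extra: find_spec(module) is not None
        for extra, module in EXTRA_MODULES.items()
    }
```

An agent could call `installed_extras()` at startup and skip the LangGraph or Pydantic AI setup when the corresponding value is `False`.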
