Metadata-Version: 2.4
Name: basion-agent
Version: 0.7.0
Summary: Python SDK for Basion AI Agent framework - handles agent registration, message consumption, and streaming responses
Author-email: Basion AI <support@basion.ai>
License: MIT
Project-URL: Homepage, https://github.com/basion-us/basion-agent
Project-URL: Repository, https://github.com/basion-us/basion-agent
Project-URL: Documentation, https://github.com/basion-us/basion-agent#readme
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: grpcio>=1.70.0
Requires-Dist: grpcio-tools>=1.70.0
Requires-Dist: protobuf>=5.29.0
Requires-Dist: requests>=2.31.0
Requires-Dist: aiohttp>=3.9.0
Provides-Extra: dev
Requires-Dist: pytest>=7.4.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.21.0; extra == "dev"
Requires-Dist: pytest-cov>=4.1.0; extra == "dev"
Requires-Dist: pytest-mock>=3.11.0; extra == "dev"
Requires-Dist: respx>=0.20.0; extra == "dev"
Provides-Extra: langgraph
Requires-Dist: langgraph>=1.0.0; extra == "langgraph"
Requires-Dist: langchain-core>=0.3.0; extra == "langgraph"
Provides-Extra: pydantic
Requires-Dist: pydantic-ai>=1.0.0; extra == "pydantic"
Dynamic: license-file

# Basion Agent SDK

Python SDK for building AI agents on the Basion platform. Agents register themselves, receive messages through Kafka, and stream responses back to users in real time.

## Installation

```bash
pip install basion-agent

# With LangGraph support
pip install basion-agent[langgraph]

# With Pydantic AI support
pip install basion-agent[pydantic]
```

## Quick Start

```python
from basion_agent import BasionAgentApp

app = BasionAgentApp(
    gateway_url="agent-gateway:8080",
    api_key="your-api-key",
)

agent = app.register_me(
    name="rare-disease-assistant",
    about="Answers questions about rare diseases, symptoms, and treatments",
    document="A medical assistant specializing in rare diseases. Can look up conditions, find related diseases, and help patients understand their diagnosis.",
)

@agent.on_message
async def handle(message, sender):
    history = await message.conversation.get_history(limit=10)

    async with agent.streamer(message) as s:
        s.stream(f"You asked: {message.content}\n\n")
        s.stream("Let me look into that for you...")

app.run()
```

## CLI

```bash
# Run your agent
basion-agent run main:app

# Defaults to :app
basion-agent run main

# Custom app name
basion-agent run myagent:application

# Show version
basion-agent version
```

## Configuration

```python
app = BasionAgentApp(
    gateway_url="agent-gateway:8080",       # Required
    api_key="key",                           # Required
    heartbeat_interval=60,                   # Heartbeat frequency in seconds
    max_concurrent_tasks=100,                # Max concurrent message handlers
    error_message_template="...",            # Error message sent to users on failure
    secure=False,                            # TLS for gRPC and HTTPS for HTTP
    enable_remote_logging=False,             # Send logs to Loki via gateway
    remote_log_level=logging.INFO,           # Min log level for remote logging
    remote_log_batch_size=100,               # Logs per batch
    remote_log_flush_interval=5.0,           # Seconds between flushes
)
```

| Environment Variable | Description |
|---|---|
| `GATEWAY_URL` | Agent Gateway endpoint |
| `GATEWAY_API_KEY` | API key for authentication |

## Agent Registration

```python
agent = app.register_me(
    name="rare-disease-assistant",            # Unique identifier (used for Kafka routing)
    about="Rare disease medical assistant",   # Short description for agent selection
    document="Full documentation...",         # Detailed docs used by the router
    representation_name="Dr. Assistant",      # Display name (optional)
    base_url="http://...",                    # Base URL for frontend service (optional)
    metadata={"specialty": "rare-diseases"},  # Additional metadata (optional)
    category_name="medical",                  # Category in kebab-case (optional)
    tag_names=["rare-disease", "genetics"],   # Tags in kebab-case (optional)
    example_prompts=[                         # Example prompts shown to users (optional)
        "What is Huntington disease?",
        "Find diseases similar to Cystic Fibrosis",
    ],
    is_experimental=False,                    # Mark as experimental (optional)
    related_pages=[                           # Related pages (optional)
        {"name": "Resources", "endpoint": "/resources"}
    ],
)
```

## Message Handling

### Basic Handler

```python
@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        s.stream("Hello! How can I help?")
```

Handlers can be `async` or synchronous. Sync handlers run in a thread pool automatically.

### Sender Filtering

```python
# Handle messages from users only
@agent.on_message(senders=["user"])
async def handle_user(message, sender):
    ...

# Handle messages from a specific agent
@agent.on_message(senders=["triage-agent"])
async def handle_triage(message, sender):
    ...

# Exclude a specific sender
@agent.on_message(senders=["~notification-agent"])
async def handle_others(message, sender):
    ...
```

### Error Handling

If a handler throws, the SDK automatically sends an error message to the user. Customize or disable this:

```python
# Custom error message
app = BasionAgentApp(
    ...,
    error_message_template="Sorry, something went wrong. Please try again.",
)

# Or disable per-agent
agent.send_error_responses = False

@agent.on_message
async def handle(message, sender):
    try:
        ...
    except Exception as e:
        async with agent.streamer(message) as s:
            s.stream(f"I ran into an issue: {e}")
```

## Message

The `message` object provides access to content, conversation history, memory, and attachments.

```python
@agent.on_message
async def handle(message, sender):
    message.content              # Message text
    message.conversation_id      # Conversation UUID
    message.user_id              # User UUID
    message.metadata             # Optional dict (from frontend)
    message.schema               # Optional dict (for form responses)
    message.headers              # Raw Kafka headers
    message.conversation         # Conversation helper
    message.memory_v2            # mem0 memory (ingest + search)
    message.memory               # Deprecated memory (use memory_v2)
```

## Streaming Responses

### Basic Streaming

```python
@agent.on_message
async def handle(message, sender):
    # Context manager — auto-finishes on exit
    async with agent.streamer(message) as s:
        s.stream("Looking up information on Huntington disease...\n\n")
        s.stream("Huntington disease is a progressive neurodegenerative disorder...")

    # Or manual control
    s = agent.streamer(message)
    s.stream("Hello!")
    await s.finish()
```

### Streamer Options

```python
# Route response to another agent
async with agent.streamer(message, send_to="specialist-agent") as s:
    s.stream("Forwarding to the specialist...")

# Set awaiting — next user reply routes back to this agent
async with agent.streamer(message, awaiting=True) as s:
    s.stream("What symptoms are you experiencing?")
```

### Non-Persisted Content

Chunks that appear in real-time but don't get saved to conversation history:

```python
async with agent.streamer(message) as s:
    s.stream("Searching knowledge graph...", persist=False, event_type="thinking")
    # ... do work ...
    s.stream("Here are the results:")  # This gets persisted
```

### Message Metadata

Attach metadata to the response message:

```python
async with agent.streamer(message) as s:
    s.set_message_metadata({"source": "knowledge_graph", "confidence": 0.92})
    s.stream("Based on the knowledge graph, ...")
```

### Response Schema (Forms)

Request structured input from users with JSON Schema. The frontend renders a form, and the user's submission arrives as JSON in the next message.

```python
@agent.on_message
async def handle(message, sender):
    # Check if this is a form submission
    if message.schema:
        import json
        data = json.loads(message.content)
        severity = data["severity"]
        async with agent.streamer(message) as s:
            s.stream(f"Thank you. You reported severity level {severity}.")
        return

    # Ask for structured input
    async with agent.streamer(message, awaiting=True) as s:
        s.stream("Please describe your symptoms:")
        s.set_response_schema({
            "type": "object",
            "title": "Symptom Report",
            "properties": {
                "severity": {
                    "type": "integer",
                    "minimum": 1,
                    "maximum": 10,
                    "title": "Pain Severity (1-10)",
                },
                "location": {
                    "type": "string",
                    "title": "Where do you feel pain?",
                },
                "duration": {
                    "type": "string",
                    "title": "How long have you had these symptoms?",
                },
            },
            "required": ["severity"],
        })
```

## Conversation History

```python
@agent.on_message
async def handle(message, sender):
    conv = message.conversation

    # Get message history
    history = await conv.get_history(limit=20)
    history = await conv.get_history(role="user", limit=10, offset=0)

    # Get conversation metadata
    metadata = await conv.get_metadata()

    # Get messages where this agent was sender or recipient
    agent_history = await conv.get_agent_history(limit=50)
    agent_history = await conv.get_agent_history(agent_name="triage-agent")
```

## Memory V2 (mem0)

Long-term memory powered by [mem0.ai](https://mem0.ai). Ingest messages and semantically search across a user's history.

```python
@agent.on_message
async def handle(message, sender):
    mem = message.memory_v2

    # Ingest the user's message
    await mem.ingest(role="user", content=message.content)

    # Search for relevant past context
    results = await mem.search(query="diagnosis history")
    for r in results:
        memory_text = r.get("memory")  # Extracted memory text

    async with agent.streamer(message) as s:
        if results:
            s.stream("Based on what I remember:\n")
            for r in results:
                s.stream(f"- {r.get('memory')}\n")
            s.stream("\n")
        s.stream(f"Regarding your question: {message.content}")

    # Ingest the assistant's response too
    await mem.ingest(role="assistant", content="...")
```

| Method | Description |
|---|---|
| `ingest(role, content)` | Ingest a message. Role: `'user'`, `'assistant'`, or `'system'`. |
| `search(query)` | Semantic search across the user's memories. Returns a list of dicts. |

## Attachments

Download and process file attachments (images, PDFs, etc.).

```python
@agent.on_message
async def handle(message, sender):
    if not message.has_attachments():
        return

    count = message.get_attachment_count()
    attachments = message.get_attachments()

    async with agent.streamer(message) as s:
        s.stream(f"Received {count} file(s).\n\n")

        for i, att in enumerate(attachments):
            s.stream(f"**{att.filename}** ({att.content_type}, {att.size} bytes)\n")

            if att.is_image():
                base64_str = await message.get_attachment_base64_at(i)
                # Send to vision model...

            elif att.is_pdf():
                pdf_bytes = await message.get_attachment_bytes_at(i)
                # Parse PDF...

        # Or download everything at once
        all_bytes = await message.get_all_attachment_bytes()
        all_base64 = await message.get_all_attachment_base64()
```

### Attachment Methods

| Method | Returns | Description |
|---|---|---|
| `has_attachments()` | `bool` | Whether the message has any attachments |
| `get_attachment_count()` | `int` | Number of attachments |
| `get_attachments()` | `List[AttachmentInfo]` | List of attachment metadata |
| `get_attachment_bytes()` | `bytes` | Download first attachment as bytes |
| `get_attachment_base64()` | `str` | Download first attachment as base64 |
| `get_attachment_buffer()` | `BytesIO` | Download first attachment as BytesIO |
| `get_attachment_bytes_at(i)` | `bytes` | Download attachment at index `i` |
| `get_attachment_base64_at(i)` | `str` | Download attachment at index `i` as base64 |
| `get_attachment_buffer_at(i)` | `BytesIO` | Download attachment at index `i` as BytesIO |
| `get_all_attachment_bytes()` | `List[bytes]` | Download all attachments |
| `get_all_attachment_base64()` | `List[str]` | Download all as base64 |
| `get_all_attachment_buffers()` | `List[BytesIO]` | Download all as BytesIO |

### AttachmentInfo Properties

| Property | Type | Example |
|---|---|---|
| `filename` | `str` | `"genetic_report.pdf"` |
| `content_type` | `str` | `"application/pdf"` |
| `size` | `int` | `524288` |
| `url` | `str` | Download URL |
| `file_extension` | `str` | `"pdf"` |
| `file_type` | `str` | `"pdf"` |
| `is_image()` | `bool` | `True` for image MIME types |
| `is_pdf()` | `bool` | `True` for PDF files |

## Structural Streaming

Rich UI components streamed alongside text. Bind a structural component to the streamer with `stream_by()`.

### Artifact

Files, images, or embeds with generation progress. Artifact data is persisted to the database.

```python
from basion_agent import Artifact

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        artifact = Artifact()

        # Show progress
        s.stream_by(artifact).generating("Generating genetic pathway diagram...", progress=0.3)
        # ... do work ...
        s.stream_by(artifact).generating("Rendering...", progress=0.8)

        # Complete with result
        s.stream_by(artifact).done(
            url="https://example.com/pathway-diagram.png",
            type="image",
            title="HTT Gene Pathway",
            description="Huntingtin protein interaction network",
            metadata={"width": 1200, "height": 800},
        )

        # Or signal an error
        # s.stream_by(artifact).error("Failed to generate diagram")

        s.stream("Here's the pathway diagram for the HTT gene.")
```

**Artifact types:** `image`, `iframe`, `document`, `video`, `audio`, `code`, `link`, `file`

### Surface

Interactive embedded components (iframes, widgets).

```python
from basion_agent import Surface

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        surface = Surface()
        s.stream_by(surface).generating("Loading appointment scheduler...")
        s.stream_by(surface).done(
            url="https://cal.com/embed/dr-smith",
            type="iframe",
            title="Schedule Genetic Counseling",
            description="Book a session with a genetic counselor",
        )
        s.stream("You can schedule your genetic counseling session above.")
```

### TextBlock

Collapsible text blocks with streaming title/body and visual variants. TextBlock events are **not** persisted.

```python
from basion_agent import TextBlock

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        block = TextBlock()

        # Set visual variant
        s.stream_by(block).set_variant("thinking")

        # Stream title (appends)
        s.stream_by(block).stream_title("Analyzing ")
        s.stream_by(block).stream_title("symptoms...")

        # Stream body (appends)
        s.stream_by(block).stream_body("Checking symptom database...\n")
        s.stream_by(block).stream_body("Cross-referencing with HPO ontology...\n")
        s.stream_by(block).stream_body("Matching against known phenotypes...\n")

        # Replace title/body entirely
        s.stream_by(block).update_title("Analysis Complete")
        s.stream_by(block).update_body("Found 3 matching conditions.")

        # Mark as done
        s.stream_by(block).done()

        s.stream("Based on the symptoms, here are possible conditions...")
```

**Variants:** `thinking`, `note`, `warning`, `error`, `success`

### Stepper

Multi-step progress indicators. Stepper events are **not** persisted.

```python
from basion_agent import Stepper

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        stepper = Stepper(steps=[
            "Search diseases",
            "Analyze phenotypes",
            "Find similar conditions",
            "Generate report",
        ])

        s.stream_by(stepper).start_step(0)
        diseases = await kg.search_diseases(name="Huntington")
        s.stream_by(stepper).complete_step(0)

        s.stream_by(stepper).start_step(1)
        phenotypes = await kg.search_phenotypes(name="chorea")
        s.stream_by(stepper).complete_step(1)

        s.stream_by(stepper).start_step(2)
        similar = await kg.find_similar_diseases("Huntington Disease")
        s.stream_by(stepper).complete_step(2)

        # Add a step dynamically
        s.stream_by(stepper).add_step("Cross-reference")
        s.stream_by(stepper).start_step(4)
        s.stream_by(stepper).update_step_label(4, "Cross-reference (final)")
        s.stream_by(stepper).complete_step(4)

        s.stream_by(stepper).start_step(3)
        # Or mark a step as failed
        # s.stream_by(stepper).fail_step(3, error="Report generation timed out")
        s.stream_by(stepper).complete_step(3)

        s.stream_by(stepper).done()
        s.stream("Here's your rare disease report...")
```

## Knowledge Graph

Query biomedical knowledge graphs for diseases, proteins, phenotypes, drugs, and pathways. Accessed via `agent.tools.knowledge_graph`.

```python
@agent.on_message
async def handle(message, sender):
    kg = agent.tools.knowledge_graph

    # Search diseases
    diseases = await kg.search_diseases(name="Huntington", limit=5)
    diseases = await kg.search_diseases(orphacode="399", limit=5)
    diseases = await kg.search_diseases(omim="143100")
    disease = await kg.get_disease(disease_id=123)

    # Search proteins/genes
    proteins = await kg.search_proteins(symbol="HTT", limit=10)
    proteins = await kg.search_proteins(ensembl_id="ENSG00000197386")

    # Search phenotypes (HPO terms)
    phenotypes = await kg.search_phenotypes(name="chorea", limit=10)
    phenotypes = await kg.search_phenotypes(hpo_id="HP:0002072")

    # Search drugs
    drugs = await kg.search_drugs(name="tetrabenazine", limit=5)

    # Search pathways
    pathways = await kg.search_pathways(name="apoptosis", limit=5)

    # Find similar diseases by shared phenotypes
    similar = await kg.find_similar_diseases("Huntington Disease", limit=10)
    for s in similar:
        s.disease_name        # "Spinocerebellar Ataxia Type 17"
        s.similarity_score    # 0.85
        s.shared_count        # 12

    # Find similar diseases by shared genes
    by_genes = await kg.find_similar_diseases_by_genes("Huntington Disease", limit=10)

    # Get all connections for an entity
    edges = await kg.get_entity_network("HTT", "protein")
    for e in edges:
        e.source_id, e.source_type
        e.target_id, e.target_type
        e.relation_type

    # k-hop graph traversal (BFS subgraph)
    subgraph = await kg.k_hop_traversal("HTT", "protein", k=2, limit_edges=100)

    # Shortest path between two entities
    path = await kg.find_shortest_path(
        start_name="HTT", start_type="protein",
        end_name="Huntington Disease", end_type="disease",
        max_hops=5,
    )
    for step in path:
        step.node_id, step.node_name, step.node_type, step.relation
```

**Entity types:** `protein`, `phenotype`, `disease`, `pathway`, `drug`, `molecular_function`, `cellular_component`, `biological_process`

## Agent Inventory

Query the AI Inventory service to discover active agents and their capabilities. Accessed via `agent.tools.agent_inventory`.

```python
@agent.on_message
async def handle(message, sender):
    inv = agent.tools.agent_inventory

    # Get all active agents
    agents = await inv.get_active_agents()
    for a in agents:
        a.id                    # Agent UUID
        a.name                  # "rare-disease-assistant"
        a.representation_name   # "Dr. Assistant"
        a.about                 # Short description
        a.document              # Full documentation
        a.example_prompts       # ["What is Huntington disease?", ...]
        a.categories            # [{"id": "...", "name": "medical"}]
        a.tags                  # [{"id": "...", "name": "rare-disease"}]

    # Get agents accessible to a specific user (filtered by role/permissions)
    user_agents = await inv.get_user_agents(user_id="user-uuid")
```

| Method | Returns | Description |
|---|---|---|
| `get_active_agents()` | `List[AgentInfo]` | All agents with `status=active` and `lifeStatus=active` |
| `get_user_agents(user_id)` | `List[AgentInfo]` | Active agents accessible to a specific user |

## Agent-Initiated (Proactive) Conversations

Agents can proactively start new conversations with users — without waiting for them to message first. Use cases: health check-ins, medication reminders, appointment follow-ups, new research alerts.

### How It Works

1. Agent creates a conversation via the conversation store API (`is_new=True`, `current_route`, `locked_by` set atomically)
2. Agent streams the first message through the normal Kafka pipeline (router → provider → Centrifugo → user)
3. Provider persists the assistant message and unlocks the conversation on `done=true`
4. User sees a new bold conversation in their sidebar (`is_new` flag)
5. If `awaiting=True`, the user's reply routes back to the agent's `@on_message` handler

### Basic Usage

```python
@agent.on_message
async def handle_reply(message, sender):
    # User replied to the proactive conversation
    async with agent.streamer(message) as s:
        s.stream("Thanks for responding to the check-in!")

# Trigger from a scheduler, webhook, or API endpoint
async def send_weekly_check_in(user_id: str):
    conv_id, streamer = await agent.start_conversation(
        user_id=user_id,
        title="Weekly Health Check-in",
        awaiting=True,
    )
    async with streamer as s:
        s.stream("Hi! Time for your weekly check-in.\n\n")
        s.stream("How have you been feeling this week?")
```

### With Response Schema

```python
async def request_symptom_log(user_id: str):
    conv_id, streamer = await agent.start_conversation(
        user_id=user_id,
        title="Daily Symptom Log",
        awaiting=True,
        response_schema={
            "type": "object",
            "title": "How are you feeling today?",
            "properties": {
                "pain_level": {"type": "integer", "minimum": 0, "maximum": 10, "title": "Pain Level"},
                "fatigue": {"type": "integer", "minimum": 0, "maximum": 10, "title": "Fatigue Level"},
                "notes": {"type": "string", "title": "Additional Notes"},
            },
            "required": ["pain_level"],
        },
    )
    async with streamer as s:
        s.stream("Please fill out today's symptom log:")
```

### With Metadata

```python
conv_id, streamer = await agent.start_conversation(
    user_id=user_id,
    title="New Research Alert",
    awaiting=True,
    message_metadata={"alert_type": "research", "paper_id": "PMC12345"},
    metadata={"source": "pubmed_monitor", "triggered_at": "2025-01-15T09:00:00Z"},
)
async with streamer as s:
    s.stream("A new paper about your condition was published today...")
```

### API Reference

```python
async def start_conversation(
    user_id: str,
    title: str = "Agent-initiated conversation",
    awaiting: bool = False,
    response_schema: Optional[Dict[str, Any]] = None,
    message_metadata: Optional[Dict[str, Any]] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> Tuple[str, Streamer]:
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `user_id` | `str` | Required | Target user UUID |
| `title` | `str` | `"Agent-initiated conversation"` | Conversation title shown in sidebar |
| `awaiting` | `bool` | `False` | If `True`, user reply routes back to this agent |
| `response_schema` | `dict` | `None` | JSON Schema for structured response (renders a form) |
| `message_metadata` | `dict` | `None` | Metadata attached to the streamed message |
| `metadata` | `dict` | `None` | Metadata stored on the conversation itself |

Returns `(conversation_id, streamer)`. The streamer works identically to `agent.streamer()`.

## Inter-Agent Communication

Agents can send messages to other agents using `send_to`.

```python
# Triage agent forwards to specialist
@agent.on_message(senders=["user"])
async def handle_user(message, sender):
    async with agent.streamer(message, send_to="genetics-specialist") as s:
        s.stream(f"Patient asks: {message.content}")

# Receive specialist response, relay to user
@agent.on_message(senders=["genetics-specialist"])
async def handle_specialist(message, sender):
    async with agent.streamer(message, send_to="user") as s:
        s.stream(f"The genetics specialist says:\n\n{message.content}")
```

## Remote Logging (Loki)

Send agent logs to Loki for centralized monitoring.

```python
import logging

app = BasionAgentApp(
    ...,
    enable_remote_logging=True,
    remote_log_level=logging.INFO,
    remote_log_batch_size=100,
    remote_log_flush_interval=5.0,
)

# Standard Python logging — automatically sent to Loki
logger = logging.getLogger(__name__)
logger.info("Processing symptom query", extra={"user_id": "..."})
```

## Extensions

### LangGraph

Persist LangGraph state via the Conversation Store checkpoint API.

```python
from basion_agent import BasionAgentApp
from basion_agent.extensions.langgraph import HTTPCheckpointSaver
from langgraph.graph import StateGraph

app = BasionAgentApp(gateway_url="...", api_key="...")
checkpointer = HTTPCheckpointSaver(app=app)

graph = StateGraph(MyState)
# ... add nodes and edges ...
compiled = graph.compile(checkpointer=checkpointer)

agent = app.register_me(name="langgraph-agent", ...)

@agent.on_message
async def handle(message, sender):
    config = {"configurable": {"thread_id": message.conversation_id}}
    result = await compiled.ainvoke({"messages": [message.content]}, config)

    async with agent.streamer(message) as s:
        s.stream(result["messages"][-1])

app.run()
```

### Pydantic AI

Persist Pydantic AI message history via the Conversation Store.

```python
from basion_agent import BasionAgentApp
from basion_agent.extensions.pydantic_ai import PydanticAIMessageStore
from pydantic_ai import Agent as PydanticAgent

app = BasionAgentApp(gateway_url="...", api_key="...")
store = PydanticAIMessageStore(app=app)
llm = PydanticAgent("openai:gpt-4o", system_prompt="You are a rare disease specialist.")

agent = app.register_me(name="pydantic-agent", ...)

@agent.on_message
async def handle(message, sender):
    history = await store.load(message.conversation_id)

    async with agent.streamer(message) as s:
        async with llm.run_stream(message.content, message_history=history) as result:
            async for chunk in result.stream_text():
                s.stream(chunk)

        await store.save(message.conversation_id, result.all_messages())

app.run()
```

## Full Example: Rare Disease Assistant

A complete agent that uses memory, knowledge graph, attachments, and structural streaming.

```python
import json
import logging
from basion_agent import BasionAgentApp, Stepper, TextBlock

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = BasionAgentApp(
    gateway_url="agent-gateway:8080",
    api_key="your-api-key",
    enable_remote_logging=True,
)

agent = app.register_me(
    name="rare-disease-assistant",
    about="Helps patients understand rare diseases, find similar conditions, and track symptoms",
    document="""
    A comprehensive rare disease assistant that can:
    - Look up diseases, genes, and phenotypes in a biomedical knowledge graph
    - Find similar diseases by shared symptoms or genes
    - Track patient symptoms over time using memory
    - Process genetic test reports (PDF attachments)
    - Proactively check in on patients
    """,
    category_name="medical",
    tag_names=["rare-disease", "genetics", "patient-support"],
    example_prompts=[
        "What is Huntington disease?",
        "Find diseases similar to Cystic Fibrosis",
        "What genes are associated with Marfan syndrome?",
    ],
)


@agent.on_message(senders=["user"])
async def handle_user(message, sender):
    kg = agent.tools.knowledge_graph
    mem = message.memory_v2

    # Ingest the user's message into long-term memory
    if mem:
        await mem.ingest(role="user", content=message.content)

    # Check for form submissions
    if message.schema:
        data = json.loads(message.content)
        async with agent.streamer(message) as s:
            s.stream(f"Thank you for logging your symptoms.\n\n")
            s.stream(f"- Pain level: {data.get('pain_level')}/10\n")
            s.stream(f"- Notes: {data.get('notes', 'None')}\n")
        return

    # Check for attachments (e.g., genetic test PDF)
    if message.has_attachments():
        async with agent.streamer(message) as s:
            for i, att in enumerate(message.get_attachments()):
                if att.is_pdf():
                    pdf_bytes = await message.get_attachment_bytes_at(i)
                    s.stream(f"Processing **{att.filename}** ({att.size} bytes)...\n")
                    # Parse and analyze the PDF...
                elif att.is_image():
                    base64_data = await message.get_attachment_base64_at(i)
                    s.stream(f"Received image **{att.filename}**.\n")
        return

    # Main message handling with knowledge graph
    async with agent.streamer(message) as s:
        # Show thinking process
        thinking = TextBlock()
        s.stream_by(thinking).set_variant("thinking")
        s.stream_by(thinking).stream_title("Analyzing query...")

        # Recall relevant memories
        if mem:
            memories = await mem.search(query=message.content)
            if memories:
                s.stream_by(thinking).stream_body("Found relevant patient history.\n")

        # Search for diseases mentioned in the query
        stepper = Stepper(steps=["Search diseases", "Find connections", "Compile results"])

        s.stream_by(stepper).start_step(0)
        diseases = await kg.search_diseases(name=message.content, limit=5)
        s.stream_by(stepper).complete_step(0)

        if diseases:
            s.stream_by(stepper).start_step(1)
            disease_name = diseases[0].get("name", "")
            similar = await kg.find_similar_diseases(disease_name, limit=5)
            s.stream_by(stepper).complete_step(1)

            s.stream_by(stepper).start_step(2)
            s.stream_by(thinking).done()

            s.stream(f"## {disease_name}\n\n")

            if similar:
                s.stream("### Similar Conditions\n\n")
                for sim in similar:
                    score = int(sim.similarity_score * 100)
                    s.stream(f"- **{sim.disease_name}** ({score}% similar, "
                             f"{sim.shared_count} shared phenotypes)\n")

            s.stream_by(stepper).complete_step(2)
        else:
            s.stream_by(thinking).done()
            s.stream(f"I couldn't find a disease matching \"{message.content}\". "
                     f"Try searching for a specific disease name.")
            s.stream_by(stepper).complete_step(0)

        s.stream_by(stepper).done()


# Proactive check-in (called from a scheduler or API)
async def daily_check_in(user_id: str):
    conv_id, streamer = await agent.start_conversation(
        user_id=user_id,
        title="Daily Symptom Check-in",
        awaiting=True,
        response_schema={
            "type": "object",
            "title": "How are you feeling today?",
            "properties": {
                "pain_level": {"type": "integer", "minimum": 0, "maximum": 10, "title": "Pain Level"},
                "notes": {"type": "string", "title": "Any additional notes?"},
            },
            "required": ["pain_level"],
        },
    )
    async with streamer as s:
        s.stream("Good morning! Time for your daily check-in.\n\n")
        s.stream("Please fill out the form below:")


app.run()
```

## Architecture

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                         Your Agent Application                              │
├─────────────────────────────────────────────────────────────────────────────┤
│  BasionAgentApp                                                             │
│  ├── register_me() → Agent                                                  │
│  │   ├── @on_message(senders=[...])                                         │
│  │   ├── streamer(message) → Streamer                                       │
│  │   │   ├── stream() / write()                                             │
│  │   │   ├── set_response_schema() / set_message_metadata()                 │
│  │   │   └── stream_by() → Artifact, Surface, TextBlock, Stepper            │
│  │   ├── start_conversation() → (conv_id, Streamer)                         │
│  │   └── tools                                                              │
│  │       ├── .knowledge_graph → KnowledgeGraphTool                          │
│  │       └── .agent_inventory → AgentInventoryTool                          │
│  └── run()                                                                  │
│                                                                             │
│  Message Context:                                                           │
│  ├── message.conversation → Conversation (history, metadata)                │
│  ├── message.memory_v2 → MemoryV2 (mem0: ingest + search)                  │
│  ├── message.memory → Memory (deprecated)                                   │
│  └── message.attachments → List[AttachmentInfo]                             │
└──────────────────────────────┬──────────────────────────────────────────────┘
                               │ gRPC (Kafka) + HTTP (APIs)
                               ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                           Agent Gateway                                      │
├─────────────────────────────────────────────────────────────────────────────┤
│  gRPC: AgentStream (bidirectional)      HTTP: /s/{service}/* proxy          │
│  - Auth + Subscribe/Unsubscribe         - /s/ai-inventory/*                 │
│  - Produce/Consume messages             - /s/conversation-store/*           │
│                                         - /s/memory/* (mem0)                │
│                                         - /s/knowledge-graph/*              │
│                                         - /s/attachment/*                   │
│                                         - /loki/api/v1/push (logging)       │
└─────────────────┬───────────────────────────────────┬───────────────────────┘
                  │                                   │
                  ▼                                   ▼
           ┌──────────────┐                  ┌────────────────────┐
           │    Kafka      │                  │   AI Inventory /   │
           │ {agent}.inbox │                  │ Conversation Store │
           └──────────────┘                  │ Memory / KG        │
                                             └────────────────────┘
```

### Message Flow

```
User → Provider → Kafka: router.inbox → Router → Kafka: {agent}.inbox → Gateway → Agent
Agent → Gateway → Kafka: router.inbox → Router → Kafka: user.inbox → Provider → WebSocket → User
```

## Development

```bash
# Install dev dependencies
pip install -e ".[dev]"

# Run tests with coverage
pytest

# Run specific tests
pytest tests/test_agent.py -v
pytest tests/test_streamer.py -v
```

## Dependencies

| Package | Purpose |
|---|---|
| `grpcio` | gRPC communication with Agent Gateway |
| `grpcio-tools` | Protobuf compilation |
| `protobuf` | Message serialization |
| `requests` | Sync HTTP for registration |
| `aiohttp` | Async HTTP for runtime operations |

### Optional

| Extra | Install | Purpose |
|---|---|---|
| `langgraph` | `pip install basion-agent[langgraph]` | LangGraph checkpoint integration |
| `pydantic` | `pip install basion-agent[pydantic]` | Pydantic AI message history |
