Metadata-Version: 2.4
Name: health-universe-a2a
Version: 0.5.1
Summary: Python SDK for building Health Universe A2A-compliant agents
Project-URL: Homepage, https://github.com/Health-Universe/healthuniverse-a2a-sdk-python
Project-URL: Documentation, https://docs.healthuniverse.com/a2a-sdk
Project-URL: Repository, https://github.com/Health-Universe/healthuniverse-a2a-sdk-python
Project-URL: Issues, https://github.com/Health-Universe/healthuniverse-a2a-sdk-python/issues
Author-email: Health Universe <support@healthuniverse.com>
License: MIT
License-File: LICENSE
Keywords: a2a,agents,ai,health-universe
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Python: >=3.10
Requires-Dist: a2a-sdk>=0.1.0
Requires-Dist: fastapi>=0.109.0
Requires-Dist: httpx>=0.27.0
Requires-Dist: langfuse>=4.0.0
Requires-Dist: openai>=1.0.0
Requires-Dist: opentelemetry-api>=1.20.0
Requires-Dist: opentelemetry-instrumentation-anthropic
Requires-Dist: opentelemetry-instrumentation-httpx
Requires-Dist: opentelemetry-instrumentation-openai
Requires-Dist: opentelemetry-sdk>=1.20.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: sse-starlette>=2.0.0
Requires-Dist: starlette>=0.35.0
Requires-Dist: uvicorn>=0.27.0
Provides-Extra: dev
Requires-Dist: anthropic>=0.80.0; extra == 'dev'
Requires-Dist: mypy>=1.8.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.23.0; extra == 'dev'
Requires-Dist: pytest-cov>=4.1.0; extra == 'dev'
Requires-Dist: pytest>=8.0.0; extra == 'dev'
Requires-Dist: ruff>=0.3.0; extra == 'dev'
Provides-Extra: docs
Requires-Dist: mkdocs-material>=9.5.0; extra == 'docs'
Requires-Dist: mkdocs>=1.5.0; extra == 'docs'
Requires-Dist: mkdocstrings[python]>=0.24.0; extra == 'docs'
Description-Content-Type: text/markdown

# Health Universe A2A SDK for Python

[![Test](https://github.com/Health-Universe/healthuniverse-a2a-sdk-python/actions/workflows/test.yml/badge.svg)](https://github.com/Health-Universe/healthuniverse-a2a-sdk-python/actions/workflows/test.yml)

A simple, batteries-included Python SDK for building A2A-compliant agents for the Health Universe platform.

## Features

- **Simple API**: Just implement 3 methods to create an agent
- **Document Operations**: Built-in support for reading, writing, and searching files
- **Search**: Full-text and semantic search across thread documents
- **Multi-Agent Support**: Run multiple agents with inter-agent communication
- **Sub-Agents**: Lightweight `SubAgent` class for fast inline agent-to-agent calls
- **Progress Updates**: Built-in support for progress tracking and artifacts
- **Validation**: Pre-validate messages before processing
- **Lifecycle Hooks**: Customize behavior at key points
- **Local Development**: Test agents against local files without the HU backend
- **Health Universe Integration**: Works seamlessly with HU platform

## Installation

```bash
uv pip install health-universe-a2a
```

For development:

```bash
git clone https://github.com/Health-Universe/healthuniverse-a2a-sdk-python.git
cd healthuniverse-a2a-sdk-python
uv pip install -e ".[dev]"
```

## Quick Start

```python
from health_universe_a2a import Agent, AgentContext

class SymptomClassifierAgent(Agent):
    def get_agent_name(self) -> str:
        return "Symptom Classifier"

    def get_agent_description(self) -> str:
        return "Classifies symptoms into medical categories"

    async def process_message(self, message: str, context: AgentContext) -> str:
        await context.update_progress("Analyzing symptoms...", 0.5)

        # Your classification logic here
        category = classify_symptoms(message)

        return f"Classification: {category}"

if __name__ == "__main__":
    SymptomClassifierAgent().serve()
```

## Working with Documents

The SDK provides a `DocumentClient` for reading and writing files in the Health Universe platform:

```python
from health_universe_a2a import Agent, AgentContext

class DocumentAnalyzerAgent(Agent):
    def get_agent_name(self) -> str:
        return "Document Analyzer"

    def get_agent_description(self) -> str:
        return "Analyzes clinical documents"

    async def process_message(self, message: str, context: AgentContext) -> str:
        # List all documents in the thread
        documents = await context.document_client.list_documents()

        # Filter documents by name
        protocols = await context.document_client.filter_by_name("protocol")

        # Download and read a document
        content = await context.document_client.download_text(documents[0].id)

        # Write results back
        await context.document_client.write(
            name="Analysis Results",
            content='{"result": "analysis complete"}',
            filename="analysis_results.json",
        )

        return f"Analyzed {len(documents)} documents"
```

### Searching Documents

The SDK supports full-text and semantic (vector) search across thread documents:

```python
# Full-text search
results = await context.document_client.search("blood pressure", limit=5)
for result in results:
    print(f"{result.document_name}: {result.content}")

# Semantic search (vector similarity)
results = await context.document_client.semantic_search(
    "patient vitals",
    max_results=5,
    similarity_threshold=0.4,
)
for result in results:
    print(f"{result.document_name} (score: {result.similarity}): {result.content}")
```

### Document Processing Status

Wait for platform-side document extraction and embedding before searching:

```python
# Wait for all documents to be ready (extracted + embedded)
statuses = await context.document_client.wait_for_ready(timeout=120.0)

# Check individual document status
status = await context.document_client.get_processing_status(doc.id)
if status.is_ready:
    text = await context.document_client.download_extracted(doc.id)
```

## Core Concepts

### Agent Context

Your `process_message` method receives an `AgentContext` with helper methods:

```python
async def process_message(self, message: str, context: AgentContext) -> str:
    # Send progress updates
    await context.update_progress("Working...", 0.5)

    # Add artifacts (files generated by the agent)
    # Prefer markdown format - the platform has markdown WYSIWYG support
    await context.add_artifact(
        name="Results",
        content=markdown_report,
        data_type="text/markdown"
    )

    # Access metadata
    user_id = context.user_id
    thread_id = context.thread_id

    # Access documents API
    docs = await context.document_client.list_documents()

    return "Done!"
```

> **Note: Automatic Terminal Status**
>
> The SDK automatically sends a terminal status (completed or failed) when `process_message()` returns or raises an exception. This ensures the Navigator progress bar always completes properly.

> **Tip: Prefer Markdown for Artifacts**
>
> When generating artifacts, prefer `text/markdown` as the data type. The Health Universe platform includes a markdown WYSIWYG editor, so users can view and edit markdown artifacts directly in the browser.

### Validation

Validate messages before processing:

```python
from health_universe_a2a import ValidationAccepted, ValidationRejected

async def validate_message(self, message: str, metadata: dict) -> ValidationAccepted | ValidationRejected:
    if len(message) < 10:
        return ValidationRejected(reason="Message too short (min 10 chars)")

    return ValidationAccepted(estimated_duration_seconds=60)
```

### Lifecycle Hooks

Customize behavior at key points:

```python
async def on_startup(self) -> None:
    """Called when agent starts up"""
    self.model = await load_model()

async def on_shutdown(self) -> None:
    """Called when agent shuts down"""
    await self.model.unload()

async def on_task_start(self, message: str, context: AgentContext) -> None:
    """Called before processing"""
    self.logger.info(f"Starting task for {context.user_id}")

async def on_task_complete(self, message: str, result: str, context: AgentContext) -> None:
    """Called after successful processing"""
    await self.metrics.increment("tasks_completed")

async def on_task_error(self, message: str, error: Exception, context: AgentContext) -> str | None:
    """Called on error - return custom error message or None for default"""
    if isinstance(error, TimeoutError):
        return "Task timed out. Try a smaller request."
    return None
```

### Configuration Methods

Customize agent behavior:

```python
def get_agent_version(self) -> str:
    """Version string (default: "1.0.0")"""
    return "2.1.0"

def get_max_duration_seconds(self) -> int:
    """Max duration hint (default: 3600)"""
    return 7200  # 2 hours

def get_supported_input_formats(self) -> list[str]:
    """Supported input MIME types"""
    return ["text/plain", "application/json"]

def get_supported_output_formats(self) -> list[str]:
    """Supported output MIME types"""
    return ["text/plain", "application/json"]
```

## Local Development

The SDK includes a `LocalDocumentClient` and `create_local_context()` helper so you can test agents against local files without the Health Universe backend.

### How it works

Agent code uses `context.document_client` for all file operations. In production the SDK injects a NestJS/S3-backed `DocumentClient`; locally, `create_local_context()` injects a filesystem-backed `LocalDocumentClient`. Both implement `DocumentClientBase`, so agent code never branches on the environment.

```python
from health_universe_a2a import Agent, AgentContext, create_local_context

class MyAgent(Agent):
    # ... get_agent_name, get_agent_description ...

    async def process_message(self, message: str, context: AgentContext) -> str:
        # Works identically in local and production modes
        docs = await context.document_client.list_documents(role="source")
        for doc in docs:
            content = await context.document_client.download_text(doc.id)
            # ... process content ...

        await context.document_client.write(
            "Results", '{"score": 0.95}', filename="results.json"
        )
        return "Done"
```

### Directory layout

```
test_data/
    source/       # Input files (role="source", document_type="user_upload")
    artifact/     # Pre-seeded + agent-written outputs (role="artifact", document_type="agent_output")
```

- `source/` is created automatically if it doesn't exist
- `artifact/` is the default `output_dir` — files written via `document_client.write()` go here
- You can override `output_dir` to write artifacts elsewhere

### Running locally

```python
import asyncio
from health_universe_a2a import create_local_context

async def main():
    # Point at a directory with source/ subdirectory
    context = create_local_context(data_dir="./test_data")

    agent = MyAgent()
    result = await agent.process_message("analyze", context)
    print(result)

asyncio.run(main())
```

### Listing and filtering documents

```python
# List only input files
source_docs = await context.document_client.list_documents(role="source")

# List only artifacts (pre-seeded + agent-written)
artifact_docs = await context.document_client.list_documents(role="artifact")

# List all (default)
all_docs = await context.document_client.list_documents()
```

- Progress updates are logged to stdout instead of POSTed to the backend

See [examples/local_dev_example.py](examples/local_dev_example.py) for a complete working example.

## Examples

See the `examples/` directory for complete working examples:

- **[simple_agent.py](examples/simple_agent.py)**: Basic echo agent
- **[simple_async_agent.py](examples/simple_async_agent.py)**: File processor with progress updates
- **[advanced_agent.py](examples/advanced_agent.py)**: Validation, lifecycle hooks, and artifacts
- **[complex_async_agent.py](examples/complex_async_agent.py)**: Multi-feature async agent
- **[medical_classifier.py](examples/medical_classifier.py)**: Medical document classification
- **[document_inventory.py](examples/document_inventory.py)**: List and inspect thread documents
- **[protocol_analyzer.py](examples/protocol_analyzer.py)**: Search, download, and analyze documents
- **[physician_followup_agent.py](examples/physician_followup_agent.py)**: SOAP note analysis with OpenAI
- **[inter_agent_example.py](examples/inter_agent_example.py)**: Orchestrator calling sub-agents
- **[multi_agent_orchestration.py](examples/multi_agent_orchestration.py)**: Multi-agent architecture
- **[local_dev_example.py](examples/local_dev_example.py)**: Local testing with filesystem-backed documents

## Sub-Agents

For lightweight, fast agent-to-agent calls (under ~30 seconds) that don't need background job infrastructure, use `SubAgent`:

```python
from health_universe_a2a import SubAgent, SubAgentContext

class SummarizerSubAgent(SubAgent):
    def get_agent_name(self) -> str:
        return "Summarizer"

    def get_agent_description(self) -> str:
        return "Summarizes text quickly"

    async def process_message(self, message: str, context: SubAgentContext) -> str:
        return summarize(message)
```

Unlike `Agent`, a `SubAgent` returns its result directly in the HTTP response with no SSE streaming or background job lifecycle. Use it for inline helper agents that are called by an orchestrator.

## Inter-Agent Communication

Call other A2A-compliant agents from your agent:

```python
from health_universe_a2a import Agent, AgentContext

class OrchestratorAgent(Agent):
    async def process_message(self, message: str, context: AgentContext) -> str:
        # Call with text message
        preprocessor_result = await self.call_agent(
            "/preprocessor",
            message,
            context,
        )

        # Call with structured data (dict or list)
        analysis_result = await self.call_agent(
            "/analyzer",
            {"data": preprocessor_result, "mode": "detailed"},
            context,
        )

        return analysis_result
```

**Agent Identifier Formats:**

1. **Local agent path**: `/agent-name` - Uses `LOCAL_AGENT_BASE_URL` (default: `http://localhost:8501`)
2. **Direct URL**: `https://...` - Calls directly with HTTPS
3. **Registry name**: `agent-name` - Looks up in `AGENT_REGISTRY` environment variable

## Running Your Agent

### Built-in HTTP Server

```python
if __name__ == "__main__":
    agent = MyAgent()
    agent.serve()  # Starts server on http://0.0.0.0:8000
```

The server automatically provides:
- **Agent card endpoint**: `GET /.well-known/agent-card.json`
- **JSON-RPC endpoint**: `POST /` (method: "message/send")
- **Health check**: `GET /health`

### Server Configuration

```python
# Via environment variables
# HOST=0.0.0.0 PORT=8080 python my_agent.py

# Via method parameters
agent.serve(host="0.0.0.0", port=8080, reload=True)
```

### Multi-Agent Server

Run multiple agents in a single server:

```python
from health_universe_a2a import serve_multi_agents

serve_multi_agents({
    "/orchestrator": OrchestratorAgent(),
    "/analyzer": AnalyzerAgent(),
    "/reader": ReaderAgent(),
}, port=8501)
```

## Development

### Setup

```bash
git clone https://github.com/Health-Universe/healthuniverse-a2a-sdk-python
cd healthuniverse-a2a-sdk-python
uv pip install -e ".[dev]"
```

### Testing

```bash
uv run pytest
```

### Linting and Formatting

```bash
uv run ruff check src/
uv run ruff format src/
uv run mypy src/
```

## Requirements

- Python 3.10+
- httpx >= 0.27.0
- pydantic >= 2.0.0
- a2a-sdk >= 0.1.0
- fastapi >= 0.109.0
- uvicorn >= 0.27.0
- openai >= 1.0.0

## Support

- [Documentation](https://docs.healthuniverse.com/a2a-sdk)
- [Issue Tracker](https://github.com/Health-Universe/healthuniverse-a2a-sdk-python/issues)
- [Email Support](mailto:support@healthuniverse.com)

## Links

- [Health Universe](https://healthuniverse.com)
