Metadata-Version: 2.4
Name: fast_a2a_app
Version: 0.6.2
Summary: fast_a2a_app — Drop-in A2A server and chat UI for any AI agent
Project-URL: Homepage, https://github.com/rembli/fast_a2a_app
Project-URL: Repository, https://github.com/rembli/fast_a2a_app
Project-URL: Issues, https://github.com/rembli/fast_a2a_app/issues
Author-email: Georg Boegerl <georg.boegerl@web.de>
License: MIT
Keywords: a2a,agent,ai,chat,fastapi,llm,server
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Internet :: WWW/HTTP :: HTTP Servers
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Requires-Python: <4.0,>=3.12
Requires-Dist: a2a-sdk>=1.0.0
Requires-Dist: agent-sandbox<0.0.31,>=0.0.30
Requires-Dist: anthropic<1.0.0,>=0.40.0
Requires-Dist: asyncpg>=0.30.0
Requires-Dist: azure-identity<2.0.0,>=1.25.3
Requires-Dist: dbos-argus<0.0.28,>=0.0.27
Requires-Dist: dbos<3.0.0,>=2.0.0
Requires-Dist: fastapi>=0.115.0
Requires-Dist: folium<0.21.0,>=0.20.0
Requires-Dist: greenlet<4.0.0,>=3.5.0
Requires-Dist: lxml<7.0.0,>=6.1.0
Requires-Dist: motor>=3.5.0
Requires-Dist: openai<3.0.0,>=2.36.0
Requires-Dist: pydantic-ai<2.0.0,>=1.93.0
Requires-Dist: python-pptx<2.0.0,>=1.0.2
Requires-Dist: pyyaml<7.0.0,>=6.0.3
Requires-Dist: redis>=7.4.0
Requires-Dist: sse-starlette>=2.1.3
Requires-Dist: starlette>=0.41.0
Requires-Dist: uvicorn[standard]>=0.34.0
Requires-Dist: waterfall-log<0.2.0,>=0.1.7
Description-Content-Type: text/markdown

# fast_a2a_app

**Drop-in A2A server and chat UI for any FastAPI application running AI agents — installable from PyPI.**

```bash
pip install fast_a2a_app
```

---

## Why fast_a2a_app

The [Agent2Agent (A2A) protocol](https://a2a-protocol.org/) is HTTP for AI agents — a shared contract that lets any agent talk to any client (chat UI, orchestrator, another agent) across companies and frameworks. Turning a Python coroutine into a spec-compliant A2A server is a lot of plumbing: JSON-RPC routes, SSE streaming, task lifecycle, cross-instance cancel, agent-card discovery, multi-turn history. fast_a2a_app does it for you, mounted cleanly into the FastAPI app you already run.

- **Mount, don't replace.** Starlette app you mount at any path prefix. Auth, middleware, CORS, observability — all yours, unchanged.
- **Framework-agnostic.** The server API is not tied to Pydantic AI, LangChain, or any other agent runtime. Wrap any `async (str) -> str` (or async generator) and you're done.
- **Batteries-included chat UI.** Self-contained browser interface — no build step, no npm. Markdown, tables, maps, clickable suggestions, file uploads, image previews, fullscreen viewer.
- **Typed-artifact widgets you can extend.** Drop a `<TAG>.py` + `<TAG>.js` pair to add a new chat widget; built-ins ship `TABLE`, `PROMPT_SUGGESTIONS`, and `MAP` (Leaflet).
- **Real protocol, not a mock.** Streaming SSE, multi-turn history, cross-instance cancel, reload recovery, agent-card discovery — built on `a2a-sdk` 1.0.x.

---

## 60-second quickstart

One file, three lines of glue — and you get a fully spec-compliant streaming A2A server with a built-in chat UI on top of an Azure OpenAI chat-completions call:

```python
# main.py
import os
from collections.abc import AsyncIterable

from fastapi import FastAPI
from a2a.types import AgentCapabilities, AgentCard, AgentInterface
from fast_a2a_app import a2a_ui, build_a2a_app, build_stream_invoke

from azure.identity.aio import AzureCliCredential, get_bearer_token_provider
from openai import AsyncOpenAI


# Azure OpenAI client — bearer token from `az login` (no API key needed).
client = AsyncOpenAI(
    base_url=f"{os.environ['AZURE_AI_BASE_URL'].rstrip('/')}/openai/v1",
    api_key=get_bearer_token_provider(AzureCliCredential(), "https://ai.azure.com/.default"),
)

# Your agent: any async generator yielding text chunks.
async def stream_chat(prompt: str) -> AsyncIterable[str]:
    stream = await client.chat.completions.create(
        model=os.environ.get("AZURE_AI_DEPLOYMENT_NAME", "gpt-4o"),
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        if chunk.choices and (text := chunk.choices[0].delta.content):
            yield text

# A2A agent card — public metadata served at /a2a/.well-known/agent-card.json
agent_card = AgentCard(
    name="Chat",
    description="Streaming chat agent",
    version="1.0.0",
    supported_interfaces=[
        AgentInterface(url="http://localhost:8000/a2a/", protocol_binding="JSONRPC")
    ],
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["text"],
    default_output_modes=["text"],
)

# Mount the A2A protocol server and the chat UI into your FastAPI app.
app = FastAPI()
app.mount(
    "/a2a",
    build_a2a_app(agent_card=agent_card, stream_invoke=build_stream_invoke(stream_chat)),
)
app.mount("/", a2a_ui)
```

```bash
pip install fast_a2a_app openai azure-identity
az login                                         # AzureCliCredential
export AZURE_AI_BASE_URL=https://<your-resource>.openai.azure.com
export AZURE_AI_DEPLOYMENT_NAME=gpt-4o
uvicorn main:app --reload
```

No Docker needed for local development — the default in-process `MemoryTaskStore` keeps task state in RAM. For multi-process / cross-instance deployments, pass `task_store=RedisTaskStore.from_url(REDIS_URL)` (or a `MongoTaskStore` / `PostgresTaskStore`) to `build_a2a_app`.

Open <http://localhost:8000/> — you're chatting.

---

## API

Every public symbol exported from `fast_a2a_app`:

```python
from fast_a2a_app import (
    # Server
    build_a2a_app, build_invoke, build_stream_invoke,
    # UI
    a2a_ui, build_a2a_ui,
    # Embedded artifact primitives
    text_artifact, data_artifact, file_artifact, image_artifact,
    # Specialised artifacts (typed `_type` envelopes)
    table_artifact, prompt_suggestions_artifact, map_artifact,
    # Typed-artifact registry
    ArtifactType, ArtifactTypeRegistry, artifact_types,
    # Prompt helpers
    get_user_input, get_task_history, format_history,
    # Progress
    report_progress,
    # Storage / executor (lower-level)
    A2ATaskStore,
    MemoryTaskStore, RedisTaskStore, MongoTaskStore, PostgresTaskStore,
    ConfigurableAgentExecutor, ContextAwareRequestContextBuilder,
)
```

### `build_a2a_app(...)`

Assembles a Starlette ASGI app implementing the A2A protocol. Mount it at any path prefix.

| Parameter | Type | Default | Description |
|---|---|---|---|
| `agent_card` | `AgentCard` | required | Pre-built A2A agent card (name, description, version, `supported_interfaces`, skills, capabilities) |
| `invoke` | `Callable \| None` | `None` | Non-streaming callable — wrap with `build_invoke()` |
| `stream_invoke` | `Callable \| None` | `None` | Streaming callable — wrap with `build_stream_invoke()` |
| `system_prompt` | `str \| None` | `None` | Prepended to every prompt before history and user input |
| `history_max_lines` | `int` | `12` | Number of prior conversation lines to inject; `0` disables history |
| `prompt_builder` | `Callable \| None` | auto | Custom `(RequestContext) -> str`; overrides `system_prompt` and `history_max_lines` |
| `on_task_start` | `Callable[[str], Awaitable] \| None` | `None` | Called before each task — useful for metrics or per-task locks |
| `on_task_cancel` | `Callable[[str, str], Awaitable] \| None` | `None` | Called on cancel with `(context_id, task_id)` |
| `task_store` | `A2ATaskStore \| None` | `MemoryTaskStore()` | Pass `RedisTaskStore.from_url(...)` / `MongoTaskStore.from_uri(...)` / `PostgresTaskStore.from_dsn(...)` for multi-process deployments |
| `debug` | `bool` | `False` | Include exception details in failure messages and surface them in the UI |
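
As a rough illustration of the two hooks in the table, the sketch below counts task starts and cancels in process. The `task_events` counter and the `cancelled` list are hypothetical stand-ins for a real metrics client, and because the table only types the `on_task_start` argument as a `str`, the sketch treats that value as opaque.

```python
from collections import Counter

# Hypothetical in-process metrics; swap in your own metrics client.
task_events: Counter[str] = Counter()
cancelled: list[tuple[str, str]] = []


async def on_task_start(identifier: str) -> None:
    # The table types this argument as a plain `str`; it is treated as opaque here.
    task_events["started"] += 1


async def on_task_cancel(context_id: str, task_id: str) -> None:
    # Called with (context_id, task_id) according to the table above.
    task_events["cancelled"] += 1
    cancelled.append((context_id, task_id))
```

Both functions would then be passed as `build_a2a_app(..., on_task_start=on_task_start, on_task_cancel=on_task_cancel)`.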

### `build_invoke(run)` / `build_stream_invoke(run)`

Wraps any of these shapes as an A2A invoke. The framework inspects your function's signature with `inspect.signature` and forwards the `RequestContext` only when you declare a second positional parameter.

```python
# Non-streaming
async def fn(prompt: str) -> str | Artifact: ...
async def fn(prompt: str, context: RequestContext) -> str | Artifact: ...

# Streaming
async def fn(prompt: str) -> AsyncIterable[str | Artifact]: ...
async def fn(prompt: str, context: RequestContext) -> AsyncIterable[str | Artifact]: ...
```

Streaming yields can mix plain strings (streamed as text deltas into one bubble) and full `Artifact` objects (each rendered as its own bubble). `build_stream_invoke` also sets up the `report_progress()` ContextVar so live progress updates work out of the box.

```python
from fast_a2a_app import build_a2a_app, build_invoke

# `app` and `agent_card` are the FastAPI app and AgentCard from the quickstart.
async def echo(prompt: str) -> str:
    return f"echo: {prompt}"

app.mount("/a2a", build_a2a_app(agent_card=agent_card, invoke=build_invoke(echo)))
```
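
The signature check described at the top of this section can be approximated with the standard library alone. This is a minimal sketch of the idea, not the package's actual code: `wants_context` and `call_agent` are hypothetical names, and the real wrappers may apply additional rules.

```python
import inspect
from typing import Any, Callable


def wants_context(fn: Callable[..., Any]) -> bool:
    """Return True when `fn` declares at least two positional parameters."""
    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
    )
    params = [
        p for p in inspect.signature(fn).parameters.values()
        if p.kind in positional_kinds
    ]
    return len(params) >= 2


async def call_agent(fn: Callable[..., Any], prompt: str, context: Any) -> Any:
    # Forward the context only to functions that asked for it.
    if wants_context(fn):
        return await fn(prompt, context)
    return await fn(prompt)
```

Keyword-only parameters are deliberately ignored here, matching the "second positional parameter" wording above.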

### `report_progress(message)`

Pushes a status string to the chat UI spinner. It has no effect outside a streaming context, so it is safe to call unconditionally.

```python
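from fast_a2a_app import report_progress

# `agent` is your own agent object (for example, a Pydantic AI `Agent`).
@agent.tool
async def long_computation(ctx, n: int) -> str:
    report_progress(f"Computing step 1/{n}…")
    result = ...  # your long-running work goes here
    return result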
```

### `RequestContext` helpers

| Helper | Returns | Purpose |
|---|---|---|
| `get_user_input(context)` | `str` | Current user message text |
| `get_task_history(context)` | `list[tuple[str, str]]` | Prior conversation as `(role, text)` tuples, oldest → newest |
| `format_history(history, *, max_lines=12, header="Conversation so far:")` | `str` | Renders `(role, text)` pairs as a prompt prefix, capped to the most recent `max_lines` |

```python
from fast_a2a_app import format_history, get_task_history, get_user_input

def my_prompt(context) -> str:
    return (
        "You are an expert.\n\n"
        + format_history(get_task_history(context), max_lines=6)
        + get_user_input(context)
    )
```
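
To make the `max_lines` cap concrete, here is a stand-in that mimics the documented behaviour of `format_history`. It is not the library's implementation: the `role: text` line format, the trailing blank line, and the empty string returned for empty history are all assumptions.

```python
def format_history_sketch(
    history: list[tuple[str, str]],
    *,
    max_lines: int = 12,
    header: str = "Conversation so far:",
) -> str:
    """Render the most recent `max_lines` turns as a prompt prefix."""
    if max_lines <= 0 or not history:
        return ""
    recent = history[-max_lines:]  # keep the newest entries, still oldest first
    lines = [f"{role}: {text}" for role, text in recent]  # assumed line format
    return header + "\n" + "\n".join(lines) + "\n\n"
```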

### Artifact builders

The package splits builders into two tiers: **embedded primitives** that wrap A2A protocol Parts directly, and **specialised artifacts** that carry a typed `_type` discriminator and route to a dedicated UI renderer.

**Embedded primitives:**

| Helper | UI rendering |
|---|---|
| `text_artifact(text, *, name="result")` | Markdown bubble |
| `data_artifact(data, *, name="data", text=None)` | When `data._type` matches a registered typed renderer → that widget; otherwise generic key-value block |
| `file_artifact(content=None, *, url=None, filename, media_type, name=None, text=None)` | Download card; `image/*` media types render inline. Pass exactly one of inline `content` bytes or a `url` reference |
| `image_artifact(image_bytes=None, *, url=None, media_type="image/png", caption=None, filename=None, name="image")` | Inline image preview + click-to-fullscreen |

**Specialised artifacts:**

| Helper | `_type` | UI rendering |
|---|---|---|
| `table_artifact(rows, *, columns=None, caption=None, name="table")` | `"TABLE"` | Real HTML `<table>` — headers, alternating row shading, right-aligned monospace numerics |
| `prompt_suggestions_artifact(suggestions, *, text=None, name="prompt_suggestions")` | `"PROMPT_SUGGESTIONS"` | Row of clickable pill buttons; click submits the suggestion's `prompt` as the next user message |
| `map_artifact(markers, *, center=None, zoom=None, caption=None, name="map")` | `"MAP"` | Interactive Leaflet/OpenStreetMap map. `markers` is `[{lat, lng, label?, popup?}, …]` |

```python
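from fast_a2a_app import (
    image_artifact, map_artifact, prompt_suggestions_artifact,
    table_artifact, text_artifact,
)

async def stream_invoke(prompt, context):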
    yield text_artifact("Computing…")
    yield table_artifact(
        rows=[["APAC", 38400], ["EMEA", 22000]],
        columns=["region", "revenue"],
        caption="Top regions",
    )
    yield image_artifact(url="/charts/abc.png", caption="Year-over-year")
    yield map_artifact(
        [{"lat": 41.9028, "lng": 12.4964, "label": "Rome"}],
        caption="Suggested destination",
    )
    yield prompt_suggestions_artifact(
        [{"label": "Drill into APAC", "prompt": "Break down APAC by country."}],
        text="What next?",
    )
```

`image_artifact` and `file_artifact` accept either inline bytes *or* a `url`. The URL form keeps large binaries out of the wire transcript and the browser's `localStorage` — store the bytes in your own backend (object store, sibling FastAPI endpoint, CDN) and ship just the URL.

### Typed-artifact registry

`artifact_types` is a process-wide `ArtifactTypeRegistry` populated at import time by walking `fast_a2a_app/server/artifacts/` and registering every uppercase `<TAG>.py` module.

```python
from fast_a2a_app import artifact_types

# Built-ins after import:
[t.tag for t in artifact_types.all()]
# → ['MAP', 'PROMPT_SUGGESTIONS', 'TABLE']

# Register your own at runtime:
artifact_types.register("MYAPP_TIMELINE", builder=timeline_artifact)
```

| Method | Purpose |
|---|---|
| `register(tag, *, builder=None)` | Adds (or overrides) a `(tag, builder)` pair |
| `unregister(tag)` | Removes a tag from the registry |
| `get(tag)` | Returns the `ArtifactType` record or `None` |
| `builder(tag)` | Convenience accessor for the Python builder |
| `all()` | All registered types in registration order |

### UI

**`a2a_ui`** — pre-built Starlette ASGI app serving the self-contained single-page chat interface. No build step, no npm. Mount it at `"/"` to serve the UI with default settings (no file upload).

```python
app.mount("/", a2a_ui)
```

**`build_a2a_ui(...)`** — build a fresh UI app with configuration applied at template-substitution time.

| Parameter | Type | Default | Description |
|---|---|---|---|
| `file_upload_api` | `str \| None` | `None` | URL the paperclip should `POST` files to as `multipart/form-data`. Endpoint must return `{id, url, mediaType, filename}`. When `None`, the attach button is hidden. |
| `accepted_file_types` | `list[str] \| str \| None` | `None` (images only) | What the file picker accepts. Same format as the HTML `<input accept>` attribute — file extensions (`".csv"`), MIME types (`"text/csv"`), or wildcards (`"image/*"`) |

```python
app.mount("/", build_a2a_ui(
    file_upload_api="/uploads",
    accepted_file_types=[".csv", ".xlsx", "text/csv"],
))
```

The UI reads the agent card from `/a2a/.well-known/agent-card.json` to populate the header name and the collapsible info panel.
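
Any other client can read the same card. The sketch below builds the discovery URL and fetches it with the standard library; it assumes the protocol app is mounted at `/a2a` as in the quickstart, and `agent_card_url` / `fetch_agent_card` are illustrative helper names rather than part of the package.

```python
import json
from urllib.request import urlopen


def agent_card_url(base_url: str, mount_path: str = "/a2a") -> str:
    """Build the agent-card discovery URL for a server mounted at `mount_path`."""
    return f"{base_url.rstrip('/')}/{mount_path.strip('/')}/.well-known/agent-card.json"


def fetch_agent_card(base_url: str) -> dict:
    # Requires the server to be running; returns the parsed JSON card.
    with urlopen(agent_card_url(base_url), timeout=5) as response:
        return json.load(response)
```

With the quickstart server running, `fetch_agent_card("http://localhost:8000")` should return the card as a dictionary.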

### Storage

`A2ATaskStore` is a Protocol for pluggable storage of tasks, context indices, and cancel signals:

```python
class A2ATaskStore(Protocol):
    async def save(self, task, call_context): ...
    async def get(self, task_id): ...
    async def list_by_context(self, context_id, exclude_task_id=None): ...
    async def signal_cancel(self, task_id): ...
    async def is_cancel_signalled(self, task_id): ...
```

Four built-in implementations:

| Store | When to use | Constructor |
|---|---|---|
| `MemoryTaskStore` | Dev, tests, single-process demos. The default when `task_store` is omitted. State lives in RAM — no persistence, no cross-instance cancel. | `MemoryTaskStore()` |
| `RedisTaskStore` | Production. Native TTL, horizontal scale, cross-instance cancel via short-TTL keys. | `RedisTaskStore(client)` or `RedisTaskStore.from_url("redis://…")` |
| `MongoTaskStore` | Production where Mongo is the operational data store. TTL indexes drop expired docs server-side. | `MongoTaskStore(client, database_name="fast_a2a")` or `await MongoTaskStore.from_uri("mongodb://…")` |
| `PostgresTaskStore` | Production where Postgres is the operational data store. `expires_at` columns + read-time filtering. | `PostgresTaskStore(pool)` or `await PostgresTaskStore.from_dsn("postgresql://…")` |

```python
from fast_a2a_app import RedisTaskStore, build_a2a_app

build_a2a_app(
    ...,
    task_store=RedisTaskStore.from_url("redis://localhost:6379"),
)
```

Every store logs an `INFO` line on initialization so the console makes it obvious which backend is live; `MemoryTaskStore` additionally warns about its single-process limitation.

### Low-level

- **`ConfigurableAgentExecutor`** — the internal executor that runs `invoke` / `stream_invoke` against the A2A SDK's event loop. Honours `on_task_start` / `on_task_cancel` hooks and surfaces `report_progress` calls as `TASK_STATE_WORKING` status events.
- **`ContextAwareRequestContextBuilder`** — builds a `RequestContext` whose `related_tasks` is populated from the task store. Pass a custom one to `build_a2a_app(request_context_builder=...)` if you need to override how prior turns are loaded.
- **`ArtifactType`** — frozen dataclass describing a registered typed artifact: `tag: str`, `builder: Callable[..., Artifact] | None`.

### Versioning

```python
import fast_a2a_app
fast_a2a_app.__version__
```

---

## License

MIT
