Metadata-Version: 2.4
Name: fast_a2a_app
Version: 0.4.1
Summary: fast_a2a_app — Drop-in A2A server and chat UI for any AI agent
Project-URL: Homepage, https://github.com/rembli/fast_a2a_app
Project-URL: Repository, https://github.com/rembli/fast_a2a_app
Project-URL: Issues, https://github.com/rembli/fast_a2a_app/issues
Author-email: Georg Boegerl <georg.boegerl@henkel.com>
License: MIT
Keywords: a2a,agent,ai,chat,fastapi,llm,server
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Internet :: WWW/HTTP :: HTTP Servers
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Requires-Python: <4.0,>=3.11
Requires-Dist: a2a-sdk>=1.0.0
Requires-Dist: fastapi>=0.115.0
Requires-Dist: redis>=7.4.0
Requires-Dist: sse-starlette>=2.1.3
Requires-Dist: starlette>=0.41.0
Requires-Dist: uvicorn[standard]>=0.34.0
Description-Content-Type: text/markdown

# fast_a2a_app

**Drop-in A2A server and chat UI for any FastAPI application that runs AI agents, installable from PyPI.**

fast_a2a_app packages an A2A protocol adapter and a self-contained browser chat UI into a standalone, pip-installable library. You get a spec-compliant A2A server plus a ready-to-use chat interface in under 20 lines.

```bash
pip install fast_a2a_app
```

---

## Why fast_a2a_app?

[Pydantic AI](https://ai.pydantic.dev/) ships its own `FastA2A` integration, which is excellent if you are already inside the Pydantic AI ecosystem. fast_a2a_app exists for a different set of needs:

- **Mount point, not a framework.** fast_a2a_app is a plain Starlette app you mount into an existing FastAPI application at any path prefix. Everything outside that prefix — authentication middleware, custom routes, dependency injection, observability — is yours to own and compose however you like.
- **Framework-agnostic.** The library has zero dependency on Pydantic AI. Wire in any agent: raw Anthropic/OpenAI API calls, LangChain, LlamaIndex, or plain Python — as long as it exposes an `async (str) -> str` function or an async generator.
- **Separation of concerns.** Your FastAPI application stays in charge of the HTTP layer (auth, rate limiting, CORS, health checks). fast_a2a_app only handles the A2A protocol inside its mounted prefix, keeping agent logic cleanly decoupled from transport concerns.

We hope this contributes to a composable AI agent architecture where protocol adapters, agent frameworks, and application infrastructure are independent choices.

---

## What's inside

| Module | What it does |
|---|---|
| `fast_a2a_app.server` | A2A JSON-RPC server (streaming SSE, multi-turn history, cross-instance cancel) |
| `fast_a2a_app.ui` | Self-contained browser chat UI — no build step, no npm |

### Protocol features (via `a2a-sdk` 1.0.x)

- `SendMessage` — single-shot request/response (non-streaming)
- `SendStreamingMessage` — streaming SSE responses
- `CancelTask` — immediate or cross-replica cancellation
- `SubscribeToTask` — reconnect to an in-flight stream after a network blip
- `GetTask` — snapshot fallback for page-reload recovery
- `.well-known/agent-card.json` — agent discovery

### Server features

- **Multi-turn history** — every turn is stored in Redis and injected as a "Conversation so far:" prefix, giving the agent continuity without client-side replay. History depth and a system prompt are configurable via `history_max_lines` and `system_prompt` on `build_a2a_app`; full custom prompt assembly is also supported.
- **Cross-instance cancellation** — cancel signals flow through Redis so any replica can stop a task running on another replica
- **Live progress updates** — call `report_progress("step 2/5…")` from any tool and the chat UI spinner updates in real time
- **Lifecycle hooks** — `on_task_start` / `on_task_cancel` callbacks for metrics, locks, or state resets
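
As a rough illustration of the lifecycle hooks, the sketch below tracks in-flight tasks in a set. It assumes the single `str` argument is the task id (the API reference types the hooks as `Callable[[str], Awaitable]` without saying what the string carries). The `in_flight` registry is made up for this example.

```python
import asyncio

# Hypothetical in-memory registry; a real deployment would more likely
# feed a metrics counter or a lock held in Redis.
in_flight: set[str] = set()


async def on_task_start(task_id: str) -> None:
    # Assumption: the hook receives the task id as its only argument.
    in_flight.add(task_id)


async def on_task_cancel(task_id: str) -> None:
    # discard() is safe even if the task was never registered on this replica.
    in_flight.discard(task_id)


# Would be wired in roughly as:
# build_a2a_app(agent_card=card, stream_invoke=...,
#               on_task_start=on_task_start, on_task_cancel=on_task_cancel)
```

Because cancel signals can arrive on a different replica than the one that started the task, the cancel hook in this sketch tolerates unknown ids instead of raising.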

### UI features

- **Stream toggle** — checkbox in the input bar switches between `SendStreamingMessage` (tokens arrive live) and `SendMessage` (full response rendered at once); preference persisted in `localStorage`. Hidden automatically when the agent card reports `capabilities.streaming = false`.
- **Image attachments** *(opt-in)* — paperclip button uploads images via multipart `POST` to whatever endpoint you pass as `build_a2a_ui(file_upload_api=...)`. The endpoint must accept `multipart/form-data` and return `{id, url, mediaType, filename}`; the UI then sends a `{url, filename, mediaType}` part to the agent. **The attach button is hidden by default** (when `file_upload_api` is unset) so agents that don't accept image references don't expose a broken upload affordance.
- **Inline image rendering + fullscreen viewer** — image parts (raw or URL) show inline previews; click any image for a fullscreen lightbox with prev/next navigation, a dedicated input that sends the currently-viewed image as a reference, and a loading spinner while the next image is generated.
- **Prompt-suggestion buttons** — agents can yield a data part with `_type: "PROMPT_SUGGESTIONS"` (via `prompt_suggestions_artifact`) and the UI renders clickable pills; clicking sends the suggestion as a normal user message.
- **Data part widget** — parts with `media_type: application/json` (or a `data` part) are rendered as a labeled key-value table with color-coded value types; no raw JSON brackets shown.
- **File part widget** — non-image binary parts show a type icon, filename, and media type with a "Download" button that creates a temporary Blob URL (or opens the URL for `url`-form parts).
- **Page-reload recovery** — active task is stored in `localStorage` and resubscribed on the next load. URL-based image parts re-fetch from the agent's storage endpoint, so refresh-safe galleries work without bloating localStorage with base64.
- **Markdown rendering** — agent responses rendered as GitHub-Flavored Markdown with DOMPurify sanitisation.
- **Collapsible agent card** — name, version, capabilities, and skills pulled from `.well-known/agent-card.json`.

---

## Storage

fast_a2a_app currently uses **Redis** for all server-side state:

| What is stored | Key pattern | TTL |
|---|---|---|
| Task JSON (full A2A task object) | `a2a:task:{id}` | 24 h |
| Conversation index (task_id → sequence) | `a2a:context:{cid}:tasks` | 24 h |
| Cross-instance cancel signal | `a2a:cancel:{id}` | 5 min |
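
When inspecting Redis by hand (for example with `redis-cli`), it can help to have the key patterns from the table as small helpers. The function and constant names below are illustrative only and are not exports of the library.

```python
from datetime import timedelta

# TTLs as listed in the storage table.
TASK_TTL = timedelta(hours=24)
CONTEXT_TTL = timedelta(hours=24)
CANCEL_TTL = timedelta(minutes=5)


def task_key(task_id: str) -> str:
    """Key holding the full A2A task JSON."""
    return f"a2a:task:{task_id}"


def context_tasks_key(context_id: str) -> str:
    """Key holding the conversation index for one context."""
    return f"a2a:context:{context_id}:tasks"


def cancel_key(task_id: str) -> str:
    """Key used as the cross-instance cancel signal."""
    return f"a2a:cancel:{task_id}"
```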

Start a local Redis instance before running any example:

```bash
docker run -d -p 6379:6379 redis:7-alpine
```

Or point `REDIS_URL` at any managed Redis-compatible service (Redis Cloud, AWS ElastiCache, Azure Cache for Redis, etc.).

> **Roadmap** — pluggable storage backends (MongoDB, PostgreSQL) are planned.
> The `RedisTaskStore` already implements the `A2ATaskStore` Protocol, so a
> Mongo or Postgres backend can be swapped in by passing a custom
> `a2a_task_store` to `build_a2a_app()` without any library changes.

---

## Framework-agnostic design

fast_a2a_app has **no dependency on any AI framework**. `build_a2a_app` accepts two
plain callables that you implement however you like:

| Callable | Signature |
|---|---|
| `invoke` | `async (prompt: str) -> str` **or** `async (prompt: str) -> Artifact` |
| `stream_invoke` | `async (prompt: str) -> AsyncIterable[str]` |

Wrap them with the two helpers and pass to `build_a2a_app`:

```python
invoke=build_invoke(my_async_fn)
stream_invoke=build_stream_invoke(my_async_generator_fn)
```

`build_invoke` accepts both plain-text and multi-part agents — return a `str` for
a single text response, or return an `Artifact` to send text, structured data, and
file parts together in one response.

`build_stream_invoke` automatically sets up the `report_progress()` ContextVar,
so any code called during streaming can push live status updates to the chat UI —
regardless of which framework (or none) your agent uses.

You can also implement `invoke` and `stream_invoke` directly as bare callables
and pass them straight to `build_a2a_app` — no wrapper needed.
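
For a sense of how little is required, here is a framework-free pair matching the two signatures in the table above. It is only a sketch: the echo behaviour is arbitrary, and `collect` is a hypothetical helper for trying the generator locally, not part of the library.

```python
import asyncio
from collections.abc import AsyncIterator


async def invoke(prompt: str) -> str:
    # Non-streaming: return the whole reply at once.
    return f"Echo: {prompt}"


async def stream_invoke(prompt: str) -> AsyncIterator[str]:
    # Streaming: yield the reply word by word.
    for word in f"Echo: {prompt}".split():
        yield word + " "


async def collect(prompt: str) -> str:
    # Local test helper: join the streamed chunks into one string.
    return "".join([chunk async for chunk in stream_invoke(prompt)])


if __name__ == "__main__":
    print(asyncio.run(invoke("hi")))
    print(asyncio.run(collect("hello world")))
```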

---

## Quickstart

### 1. Install

```bash
pip install fast_a2a_app
```

### 2. Implement your agent

Any `async (str) -> str` function works as the non-streaming invoke.
Any `async (str) -> AsyncIterable[str]` generator works for streaming.

```python
# agent.py
from collections.abc import AsyncIterable

client = ...  # any OpenAI-compatible async client

async def invoke(prompt: str) -> str:
    resp = await client.chat.completions.create(
        model="gpt-4o", max_completion_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    )
    return (resp.choices[0].message.content or "").strip()

async def stream_invoke(prompt: str) -> AsyncIterable[str]:
    stream = await client.chat.completions.create(
        model="gpt-4o", max_completion_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        if not chunk.choices:
            continue
        text = chunk.choices[0].delta.content or ""
        if text:
            yield text
```

### 3. Wire up the server

```python
# main.py
from fastapi import FastAPI
from a2a.types import AgentCapabilities, AgentCard, AgentInterface
from fast_a2a_app import a2a_ui, build_a2a_app, build_invoke, build_stream_invoke
from agent import invoke, stream_invoke

app = FastAPI()

agent_card = AgentCard(
    name="My Agent",
    description="Does cool things",
    version="1.0.0",
    supported_interfaces=[AgentInterface(url="http://localhost:8000/a2a/", protocol_binding="JSONRPC")],
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["text"],
    default_output_modes=["text"],
)

app.mount("/a2a", build_a2a_app(
    agent_card=agent_card,
    invoke=build_invoke(invoke),
    stream_invoke=build_stream_invoke(stream_invoke),
))

app.mount("/", a2a_ui)        # built-in chat UI at http://localhost:8000/
```

### 4. Run

```bash
# Start Redis (required for all server-side state, including conversation history)
docker run -d -p 6379:6379 redis:7-alpine

# Run the app
uvicorn main:app --reload
```

Open `http://localhost:8000/` — you're chatting.
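
If the page does not load the agent's name, a quick way to check that the A2A app is mounted is to fetch the agent card yourself. The sketch below uses only the standard library and assumes the `/a2a` mount prefix and port from the steps above; both helper names are hypothetical.

```python
import json
from urllib.request import urlopen


def agent_card_url(base_url: str, mount_prefix: str = "/a2a") -> str:
    """Build the discovery URL for an app mounted at `mount_prefix`."""
    return f"{base_url.rstrip('/')}/{mount_prefix.strip('/')}/.well-known/agent-card.json"


def fetch_agent_card(base_url: str = "http://localhost:8000") -> dict:
    """Fetch and decode the agent card; requires the server to be running."""
    with urlopen(agent_card_url(base_url), timeout=5) as resp:
        return json.load(resp)


if __name__ == "__main__":
    print(json.dumps(fetch_agent_card(), indent=2))
```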

---

## Examples

| Example | What it shows | API key needed |
|---|---|---|
| [Echo Agent](examples/echo_agent/README.md) | Minimal integration — pure Python, no LLM | No |
| [Echo Multipart](examples/echo_multipart/README.md) | Streaming multi-part responses (text + JSON data + file download) | No |
| [Joke Agent](examples/joke_agent/README.md) | Raw chat completions, no agent framework | Azure OpenAI |
| [Holiday Planner](examples/holiday_planner/README.md) | Full pydantic-ai agent with tools and live progress updates | Azure OpenAI |
| [Image Creator](examples/image_creator/README.md) | Pydantic-ai agent with five tools (image generation, intent expansion, prompt rewriting, web search, brand-asset lookup), multi-step plans, URL-based image storage, fullscreen viewer, prompt suggestions, in-agent slash commands | Azure OpenAI |

All examples require a Redis instance. Start one with:

```bash
docker run -d -p 6379:6379 redis:7-alpine
```

---

## Prompt management

fast_a2a_app injects conversation history automatically, but you can take as
much or as little control over prompt construction as you need. The API
follows **Progressive Disclosure** — use only the level that fits your use case.

### Level 0 — zero config

Works out of the box. The last 12 lines of conversation history are prepended
to the user's message as `"Conversation so far:\n…"`. Nothing to set.

```python
build_a2a_app(agent_card=card, stream_invoke=build_stream_invoke(my_fn))
```

### Level 1 — keyword parameters

Tune the built-in prompt without writing any code:

```python
build_a2a_app(
    agent_card=card,
    stream_invoke=build_stream_invoke(my_fn),
    system_prompt="You are a concise travel planner. Reply in JSON.",
    history_max_lines=6,   # default is 12; set to 0 for a stateless agent
)
```

`system_prompt` is prepended before the history block and the user message.
`history_max_lines=0` disables history injection entirely.

### Level 2 — compose from helpers

Build a custom prompt from the exported building blocks:

```python
from fast_a2a_app import format_history, get_task_history, get_user_input

def my_prompt(context) -> str:
    return (
        "You are an expert planner.\n\n"
        + format_history(get_task_history(context), max_lines=4)
        + f"Respond in JSON:\n{get_user_input(context)}"
    )

build_a2a_app(..., prompt_builder=my_prompt)
```

`get_task_history(context)` returns raw `(role, text)` pairs (`role` is `"user"` or `"agent"`) so you can also route or filter conversation turns yourself. `format_history(pairs, *, max_lines, header)` is the formatter — pass it the pairs to render a `"Conversation so far:\n…"` block.
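
As one example of filtering turns yourself, the hypothetical helper below keeps only the most recent user messages. It operates on plain `(role, text)` tuples, so it can be tried without a running server.

```python
def last_user_turns(
    pairs: list[tuple[str, str]], limit: int = 3
) -> list[tuple[str, str]]:
    """Keep only the most recent `limit` user turns, oldest first."""
    users = [(role, text) for role, text in pairs if role == "user"]
    # Guard against limit=0, since users[-0:] would return everything.
    return users[-limit:] if limit > 0 else []
```

Inside a `prompt_builder`, the result could then be passed on as `format_history(last_user_turns(get_task_history(context)), max_lines=3)`.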

### Level 3 — full custom builder

Pass any `(RequestContext) -> str` as `prompt_builder` for complete control.
`system_prompt` and `history_max_lines` are ignored when a custom
`prompt_builder` is supplied.

```python
def my_prompt(context) -> str:
    # context.get_user_input()   — current user message
    # context.related_tasks      — prior Task objects for this conversation
    # context.current_task       — task being executed now
    # context.message            — raw A2A Message object
    return f"Be concise.\n{context.get_user_input()}"

build_a2a_app(..., prompt_builder=my_prompt)
```

---

## API reference

### `build_a2a_app(...)`

Assembles a Starlette ASGI app. Mount it at any path prefix.

| Parameter | Type | Default | Description |
|---|---|---|---|
| `agent_card` | `AgentCard` | required | Pre-built A2A agent card (name, description, version, supported interfaces, skills, capabilities) |
| `invoke` | `Callable \| None` | `None` | Non-streaming callable — use `build_invoke()` to wrap |
| `stream_invoke` | `Callable \| None` | `None` | Streaming callable — use `build_stream_invoke()` to wrap |
| `system_prompt` | `str \| None` | `None` | **Level 1** — prepended to every prompt before history and user input |
| `history_max_lines` | `int` | `12` | **Level 1** — number of prior conversation lines to inject; `0` disables history |
| `prompt_builder` | `Callable \| None` | auto | **Level 2/3** — custom `(RequestContext) -> str`; overrides `system_prompt` and `history_max_lines` |
| `on_task_start` | `Callable[[str], Awaitable] \| None` | `None` | Called before each task |
| `on_task_cancel` | `Callable[[str], Awaitable] \| None` | `None` | Called on cancel |
| `a2a_task_store` | `A2ATaskStore \| None` | auto | Custom task store |
| `redis_client` | `aioredis.Redis \| None` | auto | Custom Redis client |
| `redis_url` | `str` | `"redis://localhost:6379"` | Redis connection string |
| `debug` | `bool` | `False` | Include exception details in failure messages |

### Artifact builders

Convenience constructors that wrap the verbose `Artifact(...) / Part(...)` boilerplate. Import from the top-level package:

```python
from fast_a2a_app import (
    text_artifact, data_artifact, file_artifact, image_artifact,
    prompt_suggestions_artifact,
)
```

| Helper | Returns | UI rendering |
|---|---|---|
| `text_artifact(text, *, name="result")` | text-only Artifact | markdown bubble |
| `data_artifact(data, *, name="data", text=None)` | structured-data Artifact (protobuf `Value`); accepts any JSON-compatible dict | key-value table; optional `text` rendered as markdown above |
| `file_artifact(content=None, *, url=None, filename, media_type, name=None, text=None)` | file Artifact — pass either inline `content` bytes OR a `url` reference (exactly one) | download card; image media types render inline |
| `image_artifact(image_bytes=None, *, url=None, media_type="image/png", caption=None, filename=None, name="image")` | image Artifact — inline bytes or a stored URL | inline image preview + download; clickable for fullscreen |
| `prompt_suggestions_artifact(suggestions, *, text=None, name="prompt_suggestions")` | data Artifact with `{_type: "PROMPT_SUGGESTIONS", suggestions: [...]}` envelope | clickable pill buttons; clicking sends the suggestion as a user message |

```python
async def my_agent(prompt, context):
    yield text_artifact("Computing summary…")
    yield data_artifact({"count": 42, "ok": True}, text="Run finished:")
    yield image_artifact(png_bytes, caption="Here's your chart.")              # inline
    yield image_artifact(url="/images/abc", caption="Or stored elsewhere.")    # url form
    yield prompt_suggestions_artifact(
        [{"label": "Make it warmer", "prompt": "Make the lighting warmer."}],
        text="Want to refine?",
    )
```

The URL form on `image_artifact` / `file_artifact` lets you keep large binaries out of the wire transcript and the browser's `localStorage` — store the bytes in your own backend (object store, sibling FastAPI endpoint, CDN) and ship just the URL. The chat UI fetches via `<img src=url>` / `window.open(url)` like any other static asset.

### `build_invoke(run)`

Wraps any `async (prompt: str) -> str | Artifact` function as a non-streaming A2A invoke.
Works with any AI framework or plain API call. Return a plain `str` for a text response,
or return an `Artifact` to send multiple parts (text, JSON data, files) in one response:

```python
from a2a.types import Artifact, Part
import json, uuid

async def my_agent(prompt: str) -> Artifact:
    return Artifact(
        artifact_id=str(uuid.uuid4()),
        name="result",
        parts=[
            Part(text=f"Here is your data for: {prompt}"),
            Part(raw=json.dumps({"count": 42}).encode(), media_type="application/json"),
            Part(raw=b"file content", filename="out.txt", media_type="text/plain"),
        ],
    )

app.mount("/a2a", build_a2a_app(agent_card=card, invoke=build_invoke(my_agent)))
```

### `build_stream_invoke(run)`

Wraps any `async (prompt: str) -> AsyncIterable[str]` generator as a streaming A2A invoke.
Also sets up the `report_progress()` ContextVar so live progress updates work
out of the box — call `report_progress("step 2/5…")` anywhere during execution
and it will appear as a working-status event in the chat UI.

### `report_progress(message)`

Call from any agent tool to push a status string to the chat UI spinner.
Has no effect outside a streaming context (safe to call unconditionally).

```python
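from fast_a2a_app import report_progress  # assumed top-level export, like the other helpers

@agent.tool_plain  # `agent` is a pydantic-ai Agent defined elsewhere
async def long_computation(n: int) -> str:
    report_progress(f"Computing step 1/{n}…")
    # … do the first chunk of work …
    report_progress(f"Computing step 2/{n}…")
    result = f"finished {n} steps"  # placeholder for the real tool output
    return result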
```

### `get_user_input(context)`

Returns the current user message text from a `RequestContext`. Use this in
a custom `prompt_builder` so you don't need to know the internal SDK method
name:

```python
from fast_a2a_app import get_user_input

def my_prompt(context) -> str:
    return f"Respond in JSON:\n{get_user_input(context)}"
```

### `get_task_history(context)`

Returns prior conversation as a list of `(role, text)` tuples ordered oldest → newest.
`role` is `"user"` or `"agent"`. Returns `[]` when there is no prior history.

```python
for role, text in get_task_history(context):
    if role == "user":
        ...
```

### `format_history(history, *, max_lines=12, header="Conversation so far:")`

Renders `(role, text)` pairs as a prompt prefix — caps to the most recent `max_lines`,
formats each as `"User: …"` / `"Agent: …"`, and prepends `header`. Returns `""` when
the list is empty. Use together with `get_task_history` in a custom `prompt_builder`:

```python
from fast_a2a_app import format_history, get_task_history, get_user_input

def my_prompt(context) -> str:
    return (
        "You are an expert.\n\n"
        + format_history(get_task_history(context), max_lines=6)
        + get_user_input(context)
    )
```

### `a2a_ui` / `build_a2a_ui`

A Starlette ASGI app serving a self-contained single-page chat interface.
No build step, no npm. Mount it at `"/"` to serve the UI.

```python
app.mount("/", a2a_ui)                          # default: no upload button
app.mount("/", build_a2a_ui(file_upload_api="/images"))  # enable image attachments
```

`build_a2a_ui` accepts:

- `file_upload_api` *(str | None)* — URL the chat's paperclip button should `POST` images to as `multipart/form-data`. Endpoint must return `{id, url, mediaType, filename}`. When `None`, the attach button is hidden.

The UI reads the agent card from `/a2a/.well-known/agent-card.json` to populate
the header name and the collapsible info panel.


---

## Architecture

```
FastAPI app
├── /a2a    ← Starlette ASGI app (build_a2a_app)
│   ├── POST /            SendMessage, SendStreamingMessage, CancelTask, …
│   └── GET  /.well-known/agent-card.json
└── /       ← a2a_ui (Starlette, single HTML file)

Redis
├── a2a:task:{id}                 Task JSON (24 h TTL)
├── a2a:context:{cid}:tasks       Context index (task_id → sequence)
├── a2a:context:{cid}:sequence    Sequence counter
└── a2a:cancel:{id}               Cancel signal (5 min TTL)
```
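
To make the role of the `a2a:cancel:{id}` key more concrete, the sketch below models it with an in-memory stand-in for Redis. It is not the library's implementation: how and when the real executor checks the signal is not described here, so the per-chunk check and the `CancelSignals` and `run_task` names are assumptions.

```python
import asyncio
import time


class CancelSignals:
    """In-memory stand-in for the Redis cancel keys (5 min TTL)."""

    def __init__(self, ttl_seconds: float = 300.0) -> None:
        self._ttl = ttl_seconds
        self._expires: dict[str, float] = {}

    def request_cancel(self, task_id: str) -> None:
        # Any replica may set the flag; it expires on its own after the TTL.
        self._expires[f"a2a:cancel:{task_id}"] = time.monotonic() + self._ttl

    def is_cancelled(self, task_id: str) -> bool:
        deadline = self._expires.get(f"a2a:cancel:{task_id}")
        return deadline is not None and time.monotonic() < deadline


async def run_task(task_id: str, chunks: list[str], signals: CancelSignals) -> list[str]:
    """Emit chunks until the task finishes or a cancel signal is seen."""
    emitted: list[str] = []
    for chunk in chunks:
        if signals.is_cancelled(task_id):  # assumed: checked between chunks
            break
        emitted.append(chunk)
        await asyncio.sleep(0)  # yield control, as a real stream would
    return emitted
```

The short TTL means a stale cancel flag cannot linger and affect later work, while still giving every replica time to observe it.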

### Conversation history injection

Each A2A task has a `context_id` shared across all turns of a conversation.
`ContextAwareRequestContextBuilder` fetches all prior tasks for the same
`context_id` from Redis and attaches them to the `RequestContext` as
`related_tasks`.

The default `prompt_builder` then calls `get_task_history()` to extract
`(role, text)` pairs and `format_history()` to render the most recent
`history_max_lines` of them (default: 12) as `"Conversation so far:\n…"`
before the user's message. An optional `system_prompt` is inserted first.

The agent therefore sees recent history without the client needing to
replay it. Depth and format are fully configurable — see
[Prompt management](#prompt-management) above.
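
The assembly order described above can be approximated in a few lines. This is a stand-alone sketch, not the library's code: the function names are hypothetical, the exact whitespace between blocks is a guess, and each `(role, text)` pair is assumed to count as one history line.

```python
def format_history_sketch(pairs, *, max_lines=12, header="Conversation so far:"):
    """Render the most recent pairs as a prompt prefix, or '' if there are none."""
    if not pairs or max_lines <= 0:
        return ""
    labels = {"user": "User", "agent": "Agent"}
    lines = [f"{labels.get(role, role)}: {text}" for role, text in pairs[-max_lines:]]
    return header + "\n" + "\n".join(lines) + "\n\n"


def assemble_prompt(user_input, pairs, *, system_prompt=None, history_max_lines=12):
    """Order: optional system prompt, then history block, then the user message."""
    parts = []
    if system_prompt:
        parts.append(system_prompt + "\n\n")
    parts.append(format_history_sketch(pairs, max_lines=history_max_lines))
    parts.append(user_input)
    return "".join(parts)
```

With `history_max_lines=0` the history block is empty, so the agent sees only the system prompt and the current message.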

### How streaming works

`build_stream_invoke` wraps your generator in an `asyncio.Queue`-based
relay. Before starting the generator it sets a `ContextVar` callback that
`report_progress()` reads. Strings from `report_progress()` are placed in
the queue with a sentinel prefix; `ConfigurableAgentExecutor` routes them
to non-final `statusUpdate` (state `TASK_STATE_WORKING`) SSE events. All
other yielded strings become `artifactUpdate` events — the streaming text
the user sees.
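
The relay described above can be modelled with the standard library alone. The sketch below is an approximation under stated assumptions: the sentinel prefix value, the `relay` function, and the tuple-shaped events are invented for illustration, and the local `report_progress` only mimics the library function of the same name.

```python
import asyncio
from contextvars import ContextVar

_PROGRESS_PREFIX = "\x00progress:"  # hypothetical sentinel prefix
_DONE = object()                    # marks the end of the stream
_progress_cb: ContextVar = ContextVar("progress_cb", default=None)


def report_progress(message: str) -> None:
    cb = _progress_cb.get()
    if cb is not None:  # no-op outside a streaming context
        cb(message)


async def relay(gen_fn, prompt):
    queue: asyncio.Queue = asyncio.Queue()

    async def pump():
        # Set the callback before starting the generator; the task has its
        # own copy of the context, so nothing leaks to the caller.
        _progress_cb.set(lambda msg: queue.put_nowait(_PROGRESS_PREFIX + msg))
        try:
            async for chunk in gen_fn(prompt):
                queue.put_nowait(chunk)
        finally:
            queue.put_nowait(_DONE)

    task = asyncio.create_task(pump())
    while (item := await queue.get()) is not _DONE:
        if item.startswith(_PROGRESS_PREFIX):
            yield ("statusUpdate", item[len(_PROGRESS_PREFIX):])
        else:
            yield ("artifactUpdate", item)
    await task  # re-raise any error from the generator


async def demo_agent(prompt):
    report_progress("step 1/2")
    yield "Hello, "
    report_progress("step 2/2")
    yield prompt


async def collect_events(prompt):
    return [event async for event in relay(demo_agent, prompt)]
```

Routing both kinds of message through one queue keeps progress updates and text chunks in the order the agent produced them.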

---

## Publishing to PyPI

```bash
pip install hatch
hatch build
hatch publish
```

---

## License

MIT
