Metadata-Version: 2.4
Name: fast_a2a_app
Version: 0.3.1
Summary: fast_a2a_app — Drop-in A2A server and chat UI for any AI agent
Project-URL: Homepage, https://github.com/rembli/fast_a2a_app
Project-URL: Repository, https://github.com/rembli/fast_a2a_app
Project-URL: Issues, https://github.com/rembli/fast_a2a_app/issues
Author-email: Georg Boegerl <georg.boegerl@henkel.com>
License: MIT
Keywords: a2a,agent,ai,chat,fastapi,llm,server
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Internet :: WWW/HTTP :: HTTP Servers
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Requires-Python: <4.0,>=3.11
Requires-Dist: a2a-sdk>=1.0.0
Requires-Dist: fastapi>=0.115.0
Requires-Dist: redis>=7.4.0
Requires-Dist: sse-starlette>=2.1.3
Requires-Dist: starlette>=0.41.0
Requires-Dist: uvicorn[standard]>=0.34.0
Description-Content-Type: text/markdown

# fast_a2a_app

**Drop-in A2A server and chat UI for any FastAPI application that runs AI agents, installable from PyPI.**

fast_a2a_app packages the battle-tested A2A protocol adapter and self-contained browser chat UI into a standalone pip-installable library. Get a fully
spec-compliant A2A server plus a ready-to-use chat interface in under 20 lines.

```bash
pip install fast_a2a_app
```

---

## Why fast_a2a_app?

[Pydantic AI](https://ai.pydantic.dev/) ships its own `FastA2A` integration, which is excellent if you are already inside the Pydantic AI ecosystem. fast_a2a_app exists for a different set of needs:

- **Mount point, not a framework.** fast_a2a_app is a plain Starlette app you mount into an existing FastAPI application at any path prefix. Everything outside that prefix — authentication middleware, custom routes, dependency injection, observability — is yours to own and compose however you like.
- **Framework-agnostic.** The library has zero dependency on Pydantic AI. Wire in any agent: raw Anthropic/OpenAI API calls, LangChain, LlamaIndex, or plain Python — as long as it exposes an `async (str) -> str` function or an async generator.
- **Separation of concerns.** Your FastAPI application stays in charge of the HTTP layer (auth, rate limiting, CORS, health checks). fast_a2a_app only handles the A2A protocol inside its mounted prefix, keeping agent logic cleanly decoupled from transport concerns.

We hope this contributes to a composable AI agent architecture where protocol adapters, agent frameworks, and application infrastructure are independent choices.

---

## What's inside

| Module | What it does |
|---|---|
| `fast_a2a_app.server` | A2A JSON-RPC server (streaming SSE, multi-turn history, cross-instance cancel) |
| `fast_a2a_app.ui` | Self-contained browser chat UI — no build step, no npm |

### Protocol features (via `a2a-sdk` 1.0.x)

- `SendMessage` — single-shot request/response (non-streaming)
- `SendStreamingMessage` — streaming SSE responses
- `CancelTask` — immediate or cross-replica cancellation
- `SubscribeToTask` — reconnect to an in-flight stream after a network blip
- `GetTask` — snapshot fallback for page-reload recovery
- `.well-known/agent-card.json` — agent discovery

### Server features

- **Multi-turn history** — every turn is stored in Redis and injected as a "Conversation so far:" prefix, giving the agent continuity without client-side replay. History depth and a system prompt are configurable via `history_max_lines` and `system_prompt` on `build_a2a_app`; full custom prompt assembly is also supported.
- **Cross-instance cancellation** — cancel signals flow through Redis so any replica can stop a task running on another replica
- **Live progress updates** — call `report_progress("step 2/5…")` from any tool and the chat UI spinner updates in real time
- **Lifecycle hooks** — `on_task_start` / `on_task_cancel` callbacks for metrics, locks, or state resets

### UI features

- **Stream toggle** — checkbox in the input bar switches between `SendStreamingMessage` (tokens arrive live) and `SendMessage` (full response rendered at once); preference persisted in `localStorage`. Hidden automatically when the agent card reports `capabilities.streaming = false`.
- **Data part widget** — parts with `media_type: application/json` are rendered as a labeled key-value table with color-coded value types; no raw JSON brackets shown
- **File part widget** — raw binary parts show a type icon, filename, and media type with a "Download" button that creates a temporary Blob URL
- **Page-reload recovery** — active task is stored in `localStorage` and resubscribed on the next load
- **Markdown rendering** — agent responses rendered as GitHub-Flavored Markdown with DOMPurify sanitisation
- **Collapsible agent card** — name, version, capabilities, and skills pulled from `.well-known/agent-card.json`

---

## Storage

fast_a2a_app currently uses **Redis** for all server-side state:

| What is stored | Key pattern | TTL |
|---|---|---|
| Task JSON (full A2A task object) | `a2a:task:{id}` | 24 h |
| Conversation index (task_id → sequence) | `a2a:context:{cid}:tasks` | 24 h |
| Cross-instance cancel signal | `a2a:cancel:{id}` | 5 min |

Start a local Redis instance before running any example:

```bash
docker run -d -p 6379:6379 redis:7-alpine
```

Or point `REDIS_URL` at any managed Redis-compatible service (Redis Cloud, AWS ElastiCache, Azure Cache for Redis, etc.).
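
The key layout and TTLs in the table can be mirrored in a few lines. The sketch below is not the library's code: `task_key`, `cancel_key`, and `FakeTTLStore` are hypothetical names, and the in-memory store only imitates Redis's set-with-expiry so the example runs without a server. It illustrates why a short TTL on the cancel key is convenient: a signal that nobody consumes simply disappears.

```python
import time

# TTLs taken from the storage table, in seconds.
TASK_TTL = 24 * 60 * 60
CANCEL_TTL = 5 * 60


def task_key(task_id: str) -> str:
    return f"a2a:task:{task_id}"


def cancel_key(task_id: str) -> str:
    return f"a2a:cancel:{task_id}"


class FakeTTLStore:
    """In-memory stand-in for Redis SET-with-expiry (hypothetical, illustration only)."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._data: dict[str, tuple[str, float]] = {}

    def set(self, key: str, value: str, ex: int) -> None:
        self._data[key] = (value, self._clock() + ex)

    def exists(self, key: str) -> bool:
        entry = self._data.get(key)
        if entry is None:
            return False
        if self._clock() >= entry[1]:
            del self._data[key]  # expired entries vanish, as they would in Redis
            return False
        return True


# Replica A requests cancellation; replica B checks the shared store.
now = [0.0]  # controllable clock so the example does not need to sleep
store = FakeTTLStore(clock=lambda: now[0])
store.set(cancel_key("task-1"), "1", ex=CANCEL_TTL)
seen_by_replica_b = store.exists(cancel_key("task-1"))

now[0] += CANCEL_TTL  # five minutes later the signal has expired
still_there = store.exists(cancel_key("task-1"))
```

How a replica actually notices the signal (polling, pub/sub, or something else) is not described here, so the explicit `exists` check is an assumption.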

> **Roadmap** — pluggable storage backends (MongoDB, PostgreSQL) are planned.
> The `RedisTaskStore` already implements the `A2ATaskStore` Protocol, so a
> Mongo or Postgres backend can be swapped in by passing a custom
> `a2a_task_store` to `build_a2a_app()` without any library changes.

---

## Framework-agnostic design

fast_a2a_app has **no dependency on any AI framework**. `build_a2a_app` accepts two
plain callables that you implement however you like:

| Callable | Signature |
|---|---|
| `invoke` | `async (prompt: str) -> str` **or** `async (prompt: str) -> Artifact` |
| `stream_invoke` | `async (prompt: str) -> AsyncIterable[str]` |

Wrap them with the two helpers and pass to `build_a2a_app`:

```python
build_a2a_app(
    agent_card=agent_card,
    invoke=build_invoke(my_async_fn),
    stream_invoke=build_stream_invoke(my_async_generator_fn),
)
```

`build_invoke` accepts both plain-text and multi-part agents — return a `str` for
a single text response, or return an `Artifact` to send text, structured data, and
file parts together in one response.

`build_stream_invoke` automatically sets up the `report_progress()` ContextVar,
so any code called during streaming can push live status updates to the chat UI —
regardless of which framework (or none) your agent uses.

You can also implement `invoke` and `stream_invoke` directly as bare callables
and pass them straight to `build_a2a_app` — no wrapper needed.
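
Because the contract is just two async callables, you can exercise an agent without the library or a server. The following is a minimal sketch using only the standard library; `shout`, `stream_shout`, and `collect` are hypothetical names, and `collect` merely stands in for whatever consumes the stream in a real deployment.

```python
import asyncio
from collections.abc import AsyncIterable


async def shout(prompt: str) -> str:
    """Non-streaming contract: async (str) -> str."""
    return prompt.upper()


async def stream_shout(prompt: str) -> AsyncIterable[str]:
    """Streaming contract: an async generator that yields text chunks."""
    for word in prompt.upper().split():
        yield word + " "
        await asyncio.sleep(0)  # hand control back to the event loop between chunks


async def collect(prompt: str) -> str:
    # Stand-in consumer: joins the chunks the way a client would see them.
    return "".join([chunk async for chunk in stream_shout(prompt)])


full = asyncio.run(shout("hello a2a"))
streamed = asyncio.run(collect("hello a2a"))
```

Testing the two functions in isolation like this keeps agent logic verifiable before any transport is involved.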

---

## Quickstart

### 1. Install

```bash
pip install fast_a2a_app
```

### 2. Implement your agent

Any `async (str) -> str` function works as the non-streaming invoke.
Any `async (str) -> AsyncIterable[str]` generator works for streaming.

```python
# agent.py
from collections.abc import AsyncIterable

client = ...  # any OpenAI-compatible async client

async def invoke(prompt: str) -> str:
    resp = await client.chat.completions.create(
        model="gpt-4o", max_completion_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    )
    return (resp.choices[0].message.content or "").strip()

async def stream_invoke(prompt: str) -> AsyncIterable[str]:
    stream = await client.chat.completions.create(
        model="gpt-4o", max_completion_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        if not chunk.choices:
            continue
        text = chunk.choices[0].delta.content or ""
        if text:
            yield text
```

### 3. Wire up the server

```python
# main.py
from fastapi import FastAPI
from a2a.types import AgentCapabilities, AgentCard, AgentInterface
from fast_a2a_app import a2a_ui, build_a2a_app, build_invoke, build_stream_invoke
from agent import invoke, stream_invoke

app = FastAPI()

agent_card = AgentCard(
    name="My Agent",
    description="Does cool things",
    version="1.0.0",
    supported_interfaces=[AgentInterface(url="http://localhost:8000/a2a/", protocol_binding="JSONRPC")],
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["text"],
    default_output_modes=["text"],
)

app.mount("/a2a", build_a2a_app(
    agent_card=agent_card,
    invoke=build_invoke(invoke),
    stream_invoke=build_stream_invoke(stream_invoke),
))

app.mount("/", a2a_ui)        # built-in chat UI at http://localhost:8000/
```

### 4. Run

```bash
# Start Redis (required for task state and conversation history)
docker run -d -p 6379:6379 redis:7-alpine

# Run the app
uvicorn main:app --reload
```

Open `http://localhost:8000/` — you're chatting.
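
If the page does not load the agent's name, a quick way to check the mount is to fetch the agent card directly. This is a small sketch using only the standard library; `agent_card_url` and `fetch_agent_card` are hypothetical helpers, and the default prefix `/a2a` simply matches the mount path used in step 3.

```python
import json
from urllib.request import urlopen


def agent_card_url(base_url: str, mount_prefix: str = "/a2a") -> str:
    """Build the discovery URL for an A2A app mounted at `mount_prefix`."""
    return f"{base_url.rstrip('/')}/{mount_prefix.strip('/')}/.well-known/agent-card.json"


def fetch_agent_card(base_url: str = "http://localhost:8000") -> dict:
    """Download and decode the agent card; requires the server to be running."""
    with urlopen(agent_card_url(base_url), timeout=5) as response:
        return json.load(response)


# With the server from step 4 running:
#   print(json.dumps(fetch_agent_card(), indent=2))
```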

---

## Prompt management

fast_a2a_app injects conversation history automatically, but you can take as
much or as little control over prompt construction as you need. The API
follows **Progressive Disclosure** — use only the level that fits your use case.

### Level 0 — zero config

Works out of the box. The last 12 lines of conversation history are prepended
to the user's message as `"Conversation so far:\n…"`. Nothing to set.

```python
build_a2a_app(agent_card=card, stream_invoke=build_stream_invoke(my_fn))
```

### Level 1 — keyword parameters

Tune the built-in prompt without writing any code:

```python
build_a2a_app(
    agent_card=card,
    stream_invoke=build_stream_invoke(my_fn),
    system_prompt="You are a concise travel planner. Reply in JSON.",
    history_max_lines=6,   # default is 12; set to 0 for a stateless agent
)
```

`system_prompt` is prepended before the history block and the user message.
`history_max_lines=0` disables history injection entirely.

### Level 2 — compose from helpers

Build a custom prompt from the exported building blocks:

```python
from fast_a2a_app import build_conversation_prefix, get_user_input

def my_prompt(context) -> str:
    return (
        "You are an expert planner.\n\n"
        + build_conversation_prefix(context, max_lines=4)
        + f"Respond in JSON:\n{get_user_input(context)}"
    )

build_a2a_app(..., prompt_builder=my_prompt)
```

### Level 3 — full custom builder

Pass any `(RequestContext) -> str` as `prompt_builder` for complete control.
`system_prompt` and `history_max_lines` are ignored when a custom
`prompt_builder` is supplied.

```python
def my_prompt(context) -> str:
    # context.get_user_input()   — current user message
    # context.related_tasks      — prior Task objects for this conversation
    # context.current_task       — task being executed now
    # context.message            — raw A2A Message object
    return f"Be concise.\n{context.get_user_input()}"

build_a2a_app(..., prompt_builder=my_prompt)
```

---

## API reference

### `build_a2a_app(...)`

Assembles a Starlette ASGI app. Mount it at any path prefix.

| Parameter | Type | Default | Description |
|---|---|---|---|
| `agent_card` | `AgentCard` | required | Pre-built A2A agent card (name, description, version, supported interfaces, skills, capabilities) |
| `invoke` | `Callable \| None` | `None` | Non-streaming callable — use `build_invoke()` to wrap |
| `stream_invoke` | `Callable \| None` | `None` | Streaming callable — use `build_stream_invoke()` to wrap |
| `system_prompt` | `str \| None` | `None` | **Level 1** — prepended to every prompt before history and user input |
| `history_max_lines` | `int` | `12` | **Level 1** — number of prior conversation lines to inject; `0` disables history |
| `prompt_builder` | `Callable \| None` | auto | **Level 2/3** — custom `(RequestContext) -> str`; overrides `system_prompt` and `history_max_lines` |
| `on_task_start` | `Callable[[str], Awaitable] \| None` | `None` | Called before each task |
| `on_task_cancel` | `Callable[[str], Awaitable] \| None` | `None` | Called on cancel |
| `a2a_task_store` | `A2ATaskStore \| None` | auto | Custom task store |
| `redis_client` | `aioredis.Redis \| None` | auto | Custom Redis client |
| `redis_url` | `str` | `"redis://localhost:6379"` | Redis connection string |
| `debug` | `bool` | `False` | Include exception details in failure messages |
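
The lifecycle hooks are plain async callables that take one string. The table does not say what that string contains; the sketch below assumes it is the task ID and shows hooks that track which tasks are in flight. The module-level `active_tasks` and `cancelled_tasks` containers and the `simulate` driver are hypothetical and exist only to make the example runnable on its own.

```python
import asyncio

active_tasks: set[str] = set()
cancelled_tasks: list[str] = []


async def on_task_start(task_id: str) -> None:
    # Assumption: the single string argument identifies the task.
    active_tasks.add(task_id)


async def on_task_cancel(task_id: str) -> None:
    active_tasks.discard(task_id)
    cancelled_tasks.append(task_id)


# These would be passed as:
#   build_a2a_app(agent_card=card, invoke=...,
#                 on_task_start=on_task_start, on_task_cancel=on_task_cancel)


async def simulate() -> None:
    # Drive the hooks by hand to show the bookkeeping.
    await on_task_start("t1")
    await on_task_start("t2")
    await on_task_cancel("t1")


asyncio.run(simulate())
```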

### `build_invoke(run)`

Wraps any `async (prompt: str) -> str | Artifact` function as a non-streaming A2A invoke.
Works with any AI framework or plain API call. Return a plain `str` for a text response,
or return an `Artifact` to send multiple parts (text, JSON data, files) in one response:

```python
from a2a.types import Artifact, Part
import json, uuid

async def my_agent(prompt: str) -> Artifact:
    return Artifact(
        artifact_id=str(uuid.uuid4()),
        name="result",
        parts=[
            Part(text=f"Here is your data for: {prompt}"),
            Part(raw=json.dumps({"count": 42}).encode(), media_type="application/json"),
            Part(raw=b"file content", filename="out.txt", media_type="text/plain"),
        ],
    )

app.mount("/a2a", build_a2a_app(agent_card=card, invoke=build_invoke(my_agent)))
```

### `build_stream_invoke(run)`

Wraps any `async (prompt: str) -> AsyncIterable[str]` generator as a streaming A2A invoke.
Also sets up the `report_progress()` ContextVar so live progress updates work
out of the box — call `report_progress("step 2/5…")` anywhere during execution
and it will appear as a working-status event in the chat UI.

### `report_progress(message)`

Call from any agent tool to push a status string to the chat UI spinner.
Has no effect outside a streaming context (safe to call unconditionally).

```python
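from fast_a2a_app import report_progress  # assumed to be a top-level export, like the other helpers

@agent.tool_plain  # `agent` is a pydantic-ai Agent here; any tool function works the same way
async def long_computation(n: int) -> str:
    report_progress(f"Computing step 1/{n}…")
    # …
    report_progress(f"Computing step 2/{n}…")
    return f"Finished {n} steps"  # placeholder result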
```

### `get_user_input(context)`

Returns the current user message text from a `RequestContext`. Use this in
a custom `prompt_builder` so you don't need to know the internal SDK method
name:

```python
from fast_a2a_app import get_user_input

def my_prompt(context) -> str:
    return f"Respond in JSON:\n{get_user_input(context)}"
```

### `build_conversation_prefix(context, *, max_lines=12)`

Returns prior conversation lines as a formatted `"Conversation so far:\n…"`
string, capped at `max_lines` lines. Returns an empty string when there is
no prior history. Use in a custom `prompt_builder`:

```python
from fast_a2a_app import build_conversation_prefix, get_user_input

def my_prompt(context) -> str:
    return (
        "You are an expert.\n\n"
        + build_conversation_prefix(context, max_lines=6)
        + get_user_input(context)
    )
```

### `a2a_ui`

A Starlette ASGI app serving a self-contained single-page chat interface.
No build step, no npm. Mount it at `"/"` to serve the UI.

```python
app.mount("/", a2a_ui)
```

The UI reads the agent card from `/a2a/.well-known/agent-card.json` to populate
the header name and the collapsible info panel, which assumes the A2A app is
mounted at `/a2a` as in the examples.

---

## Example: Holiday Planner

`examples/holiday_planner/` is a complete example showing how to build a
domain-specific agent on fast_a2a_app.

```
examples/holiday_planner/
├── agent.py          # pydantic-ai agent with 4 tools
├── main.py           # FastAPI app + fast_a2a_app wiring
└── requirements.txt
```

### Running the example

```bash
# One-time: create your .env from the shared template
cp examples/.env.example examples/.env
# edit examples/.env — set AZURE_AI_ENDPOINT and AZURE_AI_DEPLOYMENT

cd examples/holiday_planner
pip install -e ../../          # install fast_a2a_app from repo root
pip install -r requirements.txt

docker run -d -p 6379:6379 redis:7-alpine

uvicorn main:app --reload
```

Open `http://localhost:8000/` and ask:

> *"I want to plan a 10-day trip somewhere in Southeast Asia in September,
>  moderate budget, interested in food, temples, and nature. Can you help?"*

The agent will ask follow-up questions, then use its tools to recommend
destinations, build a day-by-day itinerary, estimate costs, and provide
travel essentials — all with live progress updates in the UI.

### Holiday planner tools

| Tool | Description |
|---|---|
| `recommend_destinations` | 2-3 tailored destination suggestions with pros/cons |
| `create_itinerary` | Day-by-day plan with restaurants and local tips |
| `estimate_budget` | Cost breakdown table per person per day |
| `get_travel_essentials` | Visa, health, weather, and packing guide |

---

## Example: Echo Agent (no LLM, no external dependencies)

`examples/echo_agent/` is the minimal fast_a2a_app integration — pure Python, no API key, no AI framework.

```
examples/echo_agent/
├── agent.py          # Two plain async functions, zero external imports
├── main.py           # FastAPI app
└── requirements.txt  # fast_a2a_app only
```

```python
# agent.py
import asyncio
from collections.abc import AsyncIterable

async def invoke(prompt: str) -> str:
    return f"Echo: {prompt}"

async def stream_invoke(prompt: str) -> AsyncIterable[str]:
    words = f"Echo: {prompt}".split()
    for i, word in enumerate(words):
        yield word if i == len(words) - 1 else word + " "
        await asyncio.sleep(0.05)   # makes streaming visible in the UI
```

### Running the echo agent

```bash
cd examples/echo_agent
pip install -e ../../
pip install -r requirements.txt

docker run -d -p 6379:6379 redis:7-alpine
uvicorn main:app --reload
```

> No `.env` needed — the echo agent requires no API key. A `REDIS_URL` can
> be set via `examples/.env` if you need a non-default Redis address.

Open `http://localhost:8000/` and type anything.

---

## Example: Joke Agent (raw chat completions, no agent framework)

`examples/joke_agent/` shows fast_a2a_app wired to plain Azure OpenAI chat completions — no agent framework at all.

```
examples/joke_agent/
├── agent.py          # Two plain async functions: run_joke_agent + stream_joke_agent
├── main.py           # FastAPI app using build_invoke / build_stream_invoke
└── requirements.txt
```

`agent.py` defines two callables that satisfy the fast_a2a_app contract:

```python
# Non-streaming: async (str) -> str
async def run_joke_agent(prompt: str) -> str:
    response = await client.chat.completions.create(model=..., messages=[...])
    return (response.choices[0].message.content or "").strip()

# Streaming: async (str) -> AsyncIterable[str]
async def stream_joke_agent(prompt: str) -> AsyncIterable[str]:
    stream = await client.chat.completions.create(model=..., messages=[...], stream=True)
    async for chunk in stream:
        if not chunk.choices:
            continue
        text = chunk.choices[0].delta.content or ""
        if text:
            yield text
```

`main.py` wires them in with the helpers:

```python
from fastapi import FastAPI
from a2a.types import AgentCapabilities, AgentCard, AgentInterface, AgentSkill
from fast_a2a_app import build_a2a_app, build_invoke, build_stream_invoke, a2a_ui
from agent import run_joke_agent, stream_joke_agent

app = FastAPI()

agent_card = AgentCard(
    name="Joke Agent",
    description="Your AI stand-up comedian.",
    version="0.1.0",
    supported_interfaces=[AgentInterface(url="http://localhost:8000/a2a/", protocol_binding="JSONRPC")],
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["text"],
    default_output_modes=["text"],
    skills=[AgentSkill(id="tell_joke", name="Tell a joke", description="Tells a joke on any topic.", tags=[])],
)

app.mount("/a2a", build_a2a_app(
    agent_card=agent_card,
    invoke=build_invoke(run_joke_agent),
    stream_invoke=build_stream_invoke(stream_joke_agent),
))
app.mount("/", a2a_ui)
```

### Running the joke agent

```bash
# One-time: create your .env from the shared template
cp examples/.env.example examples/.env
# edit examples/.env — set AZURE_AI_ENDPOINT and AZURE_AI_DEPLOYMENT

cd examples/joke_agent
pip install -e ../../
pip install -r requirements.txt

docker run -d -p 6379:6379 redis:7-alpine
uvicorn main:app --reload
```

Open `http://localhost:8000/` and try:

> *"Tell me a programming joke"* or *"Give me your best dad joke"*

Tokens stream directly from the Azure OpenAI API to the browser as they arrive.

---

## Example: Echo Multipart (multi-part responses, no LLM)

`examples/echo_multipart/` demonstrates returning text, structured data, and a
downloadable file in a single A2A response — no LLM, no API key, no protobuf boilerplate.

```
examples/echo_multipart/
├── agent.py          # Returns Artifact with 3 parts using only json.dumps
├── main.py           # FastAPI app wired with build_invoke
└── requirements.txt
```

```python
# agent.py
import json, uuid
from a2a.types import Artifact, Part

async def invoke(prompt: str) -> Artifact:
    words = prompt.split()
    return Artifact(
        artifact_id=str(uuid.uuid4()),
        name="result",
        parts=[
            Part(text=f"Echo: {prompt}"),
            Part(
                raw=json.dumps({"original": prompt, "word_count": len(words)}).encode(),
                media_type="application/json",
            ),
            Part(raw=f"Echo: {prompt}\n".encode(), filename="echo.txt", media_type="text/plain"),
        ],
    )
```

The UI renders the three parts as: a markdown bubble, a key-value data table, and a file download card.

### Running the echo multipart agent

```bash
cd examples/echo_multipart
pip install -e ../../
pip install -r requirements.txt

docker run -d -p 6379:6379 redis:7-alpine
uvicorn main:app --reload
```

---

## Architecture

```
FastAPI app
├── /a2a    ← Starlette ASGI app (build_a2a_app)
│   ├── POST /            SendMessage, SendStreamingMessage, CancelTask, …
│   └── GET  /.well-known/agent-card.json
└── /       ← a2a_ui (Starlette, single HTML file)

Redis
├── a2a:task:{id}                 Task JSON (24 h TTL)
├── a2a:context:{cid}:tasks       Context index (task_id → sequence)
├── a2a:context:{cid}:sequence    Sequence counter
└── a2a:cancel:{id}               Cancel signal (5 min TTL)
```

### Conversation history injection

Each A2A task has a `context_id` shared across all turns of a conversation.
`ContextAwareRequestContextBuilder` fetches all prior tasks for the same
`context_id` from Redis and attaches them to the `RequestContext` as
`related_tasks`.

The default `prompt_builder` then calls `build_conversation_prefix()` to
extract the most recent `history_max_lines` lines of dialogue (default: 12)
and prepend them as `"Conversation so far:\n…"` before the user's message.
An optional `system_prompt` is inserted first.

The agent therefore sees recent history without the client needing to
replay it. Depth and format are fully configurable — see
[Prompt management](#prompt-management) above.

### How streaming works

`build_stream_invoke` wraps your generator in an `asyncio.Queue`-based
relay. Before starting the generator it sets a `ContextVar` callback that
`report_progress()` reads. Strings from `report_progress()` are placed in
the queue with a sentinel prefix; `ConfigurableAgentExecutor` routes them
to non-final `statusUpdate` (state `TASK_STATE_WORKING`) SSE events. All
other yielded strings become `artifactUpdate` events — the streaming text
the user sees.
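
The relay pattern can be reproduced with the standard library alone. The sketch below is a simplified model, not the library's source: the sentinel value `PROGRESS_PREFIX`, the `relay` function, and the `("status", …)` / `("artifact", …)` tuples are all hypothetical stand-ins for the real sentinel, wrapper, and SSE events.

```python
import asyncio
from collections.abc import AsyncIterable, Callable
from contextvars import ContextVar

PROGRESS_PREFIX = "\x00progress:"  # hypothetical sentinel; the real one is not documented
_DONE = object()  # marks the end of the producer's output

_progress_cb: ContextVar = ContextVar("progress_cb", default=None)


def report_progress(message: str) -> None:
    """Forward a status string if a stream is active; otherwise do nothing."""
    callback = _progress_cb.get()
    if callback is not None:
        callback(message)


async def relay(run: Callable[[str], AsyncIterable[str]], prompt: str) -> AsyncIterable[str]:
    """Merge generator output and progress messages into one ordered stream."""
    queue: asyncio.Queue = asyncio.Queue()
    # Set the callback before create_task so the producer task inherits it.
    token = _progress_cb.set(lambda msg: queue.put_nowait(PROGRESS_PREFIX + msg))

    async def produce() -> None:
        try:
            async for chunk in run(prompt):
                queue.put_nowait(chunk)
        finally:
            queue.put_nowait(_DONE)

    producer = asyncio.create_task(produce())
    _progress_cb.reset(token)  # the caller's context is left untouched
    try:
        while True:
            item = await queue.get()
            if item is _DONE:
                break
            yield item
        await producer  # surfaces any exception raised by the generator
    finally:
        producer.cancel()  # no-op if the producer already finished


async def agent(prompt: str) -> AsyncIterable[str]:
    report_progress("step 1/2")
    yield "Hello, "
    report_progress("step 2/2")
    yield prompt


async def main() -> list[tuple[str, str]]:
    events = []
    async for item in relay(agent, "world"):
        if item.startswith(PROGRESS_PREFIX):
            events.append(("status", item[len(PROGRESS_PREFIX):]))
        else:
            events.append(("artifact", item))
    return events


events = asyncio.run(main())
outside = report_progress("ignored")  # no active stream, so nothing happens
```

Putting both kinds of message on a single queue preserves their relative order, which is why a progress update emitted before a chunk also reaches the client before that chunk.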

---

## Publishing to PyPI

```bash
pip install hatch
hatch build
hatch publish
```

Or with `twine`:

```bash
pip install build twine
python -m build
twine upload dist/*
```

---

## License

MIT
