Metadata-Version: 2.4
Name: langgraph-a2a
Version: 0.1.5
Summary: LangGraph A2A server framework with GenericAgentExecutor and pluggable agents
Author: Sudhagar Narayaan
License-Expression: MIT
License-File: LICENSE
Keywords: a2a,agent-to-agent,agents,langgraph,starlette
Classifier: Development Status :: 4 - Beta
Classifier: Framework :: AsyncIO
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: <3.14,>=3.11
Requires-Dist: a2a-sdk>=1.0.2
Requires-Dist: aiofiles>=24.1.0
Requires-Dist: langgraph>=0.3.0
Requires-Dist: protobuf<7.0,>=5.29.4
Requires-Dist: sse-starlette>=3.3.4
Requires-Dist: starlette<1.0.0,>=0.49.1
Requires-Dist: uvicorn>=0.46.0
Description-Content-Type: text/markdown

# LangGraph A2A Server Framework

## What is this?

**langgraph-a2a** is a Python package that hosts **LangGraph** agents behind an **Agent-to-Agent (A2A)** HTTP server. It gives you a ready-made server, protocol handling, and a pluggable executor so you can focus on building graphs—not wiring JSON-RPC, streaming, or agent discovery yourself.

## Objective

| Goal | How this framework helps |
|------|---------------------------|
| Expose a LangGraph agent over the network | A2A-compatible HTTP API (agent card, JSON-RPC, streaming) |
| Plug in your own agent logic | You ship a separate Python project; the framework loads it via entry points |
| Run one agent per process | CLI: `langgraph-a2a --agent <name>` |
| Support conversation memory | Maps each request’s A2A `contextId` to LangGraph `thread_id` when you compile with a checkpointer |
| Optional startup/shutdown | Per-agent lifespan hooks (e.g. DB pools, compile graph after secrets load) |

The framework does **not** include your business agents. It provides the **server and executor only**.

## Package

| | |
|---|---|
| **PyPI name** | `langgraph-a2a` |
| **Import** | `langgraph_a2a` |
| **CLI** | `langgraph-a2a` |
| **License** | MIT |

**Install:**

```bash
pip install langgraph-a2a
```

**What you get:**

- `GenericAgentExecutor` — runs LangGraph graphs via `ainvoke` / `astream` with A2A message format
- `langgraph-a2a` CLI — list and run registered agents
- Registry — discovers agents from `[project.entry-points."langgraph_a2a.agents"]` in *your* `pyproject.toml`
- Helpers — app config, secrets, agent card, default in-memory checkpointer utilities

## Requirements

| Requirement | Version / note |
|-------------|----------------|
| Python | `>=3.11, <3.14` |
| Your agent project | Separate installable package depending on `langgraph-a2a` |
| LangGraph | Declared in your project (e.g. `langgraph>=0.3.0`) |
| Per-agent config file | `local_app_config.json` (host, port, agent card) |
| Per-agent secrets (optional) | `local_secrets.json` |
| Checkpointer (optional) | Required only if you need conversation memory; choose in-memory or a LangGraph backend (e.g. PostgreSQL) in your agent code |

**Client requirement for multi-turn chat:** send the same A2A `contextId` on each message in a conversation. The framework passes it as LangGraph `configurable.thread_id`; you do not set `thread_id` in graph nodes when using `GenericAgentExecutor`.

---

## Quickstart

Build a separate agent project from scratch, run it with `langgraph-a2a`, and test with `curl`.

You will do two parts:

| Part | Checkpointer | Memory survives server restart? |
|------|----------------|----------------------------------|
| **A** | Default in-memory (`MemorySaver`) | No |
| **B** | PostgreSQL + lifespan | Yes |

Both parts use the same `graph.py`. Part B adds `lifespan.py` and changes `agent.py`.

---

### Step 1 — Create the project folder

```bash
mkdir -p ~/my-agent-app/my_agent_app
cd ~/my-agent-app
touch my_agent_app/__init__.py
```

Your work happens in `~/my-agent-app`, **not** inside the `langgraph-a2a` source repo.

---

### Step 2 — Folder structure

**After Part A (in-memory):**

```
my-agent-app/
├── pyproject.toml
└── my_agent_app/
    ├── __init__.py
    ├── graph.py
    ├── agent.py
    └── local_app_config.json
```

**After Part B (PostgreSQL), add:**

```
    ├── lifespan.py
    └── local_secrets.json
```

| File | Purpose |
|------|---------|
| `pyproject.toml` | Dependencies + agent entry point |
| `graph.py` | LangGraph nodes and state |
| `agent.py` | Registers agent with the framework |
| `lifespan.py` | (Part B) Opens Postgres, compiles graph with `AsyncPostgresSaver` |
| `local_app_config.json` | Host, port, agent card (required) |
| `local_secrets.json` | (Part B) `DATABASE_URL` — do not commit to git |

---

### Step 3 — Shared `graph.py`

Create `my_agent_app/graph.py` (used in both parts):

```python
from typing import TypedDict

from langgraph.graph import END, START, StateGraph


class State(TypedDict, total=False):
    user_input: str
    output: str
    turn_count: int


def greet(state: State) -> dict:
    text = state.get("user_input") or ""
    count = (state.get("turn_count") or 0) + 1
    return {
        "output": f"Hello! You said: {text} (turn {count})",
        "turn_count": count,
    }


def build_graph():
    builder = StateGraph(State)
    builder.add_node("greet", greet)
    builder.add_edge(START, "greet")
    builder.add_edge("greet", END)
    return builder
```

- Framework sends **`user_input`**.
- Your node returns **`output`** (reply text).
- **`turn_count`** helps verify memory: same `contextId` → turn 2, 3, …

---

## Part A — Default checkpointer (in-memory)

### Step A1 — `pyproject.toml`

```toml
[project]
name = "my-agent-app"
version = "0.1.0"
requires-python = ">=3.11, <3.14"
dependencies = [
    "langgraph-a2a",
]

[project.entry-points."langgraph_a2a.agents"]
my_agent = "my_agent_app.agent:Agent"
```

### Step A2 — `agent.py`

```python
from pathlib import Path
from typing import Any

from langgraph_a2a.base_utils import compile_with_default_checkpointer
from langgraph_a2a.executor import GenericAgentExecutor

from .graph import build_graph


class Agent:
    name = "my_agent"

    @staticmethod
    def register() -> dict[str, Any]:
        agent_dir = Path(__file__).parent
        graph = compile_with_default_checkpointer(build_graph())
        return {
            "name": Agent.name,
            "executor": GenericAgentExecutor(agent_impl=graph, enable_streaming=True),
            "local_config_path": agent_dir / "local_app_config.json",
        }
```

Part A does **not** use `local_secrets.json`. Omitting `local_secrets_path` is fine — the server sets `app.state.secrets` to `{}`.

`compile_with_default_checkpointer` attaches **`MemorySaver`**. You do **not** pass `thread_id`; the server sets it from each request’s **`contextId`**.

### Step A3 — `local_app_config.json`

```json
{
  "host": "127.0.0.1",
  "port": 9000,
  "protocol": "http",
  "logging": { "level": "INFO" },
  "agent_card": {
    "name": "my-agent",
    "description": "Quickstart agent (in-memory memory)",
    "version": "0.1.0",
    "skills": [
      {
        "id": "chat",
        "name": "Chat",
        "description": "Stateful chat",
        "tags": ["quickstart"],
        "examples": ["Hello"]
      }
    ],
    "capabilities": {
      "streaming": true,
      "push_notifications": false
    }
  }
}
```

### Step A4 — Install and run (Part A)

```bash
cd ~/my-agent-app
pip install -e .

langgraph-a2a --list-agents
langgraph-a2a --agent my_agent
```

Leave the server running. Open a **second terminal** for `curl`.

### Step A5 — Test with `curl` (Part A)

**Message 1** — use `contextId` `demo-memory-001`:

```bash
curl -sS -N -X POST http://127.0.0.1:9000/ \
  -H "Content-Type: application/json" \
  -d '{
    "jsonrpc": "2.0",
    "id": "1",
    "method": "message/stream",
    "params": {
      "message": {
        "kind": "message",
        "messageId": "msg-1",
        "contextId": "demo-memory-001",
        "role": "user",
        "parts": [{"kind": "text", "text": "Hi"}]
      }
    }
  }'
```

Expect **`turn 1`** in the streamed JSON.

**Message 2** — same `contextId`, new `messageId`:

```bash
curl -sS -N -X POST http://127.0.0.1:9000/ \
  -H "Content-Type: application/json" \
  -d '{
    "jsonrpc": "2.0",
    "id": "2",
    "method": "message/stream",
    "params": {
      "message": {
        "kind": "message",
        "messageId": "msg-2",
        "contextId": "demo-memory-001",
        "role": "user",
        "parts": [{"kind": "text", "text": "Again"}]
      }
    }
  }'
```

Expect **`turn 2`**.

If you **restart the server** and send again with the same `contextId`, you will get **`turn 1`** again (in-memory only).

---

## Part B — PostgreSQL checkpointer + lifespan

Use Part B when conversation state must **survive server restarts**.

### Step B0 — PostgreSQL (Docker example)

```bash
docker run -d --name langgraph-a2a-pg \
  -e POSTGRES_PASSWORD=postgres \
  -e POSTGRES_DB=myagent \
  -p 5432:5432 \
  postgres:16
```

URL:

```
postgresql://postgres:postgres@localhost:5432/myagent
```

### Step B1 — Update `pyproject.toml`

Add Postgres packages:

```toml
dependencies = [
    "langgraph-a2a",
    "langgraph>=0.3.0",
    "langgraph-checkpoint-postgres",
    "psycopg[pool]>=3.1",
]
```

Re-install:

```bash
pip install -e .
```

### Step B2 — `local_secrets.json`

Create `my_agent_app/local_secrets.json`:

```json
{
  "DATABASE_URL": "postgresql://postgres:postgres@localhost:5432/myagent"
}
```

Add `local_secrets.json` to `.gitignore`.

### Step B3 — `lifespan.py`

Create `my_agent_app/lifespan.py`:

```python
from typing import Any

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from psycopg_pool import AsyncConnectionPool

from langgraph_a2a.base_utils import compile_with_default_checkpointer

from .graph import build_graph


async def startup(app: Any) -> None:
    url = (app.state.secrets or {}).get("DATABASE_URL")
    if not url:
        raise ValueError("DATABASE_URL missing in local_secrets.json")

    pool = AsyncConnectionPool(conninfo=url, max_size=10, open=False)
    await pool.open()
    app.state.db_pool = pool

    checkpointer = AsyncPostgresSaver(pool)
    await checkpointer.setup()

    app.state.executor.agent = compile_with_default_checkpointer(
        build_graph(),
        checkpointer=checkpointer,
    )


async def shutdown(app: Any) -> None:
    pool = getattr(app.state, "db_pool", None)
    if pool is not None:
        await pool.close()
```

**Lifespan sync with `app.state`:**

1. Framework loads `local_app_config.json` → `app.state.app_config`
2. Framework loads `local_secrets.json` → `app.state.secrets`
3. Framework creates `app.state.executor` and calls `bind_app(app)`
4. **`startup(app)` runs** — pool + Postgres checkpointer + `executor.agent = compiled graph`
5. Requests run; each `contextId` → `thread_id` → Postgres checkpoint row
6. **`shutdown(app)`** — closes the pool

### Step B4 — Replace `agent.py` (Part B)

```python
from pathlib import Path
from types import SimpleNamespace
from typing import Any

from langgraph_a2a.executor import GenericAgentExecutor

from . import lifespan as agent_lifespan


class Agent:
    name = "my_agent"

    @staticmethod
    def register() -> dict[str, Any]:
        agent_dir = Path(__file__).parent
        return {
            "name": Agent.name,
            "executor": GenericAgentExecutor(agent_impl=None, enable_streaming=True),
            "local_config_path": agent_dir / "local_app_config.json",
            "local_secrets_path": agent_dir / "local_secrets.json",
            "lifespan": SimpleNamespace(
                startup=agent_lifespan.startup,
                shutdown=agent_lifespan.shutdown,
            ),
        }
```

`agent_impl=None` at import time is intentional; **`lifespan.startup`** sets `app.state.executor.agent` after Postgres is ready.

### Step B5 — Run (Part B)

```bash
langgraph-a2a --agent my_agent
```

First run creates checkpoint tables via `await checkpointer.setup()`.

### Step B6 — Test with `curl` (Part B)

Use the **same curl commands as Part A** (Steps A5), e.g. `contextId` `demo-memory-001`.

1. First message → **`turn 1`**
2. Second message (same `contextId`) → **`turn 2`**
3. **Stop and restart** `langgraph-a2a`, same `contextId` again → **`turn 3`** (Postgres kept state)

New `contextId` → starts at **`turn 1`** again.

### Quick reference — memory rules

| | Part A (in-memory) | Part B (Postgres + lifespan) |
|--|-------------------|------------------------------|
| Compile | In `agent.py` at register | In `lifespan.startup` |
| `thread_id` | From client `contextId` (automatic) | Same |
| Survives restart | No | Yes |
| `local_secrets_path` in `register()` | Omit | Required (`DATABASE_URL` in `local_secrets.json`) |

Look for `artifact-update` and `parts` in the SSE stream for the agent reply text.
