Metadata-Version: 2.3
Name: agent-hub-kit
Version: 0.1.0
Summary: MCP ベースのマルチエージェント開発キット: ストリーミング対応の MCP クライアント / サーバーをまとめた統合ライブラリ
Requires-Dist: mcp>=1.2.0
Requires-Dist: httpx>=0.27.0
Requires-Dist: pydantic>=2.7.0
Requires-Dist: llm-graph-kit>=1.1.1
Requires-Python: >=3.13
Project-URL: Homepage, https://github.com/ToPo-ToPo-ToPo/agent-hub-kit
Description-Content-Type: text/markdown

# agent-hub-kit

MCP (Model Context Protocol) ベースの **マルチエージェント開発キット**。
MCP クライアントと MCP サーバーの両方を、**トークン単位ストリーミング対応**
で簡潔に書けるようにする **1 つのパッケージ**。複数のエージェントリポジトリ
から同じプロトコルで呼び合えるようにすることを目的とする。

## 特徴

- **トークン単位ストリーミング**: `notifications/progress.message` に
  JSON エンベロープを載せて `text` / `think` / `log` / `image` / `stl` /
  `video` / `emotion` などを多重化。LLM のトークンを 1 文字ずつ流せる
- **複数 MCP サーバー集約**: 1 つの `AgentHubClient` で複数の MCP サーバーへ
  並列接続し、ツール名でルーティング (`AsyncExitStack` で安全に管理)
- **中断 (cancellation)**: `async for` を `break` するだけで MCP の
  `notifications/cancelled` が自動送出される薄いラッパ
- **画像入力規約**: ツール引数 `images: list[str]` を data URI 形式で渡す
  慣習を統一
- **標準準拠**: 中身は MCP SDK (`mcp` パッケージ) のラッパで、独自プロトコル
  なし。他の MCP クライアント/サーバーとも互換

## インストール

```bash
uv add agent-hub-kit
# または
pip install agent-hub-kit
```

未リリース版を使う場合:

```bash
uv add "agent-hub-kit @ git+https://github.com/ToPo-ToPo-ToPo/agent-hub-kit.git"
```

要件: Python 3.13+。主要な依存は `mcp`, `httpx`, `pydantic`, `llm-graph-kit`。

## アーキテクチャ

```
              ┌────────────────────────────────────────┐
              │           AgentHubClient               │
              │     (複数 MCP サーバーを集約)          │
              └──────┬──────────────┬──────────────┬──┘
                     │ MCP/HTTP     │ MCP/HTTP     │ MCP/HTTP
                     ▼              ▼              ▼
          ┌──────────────────────────────────────────┐
          │           各 MCP サーバー                │
          │   EventStreamer で progress notification │
          │   としてトークン単位ストリーム配信        │
          └──────────────────────────────────────────┘
```

| 役割 | 主要 API | MCP プロトコル上の立ち位置 |
| --- | --- | --- |
| MCP クライアント (呼び出し側) | `AgentHubClient` | **client** |
| MCP サーバー (応答側) | `EventStreamer` / `stream_text_iter` | **server** |

## クイックスタート

サーバーとクライアントを 1 ファイルずつ作って動かす最小例。

### 1. サーバー (`my_server.py`)

```python
"""自前 MCP サーバーの最小例。"""
import asyncio
from mcp.server.fastmcp import Context, FastMCP
from agent_hub_kit import EventStreamer

mcp = FastMCP("my-app")

@mcp.tool()
async def echo(text: str, ctx: Context) -> str:
    """入力テキストを 1 文字ずつ text イベントとしてストリーム配信。"""
    stream = EventStreamer(ctx, node="my-app")
    await stream.log(f"受信: {len(text)} 文字")
    for ch in text:
        await stream.text(ch)
        await asyncio.sleep(0.02)
    return text

if __name__ == "__main__":
    mcp.settings.host = "127.0.0.1"
    mcp.settings.port = 8001
    mcp.run(transport="streamable-http")
```

### 2. クライアント (`my_client.py`)

```python
"""自前 MCP サーバーを呼び出すクライアント。"""
import asyncio
from agent_hub_kit import AgentHubClient

async def main():
    async with AgentHubClient() as client:
        await client.connect_http("my_app", "http://127.0.0.1:8001/mcp")

        async for ev in client.call_tool_stream("echo", {"text": "Hello"}):
            if ev.type == "text":
                print(ev.content, end="", flush=True)
            elif ev.type == "log":
                print(f"\n[log] {ev.content}")
            elif ev.type == "result":
                print(f"\n[result] {ev.content!r}")

asyncio.run(main())
```

### 3. 実行 (2 ターミナル)

```bash
# ターミナル A
python my_server.py

# ターミナル B
python my_client.py
```

`Hello` が 1 文字ずつストリームで届き、最後に `[result] 'Hello'` が表示
される。

## API リファレンス

公開 API はすべて `from agent_hub_kit import ...` で取得可能:

```python
from agent_hub_kit import (
    AgentHubClient,       # MCP クライアント
    ToolRef,              # tools/list で発見した 1 ツールの情報
    StreamEvent,          # ストリーム配信される 1 イベント
    ENVELOPE_VERSION,     # JSON エンベロープのバージョン
    encode_envelope,      # サーバー側で内部利用 (通常は EventStreamer 経由)
    decode_envelope,      # クライアント側で内部利用 (通常は自動)
    EventStreamer,        # サーバー側のストリーミング送出ヘルパー
    stream_text_iter,     # テキストイテレータを text イベントとして流すヘルパー
)
```

### `AgentHubClient`

複数 MCP サーバーへの接続を束ねる async context manager。

```python
async with AgentHubClient() as client:
    refs = await client.connect_http("server_a", "http://localhost:8001/mcp")
    ...
```

| メソッド | 説明 |
| --- | --- |
| `connect_http(server_name: str, url: str) -> list[ToolRef]` | Streamable HTTP で 1 サーバーに接続し、tools/list 結果を返す |
| `connect_all_http(endpoints: Mapping[str, str]) -> list[ToolRef]` | 複数エンドポイントを一括接続。`{"name": "http://.../mcp", ...}` |
| `list_tools() -> dict[str, list[ToolRef]]` | 接続中の全サーバーから tools/list を取り直す。`{server: [ToolRef]}` |
| `find_tool(tool_name: str) -> ToolRef \| None` | キャッシュ済みのツール情報を返す |
| `call_tool_stream(tool_name, arguments, *, server=None, queue_maxsize=256) -> AsyncIterator[StreamEvent]` | ツールを呼び出し、`StreamEvent` を逐次 yield する async generator |

`call_tool_stream()` は中断 (cancellation) も簡単:

```python
async for ev in client.call_tool_stream("echo", {"text": "..."}):
    if ev.type == "text":
        print(ev.content)
        if should_stop():
            break   # ← MCP の notifications/cancelled が自動送出される
```

### `EventStreamer`

MCP サーバー側でツール内から使うストリーミング送出ヘルパー。`FastMCP` の
`Context` をラップする。

```python
@mcp.tool()
async def my_tool(user_request: str, ctx: Context) -> str:
    stream = EventStreamer(ctx, node="my-agent")
    await stream.log("開始")
    for chunk in run_llm(user_request):
        await stream.text(chunk)
    return final_text
```

| メソッド | 配信される `StreamEvent.type` | 用途 |
| --- | --- | --- |
| `text(chunk: str)` | `text` | LLM トークン等のテキスト断片 |
| `think(chunk: str)` | `think` | 思考プロセス (UI でグレー表示等) |
| `log(message: str)` | `log` | 進捗ログ |
| `image(payload: dict)` | `image` | 画像 (URL / base64) |
| `stl(payload: dict)` | `stl` | STL モデル情報 |
| `video(payload: dict)` | `video` | 動画情報 |
| `emotion(payload: dict)` | `emotion_data` | 感情パラメータ |
| `error(message: str)` | `error` | ツール失敗扱いにせず継続したい時のエラー |
| `send(event_type: str, content: Any)` | 任意 | 上記以外の独自種別を送る場合の汎用 API |

### `stream_text_iter(stream, chunks)`

テキストイテレータをまとめて text イベントとして配信し、連結結果を返す
ヘルパー。同期/非同期どちらのイテレータも受け付ける。

```python
full_text = await stream_text_iter(stream, llm.stream(prompt))
```

### `StreamEvent`

クライアントが受信するイベント 1 件。`call_tool_stream()` が yield する型。

```python
@dataclass(slots=True)
class StreamEvent:
    type: str                    # "text" / "think" / "log" / ... / "result" / "error" / "cancelled"
    content: Any = None          # 種別依存ペイロード
    node: str | None = None      # 送信元サーバー / エージェント名
    raw: dict | None = None      # 元の JSON エンベロープ (デバッグ用)
```

## イベント種別一覧

クライアントが `StreamEvent.type` で受け取りうる値:

| `type` | 内容 | 発生源 |
| --- | --- | --- |
| `text` | LLM トークン等のテキスト断片 | `stream.text()` |
| `think` | 思考プロセス | `stream.think()` |
| `log` | 進捗ログ | `stream.log()` |
| `image` | 画像 | `stream.image()` |
| `stl` | STL モデル | `stream.stl()` |
| `video` | 動画 | `stream.video()` |
| `emotion_data` | 感情パラメータ | `stream.emotion()` |
| `result` | ツールの最終戻り値 | `call_tool_stream` が `call_tool` 完了時に 1 度だけ yield |
| `error` | エラー | `stream.error()` / `call_tool` 例外 / ツール側 isError |
| `cancelled` | キャンセル完了 | クライアント側で中断した時 |

## エンベロープ形式

`notifications/progress.message` に載せる JSON 形式:

```json
{
  "v": 1,
  "type": "text",
  "content": "あ",
  "node": "my-agent"
}
```

- `v`: `ENVELOPE_VERSION` (現在 1)
- `type`: 上記イベント種別のいずれか、または任意文字列
- `content`: 種別依存ペイロード (text は str、image は dict 等)
- `node`: 送信元の識別子 (任意。UI で吹き出し色分け等に使う想定)

JSON ではない通常文字列が来た場合は `type="text"` として解釈される
(前方互換)。

## パターン集

### 複数の MCP サーバーをひとつのクライアントから呼ぶ

```python
async with AgentHubClient() as client:
    await client.connect_all_http({
        "topology": "http://localhost:8001/mcp",
        "simulation": "http://localhost:8002/mcp",
        "rendering": "http://localhost:8003/mcp",
    })

    # ツール名で自動ルーティング (どの server に居るかは tools/list で発見済み)
    async for ev in client.call_tool_stream("optimize_beam", {"user_request": "..."}):
        handle(ev)
```

### 並列でツール呼び出し

```python
async def call(label):
    async for ev in client.call_tool_stream("ask", {"user_request": label}):
        if ev.type == "result":
            return ev.content
    return ""

async with AgentHubClient() as client:
    await client.connect_http("agent", "http://localhost:8001/mcp")
    a, b = await asyncio.gather(call("質問 A"), call("質問 B"))
```

### 中断 (cancellation)

```python
async for ev in client.call_tool_stream("long_running", args):
    if ev.type == "text":
        print(ev.content, end="")
        if user_pressed_stop():
            break   # ← MCP の notifications/cancelled が自動で送出される
```

### 画像を入力として渡す

サーバー側はツール引数に `images: list[str] | None = None` を取り、data URI
形式の base64 文字列リストを受け取る規約。

```python
# クライアント側
import base64
with open("input.png", "rb") as f:
    b64 = base64.b64encode(f.read()).decode()
images = [f"data:image/png;base64,{b64}"]

async for ev in client.call_tool_stream(
    "analyze_image",
    {"user_request": "この画像を説明して", "images": images},
):
    ...
```

```python
# サーバー側
@mcp.tool()
async def analyze_image(
    user_request: str,
    ctx: Context,
    images: list[str] | None = None,
) -> str:
    stream = EventStreamer(ctx, node="vision-agent")
    if images:
        for uri in images:
            _, b64 = uri.split(",", 1)
            img_bytes = base64.b64decode(b64)
            # ... VLM 等に渡す
    ...
```

### エラーハンドリング

```python
async for ev in client.call_tool_stream("flaky_tool", args):
    if ev.type == "error":
        # ツール内で stream.error() されたか、ツールが isError=True で完了した
        log.warning(f"server error: {ev.content}")
    elif ev.type == "result":
        ...
```

接続自体が失敗する場合は `connect_http` が例外を投げる:

```python
try:
    await client.connect_http("server", url)
except Exception as e:
    log.error(f"接続失敗: {e!r}")
```

### llm-graph-kit と組み合わせて LLM エージェントを書く

[llm-graph-kit](https://pypi.org/project/llm-graph-kit/) で宣言した
グラフベースのエージェントを MCP サーバーとして公開するパターン。
llm-graph-kit のイベント (`{"type": "answer_text", ...}` 等) を
`EventStreamer` のメソッドにマップする:

```python
from mcp.server.fastmcp import Context, FastMCP
from llm_graph_kit import LLMGraph, NodeState
from agent_hub_kit import EventStreamer

class MyAgent:
    def __init__(self, llm):
        self.llm = llm

    def build_graph(self) -> LLMGraph:
        g = LLMGraph(state_schema=...)
        g.add_node("answer", self._answer)
        ...
        return g

    def _answer(self, state: NodeState):
        for chunk in self.llm.respond(...):
            yield {"type": "answer_text", "content": chunk}
        return {"answer": "..."}

    def run(self, question: str):
        yield from self.build_graph().run({"question": question})


mcp = FastMCP("my-llm-agent")
agent = MyAgent(llm=...)

@mcp.tool()
async def ask(user_request: str, ctx: Context) -> str:
    stream = EventStreamer(ctx, node="my-llm-agent")
    answer = ""
    for event in agent.run(user_request):
        if event["type"] == "answer_text":
            answer += event["content"]
            await stream.text(event["content"])
        elif event["type"] == "log":
            await stream.log(event["content"])
    return answer
```

完全な動作例は本リポジトリの `examples/llm_agent_server.py` (DummyLLM 版)
および `examples/llm_agent_server_mlx.py` (実 LLM 版) を参照。

### サーバーを auto-reload で開発する

`mcp.run(transport="streamable-http")` の代わりに ASGI アプリを取り出して
uvicorn を使えば `--reload` が利用できる:

```python
# my_server_reload.py
from my_server import mcp

if __name__ != "__main__":
    app = mcp.streamable_http_app()

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "my_server_reload:app",
        host="127.0.0.1", port=8001,
        reload=True,
        reload_dirs=["."],
    )
```

```bash
python my_server_reload.py   # .py を編集すると自動再起動
```

LLM ロードが重い場合は、再起動コストとのトレードオフがあるので、軽量
モデルで開発するか、--reload を使わず手動再起動が現実的な場合も多い。

## リポジトリ内の実行サンプル

本リポジトリをクローンした場合は `examples/` に動作確認用のサンプルが
入っている。`uv sync` 後に以下で動く:

| サンプル | 起動コマンド | LLM | OS |
| --- | --- | --- | --- |
| `examples/llm_agent_server.py` | `uv run python examples/llm_agent_server.py` | DummyLLM | 不問 |
| `examples/llm_agent_server_mlx.py` | `uv run python examples/llm_agent_server_mlx.py` | `augllm.MlxLLM` | macOS + MLX |
| `examples/llm_agent_server_reload.py` | `uv run python examples/llm_agent_server_reload.py` | DummyLLM (+ --reload) | 不問 |
| `examples/llm_agent_server_mlx_reload.py` | `uv run python examples/llm_agent_server_mlx_reload.py` | MlxLLM (+ --reload) | macOS + MLX |

クライアントはどのサーバーに対しても共通:

```bash
uv run python main.py            # 標準 (内部で examples/llm_agent_client.py を実行)
# または
uv run python examples/llm_agent_client.py
```

ポート切替は環境変数 `LLM_AGENT_PORT` (デフォルト 8201) で。サーバーと
クライアントで同じ値を指定する。

## なぜ MCP の素の SDK ではなく本キットなのか

MCP SDK ([`mcp` パッケージ](https://github.com/modelcontextprotocol/python-sdk))
だけでも client/server の双方向通信は可能だが、**マルチエージェント開発**
では以下が頻出のため、本キットでカプセル化している:

1. **トークン単位ストリーミング**: `notifications/progress.message` に
   JSON エンベロープを載せて多種類のイベントを多重化
2. **複数サーバー集約**: 1 つのクライアントから複数の MCP サーバーへ
   並列接続し、ツール名で自動ルーティング
3. **中断 (cancellation)**: `async for` を `break` するだけで
   `notifications/cancelled` が自動送出される
4. **画像入力規約**: tool 引数 `images: list[str]` を data URI 形式で
   渡す慣習を統一

## 開発

```bash
git clone https://github.com/ToPo-ToPo-ToPo/agent-hub-kit.git
cd agent-hub-kit
uv sync
uv run pytest -v
```

テストは `tests/test_llm_agent_integration.py` に集約されており、
`examples/llm_agent_server.py` をサブプロセス起動して公開 API のみで
end-to-end (発見・ストリーミング・中断・並行呼び出し) を検証する。

## ライセンス

MIT
