Metadata-Version: 2.4
Name: langchain-agentx-stream-ui-backend
Version: 0.1.7
Summary: SSE thin-layer SDK: LangchainAgentEvent → standard SSE for langchain_agentx_stream_ui
Requires-Python: >=3.11
Description-Content-Type: text/markdown
Requires-Dist: fastapi>=0.100
Requires-Dist: langchain-agentx-python>=1.5.5
Provides-Extra: dev
Requires-Dist: httpx>=0.27; extra == "dev"
Requires-Dist: pytest>=8.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.24; extra == "dev"

# langchain-agentx-stream-ui-backend

SSE 薄层 SDK：**仅 UI 线** — 将 `LangchainAgentEvent`（经 SDK `LangGraphToLangchainAgentEventAdapter` 的 out-of-band / artifact 投影）序列化为前端 SSE 协议。

不暴露 LangGraph 原始 dict，也不向浏览器推送 `ToolMessage.content`（model 路径，仅供 LLM）。

## 快速接入

```python
from fastapi import FastAPI, Request
from langchain_agentx_stream_ui_backend import stream_to_sse, wrap_langgraph

app = FastAPI()

@app.get("/wiki/build")
async def wiki_build(task_id: str, request: Request):
    graph = build_wiki_workflow(task_id)
    # adapter_config 可省略：wrap_langgraph 内置 DEFAULT_UI_ADAPTER_CONFIG
    events = wrap_langgraph(graph, {"task_id": task_id}, graph_name="WikiBuilder")
    return stream_to_sse(events, request=request, session_id=f"wiki-{task_id}")
```

## 测试组织（对齐 T10 金字塔）

| 层级 | 位置 | 说明 |
|------|------|------|
| L0 单测 | `langchain_agentx_stream_ui_backend/test_*.py`、`tests/test_*.py` | 函数/类级行为 |
| L1 集成 | `tests/integration/` | FastAPI 路由 + SSE / buffer / permission 全链路 |
| 冒烟 | `tests/smoke/` | CI 快速回归（`-m smoke`，约 7 项） |

原 `examples/` 目录已合并为 `tests/integration/sse/` 下的集成测试。

```bash
cd backend
python -m pip install -e ".[dev]"

# 全部测试（103 项）
pytest -v

# 仅 L0 单测
pytest langchain_agentx_stream_ui_backend/ tests/test_*.py -v

# 仅 L1 集成测试
pytest tests/integration/ -v -m integration

# 冒烟（发布前 / CI 快速门禁）
pytest tests/smoke/ -v -m smoke
```

### 集成测试覆盖

| 模块 | 文件 | 覆盖点 |
|------|------|--------|
| Wiki / LangGraph | `test_wiki_workflow_integration.py` | `wrap_langgraph` + SSE |
| AgentSession | `test_agent_session_integration.py` | `wrap_agent_session` + SSE |
| EventBuffer | `test_buffer_multi_subscriber_integration.py` | 多订阅、断连不解 ingest |
| Last-Event-ID | `test_last_event_id_integration.py` | SSE `id:` 续传、400 校验 |
| Middleware | `test_middleware_http_integration.py` | HTTP 路由级 middleware |
| Permission raise | `test_permission_raise_integration.py` | ingest 挂起 → POST 回写 → 继续 |
| 协议不变量 | `test_sse_protocol_integration.py` | meta 字段、bypass/raise 模式 |
| 断连 | `test_stream_disconnect_integration.py` | `aclose` 2 秒内触发 |

## 公开 API

- `stream_to_sse(events, *, request, session_id, ...)` → `StreamingResponse`（`SseStreamingService` 门面）
- `stream_to_sse(..., assign_event_ids=True)` — 形态 A 单连接内 SSE id 行（默认关闭，向后兼容）
- `stream_to_sse_from_buffer` — 自动读取 `Last-Event-ID` 请求头并写入 SSE `id:` 行
- `LastEventIdValidator` / `TaggedAgentEvent` — 续传校验与带 id 事件载荷
- `EventBuffer` / `ingest_stream_to_buffer` — 长任务「POST 触发 ingest + GET 订阅」
- `stream_to_sse(..., middlewares=[])` — 序列化前可选事件管道（默认空链与 MVP 一致）
- `permission_mode="raise"` + `create_permission_raise_setup` / `PermissionWaitRegistry` — SDK 权限挂起 → `permission-request` 事件（需 agentx ≥ 0.7.9）
- `EventMiddleware` / `EventMiddlewarePipeline` / `apply_middlewares` — 脱敏、trace 注入、debug 过滤
- `wrap_langgraph(graph, input_data, ...)` → `LangGraphEventStream`（内置 `DEFAULT_UI_ADAPTER_CONFIG`）
- `wrap_agent_session(session_stream, *, cancel=None)` → `AgentSessionEventStream`；`cancel` 为可选 `SessionCancelHandle`
- `safe_aclose_events` / `EventSourceLifecycle` — 上游释放（断连 finally 使用）
- `DEFAULT_UI_ADAPTER_CONFIG` / `merge_ui_adapter_config()` — UI 适配器默认（`UiAdapterConfigMerger`）
- `PROTOCOL_VERSION` — 当前为 `"1"`

### AgentSession 取消语义

断连或 `stream_to_sse` 结束时，薄层在 `finally` 调用 `events.aclose()`。对 AgentSession 路径：

1. 若传入 `cancel=SessionCancelHandle`，`aclose()` **优先** `await cancel.cancel()`
2. 否则回退 `await session_stream.aclose()`

推荐在业务侧传入显式 cancel，以便 SDK 升级时语义稳定：

```python
events = wrap_agent_session(session.astream(question), cancel=session)
return stream_to_sse(events, request=request)
```

### EventMiddleware 示例（tool-result 脱敏）

```python
import dataclasses
from langchain_agentx.observability.events.langchain_agentx_event_adapter import (
    LangchainAgentEvent,
    LangchainAgentEventType,
)

def redact_tool_result(event: LangchainAgentEvent) -> LangchainAgentEvent | None:
    if event.event_type != LangchainAgentEventType.TOOL_RESULT:
        return event
    data = dict(event.data)
    data["content"] = "[REDACTED]"
    if isinstance(data.get("display"), dict):
        display = dict(data["display"])
        display["value"] = "[REDACTED]"
        data["display"] = display
    return dataclasses.replace(event, data=data)

return stream_to_sse(events, request=request, middlewares=[redact_tool_result])
```

middleware 返回 `None` 表示丢弃该事件；抛错则转为 thin_layer `error` 终态帧。

### permission_mode="raise"（V2-D，特性开关）

默认 `permission_mode="bypass"` 与 MVP 一致。编码类场景可启用 `raise`，将 SDK `PermissionPromptHandler` 挂起转为 documented extension 事件 `permission-request`，等待业务回写后继续。

**依赖**：`langchain-agentx >= 0.7.9`（L3 `PermissionPromptHandler`）。

```python
import asyncio

from fastapi import FastAPI, HTTPException, Request
from langchain_agentx_stream_ui_backend import (
    EventBuffer,
    PermissionWaitRegistry,
    create_permission_raise_setup,
    ingest_stream_to_buffer,
    stream_to_sse_from_buffer,
    wrap_langgraph,
)

app = FastAPI()
_BUFFERS: dict[str, EventBuffer] = {}
_PERMISSIONS: dict[str, PermissionWaitRegistry] = {}

@app.post("/wiki/{task_id}/start")
async def start(task_id: str):
    perm = create_permission_raise_setup(session_id=task_id)
    _PERMISSIONS[task_id] = perm.registry
    buf = EventBuffer()
    _BUFFERS[task_id] = buf
    graph = build_graph(services={"prompt_handler": perm.handler})  # 注入 agentx graph
    events = wrap_langgraph(graph, {"task_id": task_id}, permission_broker=perm.broker)
    asyncio.create_task(ingest_stream_to_buffer(perm.wrap_stream(events), buf))
    return {"task_id": task_id}

@app.post("/wiki/{task_id}/permission")
async def permission(task_id: str, body: dict):
    registry = _PERMISSIONS[task_id]
    ok = await registry.resolve(body["request_id"], body)
    if not ok:
        raise HTTPException(404)
    return {"ok": True}

@app.get("/wiki/{task_id}/stream")
async def stream(task_id: str, request: Request):
    return stream_to_sse_from_buffer(
        _BUFFERS[task_id],
        request=request,
        permission_mode="raise",
        session_id=task_id,
    )
```

回退至 bypass：不传 `prompt_handler`，`stream_to_sse(..., permission_mode="bypass")`（默认）。

## 契约 Fixture

SSOT 路径：`backend/tests/fixtures/sse/*.events.json`。生成说明见该目录 `README.md`。
