by-framework · trace architecture
Trace 体系完整设计与接入指南
这份文档解释 by-framework 的 trace 写入规范、上下文传播、Langfuse/Phoenix 层级、 Redis Read SDK、Dashboard 查询链路,以及普通 Worker、LangGraph Worker、外部应用的接入方法。
TL;DR
by-framework 的 trace 目标是让 client dispatch、worker execution、agent workflow、
LLM/tool/span、session event 可以在同一个 trace 下被统一写入和读取。
写入侧使用 by_framework.trace 的 schema 和 best-effort writer;
读取侧使用独立包 by-framework-trace-query;
Dashboard 只是 Read SDK 的调用方。
最关键的传播参数是 trace_id、trace_parent_span_id、
langfuse_parent_observation_id、session_id、
message_id、parent_message_id。
设计总览
Trace 被拆成两个方向:写入规范负责把运行时事实稳定落到 Redis / Langfuse / Phoenix; 读取 SDK 负责把不同来源合并为一个可解释的 trace tree。这样 Dashboard、CLI、外部系统都可以复用同一套读能力。
推荐的 Langfuse 层级
为什么要有 agent.workflow?
worker.execute 是一次 worker 运行片段;LangGraph 遇到工具调用会 suspend 并返回
QUEUED。如果把异步 child 挂到已经结束的 agent task 下,Langfuse 会出现子节点耗时超过父节点。
workflow span 代表逻辑流程,跨 suspend/resume 保持打开,时间语义更自然。
统一数据模型
TraceRecord
trace 级元数据,用于列表、检索、根输入输出和最终状态。
trace_id、name、session_idroot_message_id、root_agent_typeinput、output、statusstart_ts、end_ts、metadata
ExecutionRecord
任务执行级信息,主要落在 session registry 中,支持 fallback 重建。
execution_id、message_id、parent_message_idworker_id、source_agent_type、target_agent_typestatus、timing、route、error
SpanRecord
trace 树节点,是 Redis spans、Langfuse observations、Phoenix OTel spans 的统一语义。
span_id、parent_span_id、name、operationcomponent、kind、sourceinput、output、tokens、cost
EventRecord
session data stream 事件,用于前端流式输出和 trace fallback。
event_id、trace_id、session_id、message_idevent_type、content_type、timestamppayload
| 字段 | 必须一致的原因 | 典型来源 |
|---|---|---|
trace_id |
所有 spans、executions、events 归并到同一 trace 的主键。 | Client 生成;worker、子 agent、外部应用继承。 |
message_id / parent_message_id |
构建业务调用链和 resume 回调链。 | AskAgentCommand / ResumeCommand header。 |
trace_parent_span_id |
通用 OpenTelemetry 父 span,Phoenix 和其他 APM 依赖。 | 当前 OTel span 或 header metadata。 |
langfuse_parent_observation_id |
Langfuse observation 层级父节点,必须指向正确 workflow/dispatch parent。 | client.dispatch 或 agent.workflow observation。 |
写入路径
1. Client dispatch 写入根信息
Python / Java client 发送 AskAgentCommand 时会写入 Redis trace meta、client.dispatch span, 并把 trace parent 信息放入 header。没有配置 Langfuse 时,client trace 写入应该 best-effort 跳过, 不能影响主流程。
2. Worker task start 创建 workflow 与 worker.execute
Langfuse 插件会创建 agent.workflow:<agent>、worker.execute、
<agent> 三层。workflow 用于跨异步流程;worker.execute 表示当前执行片段。
3. Agent emit / call_agent / dispatch_group 写 Redis spans
AgentContext 会记录 agent.emit_chunk、agent.dispatch_group 等 span。
call_agent 会优先传播 workflow observation id,避免异步 child 挂到已经结束的 task 下。
4. Task complete / error / cancel 写 output 与状态
完成、失败、取消都会更新 observation output 和 trace-level output。写入失败只记录 warning, 不抛到业务执行链路。
手动写入示例
from by_framework.trace import TraceRecord, TraceWriteClient
writer = TraceWriteClient()
await writer.record_trace(
TraceRecord(
trace_id=header.trace_id,
name=f"client.dispatch:{header.target_agent_type}",
session_id=header.session_id,
root_message_id=header.message_id,
root_agent_type=header.target_agent_type,
input=command.content,
status="RUNNING",
start_ts=now_ms,
)
)
from by_framework.trace import SpanRecord, TraceWriteClient
writer = TraceWriteClient()
await writer.record_span(
SpanRecord(
trace_id=header.trace_id,
span_id="custom-step-1",
parent_span_id=header.trace_parent_span_id,
name="Knowledge_Retrieval",
operation="knowledge.retrieve",
component="external-worker",
kind="retriever",
start_ts=start_ms,
end_ts=end_ms,
input={"query": query},
output={"documents": len(docs)},
source="external",
session_id=header.session_id,
message_id=header.message_id,
)
)
from by_framework.trace import EventRecord, TraceWriteClient
writer = TraceWriteClient()
await writer.record_event(
EventRecord(
event_id="evt-001",
trace_id=header.trace_id,
session_id=header.session_id,
message_id=header.message_id,
event_type="chunk",
content_type="text",
timestamp=now_ms,
payload={"content": "partial answer"},
)
)
读取 SDK
by-framework-trace-query 是 Dashboard 和外部查询工具的统一入口。
v1 source 是 Redis,后续可以扩展 Langfuse / Phoenix source。单个 source 失败时返回
status=partial 和 diagnostics,而不是让整体查询失败。
from by_framework_trace_query import TraceReadClient
client = TraceReadClient()
result = await client.get_trace(trace_id, session_id=session_id)
print(result.status)
print(result.trace.to_dict())
print([span.to_dict() for span in result.spans])
print([node.to_dict() for node in result.tree])
from by_framework_trace_query import TraceReadClient
client = TraceReadClient()
by_session = await client.list_traces(session_id="sess-123", limit=20)
by_worker = await client.list_traces(worker_id="worker-1", limit=20)
by_agent = await client.list_traces(agent_type="weather-agent", limit=20)
from by_framework_trace_query import TraceReadClient
client = TraceReadClient(max_spans=1000)
explain = await client.explain_trace(trace_id, session_id=session_id)
# diagnostics codes:
# missing_client_dispatch, missing_worker_execute, missing_parent,
# parent_cycle, trace_output_missing, span_count_exceeded, source_timeout
print(explain)
Redis Source
读取 trace_meta、trace_spans、trace indexes;没有 stored spans 时从 session fallback 合成。
Trace Merger
按 span_id 去重,保留 input/output/tokens/cost 更完整的 span,并构建树。
Diagnostics
返回结构问题和 source 问题,帮助 Dashboard 明确是缺写入、缺 parent、循环还是部分读取。
Langfuse / Phoenix 集成
Langfuse
安装 by-framework-trace-langfuse 并配置环境变量后,Worker 启动时会自动发现
LangfuseTraceProviderFactory。插件会把 by-framework 的 trace id 转成 Langfuse 需要的
32 位 hex trace id,并使用 parent_span_id 连接 observation 层级。
export BYAI_LANGFUSE_ENABLED=true
export LANGFUSE_PUBLIC_KEY=pk-lf-...
export LANGFUSE_SECRET_KEY=sk-lf-...
export LANGFUSE_BASE_URL=http://localhost:3000
Phoenix
安装 by-framework-trace-phoenix 后,通过 OpenTelemetry parent context 连接通用 span。
Phoenix 更适合查看 OTel 链路、OpenInference/LangChain/OpenAI instrumentation。
export BYAI_PHOENIX_ENABLED=true
export PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006/v1/traces
export PHOENIX_PROJECT_NAME=by-framework
Langfuse 的 observation.end() 在当前 SDK 中不接受 output=... 参数。
by-framework 插件会兼容处理:能直接 end 就直接 end,否则先 update output 再 end。
接入方式
from by_framework import AgentContext, GatewayWorker
class MyWorker(GatewayWorker):
async def process_command(self, command, context: AgentContext):
await context.emit_chunk("starting")
result = await do_work(command.content)
await context.emit_chunk(result)
return result
# 自动接入:
# - header.trace_id 进入 context.trace_id
# - worker.execute / agent.workflow 由插件写入
# - emit_chunk / call_agent 写 Redis spans 和 session events
class PlannerWorker(LangGraphWorker):
def get_thread_id(self, context):
# 推荐使用 message_id,避免同 session 下旧挂起状态串扰。
return context.message_id or context.session_id
def build_graph(self):
# make_remote_agent_tool 会通过 context.call_agent 派发子 agent。
# trace parent 会自动使用 agent.workflow observation。
...
# 时间语义:
# - A initial worker.execute: 执行到 interrupt / QUEUED
# - B worker.execute: 子 agent 实际耗时
# - A resume worker.execute: 回调后继续执行
# - agent.workflow:A: 覆盖完整逻辑流程
from langfuse import Langfuse
from by_framework.trace import (
extract_external_trace_context,
start_langfuse_observation,
build_otel_parent_context,
)
from opentelemetry import trace
ctx = extract_external_trace_context(command_dict)
langfuse = Langfuse()
obs = start_langfuse_observation(
langfuse,
command_dict,
name="external_plain_app",
as_type="span",
input_data=command_dict.get("content"),
)
tracer = trace.get_tracer("external-app")
with tracer.start_as_current_span(
"external_plain_app",
context=build_otel_parent_context(command_dict),
):
result = run_external_pipeline()
obs.update(output=result)
obs.end()
接入检查清单
trace_id。parent_message_id 为调用方 message_id。trace_parent_span_id 作为父 span。langfuse_parent_observation_id 作为父 observation。端到端 Trace 链路示例
本节以 Client → Orchestrator Worker → Sub-agent (LangGraph) → Return 为例, 展示一次完整请求中 trace_id、Langfuse observation 层级、token 统计如何自动串联。
Langfuse 层级总览
一次请求在 Langfuse 中会形成如下树形结构(trace_id 贯穿全程):
Trace {trace_id}
│
└─ agent.workflow:orchestrator ← 整个逻辑任务生命周期(含 suspend/resume)
│
├─ worker.execute [orchestrator, 初始] token usage: 0(纯路由)
│ └─ agent.call_agent:weather-agent ← on_call_agent_start 打点,记录调用 input/output
│ │
│ └─ agent.workflow:weather-agent ← sub-agent 收到命令时以此 obs 为 parent
│ └─ worker.execute [weather-agent]
│ └─ generation (ChatOpenAI) ← input_tokens / output_tokens ✓
│
├─ agent.return ← sub-agent 回包时打点
│
└─ worker.execute [orchestrator, resume] ← 恢复段,可再次调 LLM
框架自动完成 langfuse_parent_observation_id 和
framework_parent_span_id 的传播——业务代码只需调用
context.call_agent(),无需手动处理任何 header。
关键字段传播路径
| 字段 | 谁写入 | 如何传播 | 谁消费 |
|---|---|---|---|
trace_id |
Client(首次生成) | 所有派生命令继承 header.trace_id | 每个 worker,贯穿全链 |
langfuse_parent_observation_id |
on_call_agent_start(写 agent.call_agent obs.id) |
注入转发命令的 header & metadata | sub-agent on_task_start 以此为 parent 挂 workflow |
framework_parent_span_id |
_enqueue_agent_return(写 {execution_id}:agent.return) |
ResumeCommand metadata | resume 时 worker.execute span 的 parent |
usage_metadata(token) |
LLM 最后一个 streaming chunk(需 stream_options={"include_usage":True}) |
on_llm_end 回调 → context.record_token_usage() |
worker.execute Redis span 和 Langfuse observation usage 字段 |
示例代码
# client_demo.py
import asyncio
from by_framework.client import GatewayClient
async def main():
client = GatewayClient(
redis_host="127.0.0.1",
redis_port=6379,
)
# send_message 自动生成 trace_id、message_id 并写入 Redis Stream
response = await client.send_message(
target_agent_type="orchestrator",
session_id="session-demo-001",
content="帮我查一下北京今天天气,然后给我一个出行建议",
user_code="user-001",
user_name="张三",
)
print("message_id:", response.message_id)
print("trace_id :", response.trace_id)
asyncio.run(main())
# orchestrator_worker.py
import os
from typing import Any, List
from by_framework.worker import GatewayWorker, run_worker
from by_framework.core.protocol.commands import AskAgentCommand, ResumeCommand
from by_framework.worker.context import AgentContext
class OrchestratorWorker(GatewayWorker):
def get_agent_types(self) -> List[str]:
return ["orchestrator"]
async def process_command(
self, command: AskAgentCommand | ResumeCommand, context: AgentContext
) -> Any:
if isinstance(command, ResumeCommand):
# ── 恢复阶段:sub-agent 已返回 ──
# command.reply_data 是子 agent 的结果
weather_result = command.reply_data
await context.emit_chunk(
f"天气信息已获取:{weather_result}\n根据天气,建议携带雨具出行。"
)
return {"status": "done", "weather": weather_result}
# ── 初始阶段:收到用户请求 ──
# context.call_agent 自动完成:
# 1. 在 Langfuse 创建 agent.call_agent:weather-agent span
# 2. 将 langfuse_parent_observation_id 注入转发命令的 header
# 3. 挂起本 worker,等待 ResumeCommand 返回
await context.call_agent(
target_agent_type="weather-agent",
content={"city": "北京", "query": command.content},
wait_for_reply=True,
)
if __name__ == "__main__":
run_worker(
OrchestratorWorker,
worker_id="orchestrator-worker-1",
redis_host=os.getenv("BYAI_REDIS_HOST", "127.0.0.1"),
redis_port=int(os.getenv("BYAI_REDIS_PORT", 6379)),
plugins=[_make_langfuse_plugin()],
)
# weather_agent_worker.py
import os
from typing import List
from by_framework.worker import run_worker
from by_framework_langgraph import LangGraphWorker
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
class WeatherAgentWorker(LangGraphWorker):
def get_agent_types(self) -> List[str]:
return ["weather-agent"]
def build_graph(self, context, command):
llm = ChatOpenAI(
model=os.getenv("LLM_MODEL", "gpt-4o"),
base_url=os.getenv("OPENAI_BASE_URL"),
api_key=os.getenv("OPENAI_API_KEY"),
streaming=True,
# ↓ 必须开启,否则 streaming 模式下拿不到 token 用量
stream_options={"include_usage": True},
)
return create_react_agent(
llm,
tools=[],
checkpointer=self.get_checkpointer(),
prompt="你是专业的天气助手,根据城市给出天气状况和出行建议,不超过100字。",
)
if __name__ == "__main__":
run_worker(
WeatherAgentWorker,
worker_id="weather-agent-worker-1",
redis_host=os.getenv("BYAI_REDIS_HOST", "127.0.0.1"),
redis_port=int(os.getenv("BYAI_REDIS_PORT", 6379)),
plugins=[_make_langfuse_plugin()],
)
# 两个 worker 都需要注入同一套 LangfusePlugin 配置
import os
from by_framework_trace_langfuse import LangfusePlugin, LangfuseConfig
def _make_langfuse_plugin() -> LangfusePlugin:
return LangfusePlugin(
config=LangfuseConfig(
public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
base_url=os.getenv("LANGFUSE_BASE_URL"),
)
)
# 在 run_worker 调用时传入:
# run_worker(MyWorker, ..., plugins=[_make_langfuse_plugin()])
常见问题
stream_options={"include_usage": True}。
控制台会出现 [TokenAccumulator] on_llm_end fired but extracted 0 tokens 警告提示。
LangfusePlugin。
插件的 on_call_agent_start 钩子负责写入 langfuse_parent_observation_id;
没有插件则字段不会被设置。
GatewayWorker._enqueue_agent_return 传入了
context 参数,它用于计算 framework_parent_span_id。
Dashboard 与运行方式
Dashboard 已经独立为 by-framework-dashboard 子包。它通过
TraceReadClient 读取 trace,不直接耦合 Redis 查询细节。
uv run --package by-framework-dashboard by-framework-dashboard \
--host 127.0.0.1 \
--port 8765 \
--redis-host localhost \
--redis-port 6379 \
--redis-db 0
| 接口 | 说明 |
|---|---|
/api/traces?session_id=... | 按 session 列 trace summaries。 |
/api/traces?worker_id=... | 按 worker index 列 trace summaries。 |
/api/traces?agent_type=... | 按 agent index 列 trace summaries。 |
/api/trace/<trace_id> | 读取单个 trace 的完整 tree/timeline。 |
/metrics | 导出 Prometheus 格式指标。 |
排查手册
| 现象 | 优先检查 | 修复方向 |
|---|---|---|
| Langfuse 显示 Unnamed trace | client 是否写 client.dispatch;trace root 是否设置 trace name。 | 确认 client 侧 Langfuse 环境变量和 best-effort warning。 |
| 外部 app 节点和 worker.execute 同层 | langfuse_parent_observation_id 是否传给外部 app。 |
使用 start_langfuse_observation() 或手动设置 trace_context。 |
| 子节点耗时超过父节点 | 异步 child 是否挂到了已结束的 agent.task。 | 使用 agent.workflow 作为 call parent。 |
| Dashboard trace 没有 spans | Redis trace_spans 是否有写入;session fallback 是否有 execution/event。 |
检查 TraceReadClient.explain_trace() diagnostics。 |
| Java client trace 缺 client 信息 | Java 是否同步了 Redis client.dispatch 写入和 Langfuse parent header。 | 确认 Java SDK 和 Python schema 字段一致。 |
FAQ
client 端默认会生成哪些 trace 字段?
client 会生成或继承 trace_id,并为下游设置 trace_parent_span_id。
如果 Langfuse 已配置并成功创建 client.dispatch observation,还会设置
langfuse_parent_observation_id。没有配置 Langfuse 时不应影响发送消息。
外部应用一定要使用 by-framework SDK 吗?
不一定。外部应用只要能拿到普通 AskAgentCommand 或 header dict,就可以用
extract_external_trace_context() 提取 trace join 参数,再接入 Langfuse 或 OpenTelemetry。
Redis Read SDK 和 Langfuse/Phoenix 的关系是什么?
Redis Read SDK 是 by-framework Dashboard 的 v1 查询事实源;Langfuse/Phoenix 是观测后端。 后续可以增加 LangfuseSource / PhoenixSource,把 tokens/cost/OTel latency 补充进统一结果。
为什么不让 Dashboard 直接查 Redis?
Dashboard 只做展示和 HTTP 兼容层。查询能力沉到 by-framework-trace-query,
这样 CLI、测试工具、外部服务也可以复用同一套读逻辑,并为多数据源高可用留出口。