by-framework · trace architecture

Trace 体系完整设计与接入指南

这份文档解释 by-framework 的 trace 写入规范、上下文传播、Langfuse/Phoenix 层级、 Redis Read SDK、Dashboard 查询链路,以及普通 Worker、LangGraph Worker、外部应用的接入方法。

4统一写入模型
3核心层级节点
2观测后端:Langfuse / Phoenix
1Trace Read SDK 入口

TL;DR

by-framework 的 trace 目标是让 client dispatch、worker execution、agent workflow、 LLM/tool/span、session event 可以在同一个 trace 下被统一写入和读取。 写入侧使用 by_framework.trace 的 schema 和 best-effort writer; 读取侧使用独立包 by-framework-trace-query; Dashboard 只是 Read SDK 的调用方。

TraceWriteClient TraceReadClient RedisTraceSource LangfusePlugin PhoenixPlugin Dashboard

最关键的传播参数是 trace_idtrace_parent_span_idlangfuse_parent_observation_idsession_idmessage_idparent_message_id

设计总览

Trace 被拆成两个方向:写入规范负责把运行时事实稳定落到 Redis / Langfuse / Phoenix; 读取 SDK 负责把不同来源合并为一个可解释的 trace tree。这样 Dashboard、CLI、外部系统都可以复用同一套读能力。

Client 创建或继承 trace id,发送 AskAgentCommand,写 client.dispatch。
Worker Runner 读取 header,恢复 trace parent,创建 execution 和 worker.execute。
Agent / LangGraph 执行任务、发 chunk、call_agent、resume,并传播 parent。
Trace Backends Redis 是 v1 读取源;Langfuse/Phoenix 提供观测 UI 和 OTel/LLM 细节。
Read SDK 合并 spans、构建树、输出 diagnostics,供 Dashboard 使用。

推荐的 Langfuse 层级

client.dispatch:langgraph-extension-demo 13.4s
└─ client.dispatch:langgraph-extension-demo 0.01s
  └─ agent.workflow:langgraph-extension-demo 13.3s
    ├─ worker.execute 3.5s A initial
    │  └─ langgraph-extension-demo:langgraph 3.3s
    ├─ agent.workflow:weather-agent 5.3s
    │  └─ worker.execute 5.3s
    └─ worker.execute 4.6s A resume

为什么要有 agent.workflow? worker.execute 是一次 worker 运行片段;LangGraph 遇到工具调用会 suspend 并返回 QUEUED。如果把异步 child 挂到已经结束的 agent task 下,Langfuse 会出现子节点耗时超过父节点。 workflow span 代表逻辑流程,跨 suspend/resume 保持打开,时间语义更自然。

统一数据模型

TraceRecord

trace 级元数据,用于列表、检索、根输入输出和最终状态。

  • trace_idnamesession_id
  • root_message_idroot_agent_type
  • inputoutputstatus
  • start_tsend_tsmetadata

ExecutionRecord

任务执行级信息,主要落在 session registry 中,支持 fallback 重建。

  • execution_idmessage_idparent_message_id
  • worker_idsource_agent_typetarget_agent_type
  • statustimingrouteerror

SpanRecord

trace 树节点,是 Redis spans、Langfuse observations、Phoenix OTel spans 的统一语义。

  • span_idparent_span_idnameoperation
  • componentkindsource
  • inputoutputtokenscost

EventRecord

session data stream 事件,用于前端流式输出和 trace fallback。

  • event_idtrace_idsession_idmessage_id
  • event_typecontent_typetimestamp
  • payload
字段必须一致的原因典型来源
trace_id 所有 spans、executions、events 归并到同一 trace 的主键。 Client 生成;worker、子 agent、外部应用继承。
message_id / parent_message_id 构建业务调用链和 resume 回调链。 AskAgentCommand / ResumeCommand header。
trace_parent_span_id 通用 OpenTelemetry 父 span,Phoenix 和其他 APM 依赖。 当前 OTel span 或 header metadata。
langfuse_parent_observation_id Langfuse observation 层级父节点,必须指向正确 workflow/dispatch parent。 client.dispatch 或 agent.workflow observation。

写入路径

1. Client dispatch 写入根信息

Python / Java client 发送 AskAgentCommand 时会写入 Redis trace meta、client.dispatch span, 并把 trace parent 信息放入 header。没有配置 Langfuse 时,client trace 写入应该 best-effort 跳过, 不能影响主流程。

2. Worker task start 创建 workflow 与 worker.execute

Langfuse 插件会创建 agent.workflow:<agent>worker.execute<agent> 三层。workflow 用于跨异步流程;worker.execute 表示当前执行片段。

3. Agent emit / call_agent / dispatch_group 写 Redis spans

AgentContext 会记录 agent.emit_chunkagent.dispatch_group 等 span。 call_agent 会优先传播 workflow observation id,避免异步 child 挂到已经结束的 task 下。

4. Task complete / error / cancel 写 output 与状态

完成、失败、取消都会更新 observation output 和 trace-level output。写入失败只记录 warning, 不抛到业务执行链路。

手动写入示例

from by_framework.trace import TraceRecord, TraceWriteClient

writer = TraceWriteClient()

await writer.record_trace(
    TraceRecord(
        trace_id=header.trace_id,
        name=f"client.dispatch:{header.target_agent_type}",
        session_id=header.session_id,
        root_message_id=header.message_id,
        root_agent_type=header.target_agent_type,
        input=command.content,
        status="RUNNING",
        start_ts=now_ms,
    )
)
from by_framework.trace import SpanRecord, TraceWriteClient

writer = TraceWriteClient()

await writer.record_span(
    SpanRecord(
        trace_id=header.trace_id,
        span_id="custom-step-1",
        parent_span_id=header.trace_parent_span_id,
        name="Knowledge_Retrieval",
        operation="knowledge.retrieve",
        component="external-worker",
        kind="retriever",
        start_ts=start_ms,
        end_ts=end_ms,
        input={"query": query},
        output={"documents": len(docs)},
        source="external",
        session_id=header.session_id,
        message_id=header.message_id,
    )
)
from by_framework.trace import EventRecord, TraceWriteClient

writer = TraceWriteClient()

await writer.record_event(
    EventRecord(
        event_id="evt-001",
        trace_id=header.trace_id,
        session_id=header.session_id,
        message_id=header.message_id,
        event_type="chunk",
        content_type="text",
        timestamp=now_ms,
        payload={"content": "partial answer"},
    )
)

读取 SDK

by-framework-trace-query 是 Dashboard 和外部查询工具的统一入口。 v1 source 是 Redis,后续可以扩展 Langfuse / Phoenix source。单个 source 失败时返回 status=partial 和 diagnostics,而不是让整体查询失败。

from by_framework_trace_query import TraceReadClient

client = TraceReadClient()
result = await client.get_trace(trace_id, session_id=session_id)

print(result.status)
print(result.trace.to_dict())
print([span.to_dict() for span in result.spans])
print([node.to_dict() for node in result.tree])
from by_framework_trace_query import TraceReadClient

client = TraceReadClient()

by_session = await client.list_traces(session_id="sess-123", limit=20)
by_worker = await client.list_traces(worker_id="worker-1", limit=20)
by_agent = await client.list_traces(agent_type="weather-agent", limit=20)
from by_framework_trace_query import TraceReadClient

client = TraceReadClient(max_spans=1000)
explain = await client.explain_trace(trace_id, session_id=session_id)

# diagnostics codes:
# missing_client_dispatch, missing_worker_execute, missing_parent,
# parent_cycle, trace_output_missing, span_count_exceeded, source_timeout
print(explain)

Redis Source

读取 trace_metatrace_spans、trace indexes;没有 stored spans 时从 session fallback 合成。

Trace Merger

span_id 去重,保留 input/output/tokens/cost 更完整的 span,并构建树。

Diagnostics

返回结构问题和 source 问题,帮助 Dashboard 明确是缺写入、缺 parent、循环还是部分读取。

Langfuse / Phoenix 集成

Langfuse

安装 by-framework-trace-langfuse 并配置环境变量后,Worker 启动时会自动发现 LangfuseTraceProviderFactory。插件会把 by-framework 的 trace id 转成 Langfuse 需要的 32 位 hex trace id,并使用 parent_span_id 连接 observation 层级。

export BYAI_LANGFUSE_ENABLED=true
export LANGFUSE_PUBLIC_KEY=pk-lf-...
export LANGFUSE_SECRET_KEY=sk-lf-...
export LANGFUSE_BASE_URL=http://localhost:3000

Phoenix

安装 by-framework-trace-phoenix 后,通过 OpenTelemetry parent context 连接通用 span。 Phoenix 更适合查看 OTel 链路、OpenInference/LangChain/OpenAI instrumentation。

export BYAI_PHOENIX_ENABLED=true
export PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006/v1/traces
export PHOENIX_PROJECT_NAME=by-framework

Langfuse 的 observation.end() 在当前 SDK 中不接受 output=... 参数。 by-framework 插件会兼容处理:能直接 end 就直接 end,否则先 update output 再 end。

接入方式

from by_framework import AgentContext, GatewayWorker

class MyWorker(GatewayWorker):
    async def process_command(self, command, context: AgentContext):
        await context.emit_chunk("starting")
        result = await do_work(command.content)
        await context.emit_chunk(result)
        return result

# 自动接入:
# - header.trace_id 进入 context.trace_id
# - worker.execute / agent.workflow 由插件写入
# - emit_chunk / call_agent 写 Redis spans 和 session events
class PlannerWorker(LangGraphWorker):
    def get_thread_id(self, context):
        # 推荐使用 message_id,避免同 session 下旧挂起状态串扰。
        return context.message_id or context.session_id

    def build_graph(self):
        # make_remote_agent_tool 会通过 context.call_agent 派发子 agent。
        # trace parent 会自动使用 agent.workflow observation。
        ...

# 时间语义:
# - A initial worker.execute: 执行到 interrupt / QUEUED
# - B worker.execute: 子 agent 实际耗时
# - A resume worker.execute: 回调后继续执行
# - agent.workflow:A: 覆盖完整逻辑流程
from langfuse import Langfuse
from by_framework.trace import (
    extract_external_trace_context,
    start_langfuse_observation,
    build_otel_parent_context,
)
from opentelemetry import trace

ctx = extract_external_trace_context(command_dict)
langfuse = Langfuse()

obs = start_langfuse_observation(
    langfuse,
    command_dict,
    name="external_plain_app",
    as_type="span",
    input_data=command_dict.get("content"),
)

tracer = trace.get_tracer("external-app")
with tracer.start_as_current_span(
    "external_plain_app",
    context=build_otel_parent_context(command_dict),
):
    result = run_external_pipeline()

obs.update(output=result)
obs.end()

接入检查清单

所有派生 command 继承同一个 trace_id
子任务设置 parent_message_id 为调用方 message_id
OpenTelemetry 使用 trace_parent_span_id 作为父 span。
Langfuse 使用 langfuse_parent_observation_id 作为父 observation。
长流程使用 workflow span,单次执行使用 worker.execute span。
写入失败必须 best-effort,不影响业务主流程。

端到端 Trace 链路示例

本节以 Client → Orchestrator Worker → Sub-agent (LangGraph) → Return 为例, 展示一次完整请求中 trace_id、Langfuse observation 层级、token 统计如何自动串联。

Langfuse 层级总览

一次请求在 Langfuse 中会形成如下树形结构(trace_id 贯穿全程):

Trace  {trace_id}
│
└─ agent.workflow:orchestrator            ← 整个逻辑任务生命周期(含 suspend/resume)
   │
   ├─ worker.execute  [orchestrator, 初始]   token usage: 0(纯路由)
   │   └─ agent.call_agent:weather-agent   ← on_call_agent_start 打点,记录调用 input/output
   │       │
   │       └─ agent.workflow:weather-agent  ← sub-agent 收到命令时以此 obs 为 parent
   │           └─ worker.execute  [weather-agent]
   │               └─ generation (ChatOpenAI)  ← input_tokens / output_tokens ✓
   │
   ├─ agent.return                          ← sub-agent 回包时打点
   │
   └─ worker.execute  [orchestrator, resume]   ← 恢复段,可再次调 LLM

框架自动完成 langfuse_parent_observation_idframework_parent_span_id 的传播——业务代码只需调用 context.call_agent(),无需手动处理任何 header。

关键字段传播路径

字段谁写入如何传播谁消费
trace_id Client(首次生成) 所有派生命令继承 header.trace_id 每个 worker,贯穿全链
langfuse_parent_observation_id on_call_agent_start(写 agent.call_agent obs.id) 注入转发命令的 header & metadata sub-agent on_task_start 以此为 parent 挂 workflow
framework_parent_span_id _enqueue_agent_return(写 {execution_id}:agent.return ResumeCommand metadata resume 时 worker.execute span 的 parent
usage_metadata(token) LLM 最后一个 streaming chunk(需 stream_options={"include_usage":True} on_llm_end 回调 → context.record_token_usage() worker.execute Redis span 和 Langfuse observation usage 字段

示例代码

# client_demo.py
import asyncio
from by_framework.client import GatewayClient

async def main():
    client = GatewayClient(
        redis_host="127.0.0.1",
        redis_port=6379,
    )
    # send_message 自动生成 trace_id、message_id 并写入 Redis Stream
    response = await client.send_message(
        target_agent_type="orchestrator",
        session_id="session-demo-001",
        content="帮我查一下北京今天天气,然后给我一个出行建议",
        user_code="user-001",
        user_name="张三",
    )
    print("message_id:", response.message_id)
    print("trace_id  :", response.trace_id)

asyncio.run(main())
# orchestrator_worker.py
import os
from typing import Any, List
from by_framework.worker import GatewayWorker, run_worker
from by_framework.core.protocol.commands import AskAgentCommand, ResumeCommand
from by_framework.worker.context import AgentContext

class OrchestratorWorker(GatewayWorker):

    def get_agent_types(self) -> List[str]:
        return ["orchestrator"]

    async def process_command(
        self, command: AskAgentCommand | ResumeCommand, context: AgentContext
    ) -> Any:

        if isinstance(command, ResumeCommand):
            # ── 恢复阶段:sub-agent 已返回 ──
            # command.reply_data 是子 agent 的结果
            weather_result = command.reply_data
            await context.emit_chunk(
                f"天气信息已获取:{weather_result}\n根据天气,建议携带雨具出行。"
            )
            return {"status": "done", "weather": weather_result}

        # ── 初始阶段:收到用户请求 ──
        # context.call_agent 自动完成:
        #   1. 在 Langfuse 创建 agent.call_agent:weather-agent span
        #   2. 将 langfuse_parent_observation_id 注入转发命令的 header
        #   3. 挂起本 worker,等待 ResumeCommand 返回
        await context.call_agent(
            target_agent_type="weather-agent",
            content={"city": "北京", "query": command.content},
            wait_for_reply=True,
        )

if __name__ == "__main__":
    run_worker(
        OrchestratorWorker,
        worker_id="orchestrator-worker-1",
        redis_host=os.getenv("BYAI_REDIS_HOST", "127.0.0.1"),
        redis_port=int(os.getenv("BYAI_REDIS_PORT", 6379)),
        plugins=[_make_langfuse_plugin()],
    )
# weather_agent_worker.py
import os
from typing import List
from by_framework.worker import run_worker
from by_framework_langgraph import LangGraphWorker
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent

class WeatherAgentWorker(LangGraphWorker):

    def get_agent_types(self) -> List[str]:
        return ["weather-agent"]

    def build_graph(self, context, command):
        llm = ChatOpenAI(
            model=os.getenv("LLM_MODEL", "gpt-4o"),
            base_url=os.getenv("OPENAI_BASE_URL"),
            api_key=os.getenv("OPENAI_API_KEY"),
            streaming=True,
            # ↓ 必须开启,否则 streaming 模式下拿不到 token 用量
            stream_options={"include_usage": True},
        )
        return create_react_agent(
            llm,
            tools=[],
            checkpointer=self.get_checkpointer(),
            prompt="你是专业的天气助手,根据城市给出天气状况和出行建议,不超过100字。",
        )

if __name__ == "__main__":
    run_worker(
        WeatherAgentWorker,
        worker_id="weather-agent-worker-1",
        redis_host=os.getenv("BYAI_REDIS_HOST", "127.0.0.1"),
        redis_port=int(os.getenv("BYAI_REDIS_PORT", 6379)),
        plugins=[_make_langfuse_plugin()],
    )
# 两个 worker 都需要注入同一套 LangfusePlugin 配置
import os
from by_framework_trace_langfuse import LangfusePlugin, LangfuseConfig

def _make_langfuse_plugin() -> LangfusePlugin:
    return LangfusePlugin(
        config=LangfuseConfig(
            public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
            secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
            base_url=os.getenv("LANGFUSE_BASE_URL"),
        )
    )

# 在 run_worker 调用时传入:
# run_worker(MyWorker, ..., plugins=[_make_langfuse_plugin()])

常见问题

Token 统计为 0? OpenAI-compatible streaming API 默认不返回 usage。 必须在 ChatModel 上加 stream_options={"include_usage": True}。 控制台会出现 [TokenAccumulator] on_llm_end fired but extracted 0 tokens 警告提示。
sub-agent 的 trace 没挂到 orchestrator 下? 检查 orchestrator worker 是否注入了 LangfusePlugin。 插件的 on_call_agent_start 钩子负责写入 langfuse_parent_observation_id; 没有插件则字段不会被设置。
resume 段的 worker.execute 没有挂到 agent.return 下? 确认 GatewayWorker._enqueue_agent_return 传入了 context 参数,它用于计算 framework_parent_span_id

Dashboard 与运行方式

Dashboard 已经独立为 by-framework-dashboard 子包。它通过 TraceReadClient 读取 trace,不直接耦合 Redis 查询细节。

uv run --package by-framework-dashboard by-framework-dashboard \
  --host 127.0.0.1 \
  --port 8765 \
  --redis-host localhost \
  --redis-port 6379 \
  --redis-db 0
接口说明
/api/traces?session_id=...按 session 列 trace summaries。
/api/traces?worker_id=...按 worker index 列 trace summaries。
/api/traces?agent_type=...按 agent index 列 trace summaries。
/api/trace/<trace_id>读取单个 trace 的完整 tree/timeline。
/metrics导出 Prometheus 格式指标。

排查手册

现象优先检查修复方向
Langfuse 显示 Unnamed trace client 是否写 client.dispatch;trace root 是否设置 trace name。 确认 client 侧 Langfuse 环境变量和 best-effort warning。
外部 app 节点和 worker.execute 同层 langfuse_parent_observation_id 是否传给外部 app。 使用 start_langfuse_observation() 或手动设置 trace_context。
子节点耗时超过父节点 异步 child 是否挂到了已结束的 agent.task。 使用 agent.workflow 作为 call parent。
Dashboard trace 没有 spans Redis trace_spans 是否有写入;session fallback 是否有 execution/event。 检查 TraceReadClient.explain_trace() diagnostics。
Java client trace 缺 client 信息 Java 是否同步了 Redis client.dispatch 写入和 Langfuse parent header。 确认 Java SDK 和 Python schema 字段一致。

FAQ

client 端默认会生成哪些 trace 字段?

client 会生成或继承 trace_id,并为下游设置 trace_parent_span_id。 如果 Langfuse 已配置并成功创建 client.dispatch observation,还会设置 langfuse_parent_observation_id。没有配置 Langfuse 时不应影响发送消息。

外部应用一定要使用 by-framework SDK 吗?

不一定。外部应用只要能拿到普通 AskAgentCommand 或 header dict,就可以用 extract_external_trace_context() 提取 trace join 参数,再接入 Langfuse 或 OpenTelemetry。

Redis Read SDK 和 Langfuse/Phoenix 的关系是什么?

Redis Read SDK 是 by-framework Dashboard 的 v1 查询事实源;Langfuse/Phoenix 是观测后端。 后续可以增加 LangfuseSource / PhoenixSource,把 tokens/cost/OTel latency 补充进统一结果。

为什么不让 Dashboard 直接查 Redis?

Dashboard 只做展示和 HTTP 兼容层。查询能力沉到 by-framework-trace-query, 这样 CLI、测试工具、外部服务也可以复用同一套读逻辑,并为多数据源高可用留出口。