LLM Stream Adapter

LLMStreamAdapter

zerograph.adapters.llm_stream.LLMStreamAdapter

Convert LLM SDK streaming chunks to ZeroGraph-compatible output.

Works with OpenAI and Anthropic SDK chunk formats via duck typing (no SDK dependency required).
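To illustrate what duck typing means here, the following is a minimal sketch of attribute-based text extraction. It is not ZeroGraph's actual implementation. `extract_text_delta` is a hypothetical helper, and the only assumptions are the publicly documented chunk shapes: OpenAI chat chunks carry text at `chunk.choices[0].delta.content`, and Anthropic stream events carry text at `event.delta.text` when `event.type == "content_block_delta"`.

```python
from types import SimpleNamespace


def extract_text_delta(chunk, provider="openai"):
    """Return the text carried by one chunk, or None (illustrative only)."""
    if provider == "openai":
        # OpenAI chat chunks expose text at chunk.choices[0].delta.content.
        choices = getattr(chunk, "choices", None) or []
        if not choices:
            return None
        delta = getattr(choices[0], "delta", None)
        return getattr(delta, "content", None) or None
    if provider == "anthropic":
        # Anthropic stream events carry text in content_block_delta events.
        if getattr(chunk, "type", None) != "content_block_delta":
            return None
        return getattr(getattr(chunk, "delta", None), "text", None) or None
    raise ValueError(f"unsupported provider: {provider!r}")


# Stand-in objects shaped like SDK chunks; no SDK import is needed.
openai_chunk = SimpleNamespace(
    choices=[SimpleNamespace(delta=SimpleNamespace(content="Hel"))]
)
anthropic_chunk = SimpleNamespace(
    type="content_block_delta", delta=SimpleNamespace(text="lo")
)
role_only_chunk = SimpleNamespace(
    choices=[SimpleNamespace(delta=SimpleNamespace(content=None))]
)
```

Because only attribute access is used, any object with the right shape works, which is why neither SDK has to be installed for the adapter to be imported.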

Usage (non-streaming node)::

def my_node(state):
    adapter = LLMStreamAdapter()
    for chunk in client.chat.completions.create(..., stream=True):
        adapter.append(chunk, provider="openai")
    return {"messages": [adapter.build_message()]}

Usage (streaming node in "messages" mode)::

def my_streaming_node(state):
    adapter = LLMStreamAdapter()
    for chunk in client.chat.completions.create(..., stream=True):
        delta = adapter.append(chunk, provider="openai")
        if delta:
            yield delta
    return {"messages": [adapter.build_message()]}

__init__

__init__() -> None

append

append(
    chunk: Any, *, provider: str = "openai"
) -> str | None

Process one streaming chunk and return its text delta, if any.

Parameters:

Name      Type  Description                          Default
chunk     Any   A streaming chunk from the LLM SDK.  required
provider  str   Either "openai" or "anthropic".      'openai'

Returns:

Type        Description
str | None  The text content delta from this chunk, or None if the chunk carries no text.

build_message

build_message() -> dict

Build the final message dict from accumulated chunks.
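The accumulate-then-build pattern behind `append` and `build_message` can be sketched with a toy stand-in. `SketchAdapter` is hypothetical and takes plain strings instead of SDK chunks. The returned dict shape (`role` and `content` keys, as in an OpenAI-style assistant message) is an assumption, since the reference above only states that a dict is returned.

```python
class SketchAdapter:
    """Toy stand-in for LLMStreamAdapter; takes plain strings, not SDK chunks."""

    def __init__(self):
        self._parts = []

    def append(self, text):
        # Mirror the documented contract: return the delta, or None if empty.
        if not text:
            return None
        self._parts.append(text)
        return text

    def build_message(self):
        # Assumed shape: an OpenAI-style assistant message dict.
        return {"role": "assistant", "content": "".join(self._parts)}


adapter = SketchAdapter()
deltas = [adapter.append(piece) for piece in ["Hel", None, "lo"]]
message = adapter.build_message()
```

The real adapter may also track fields such as tool calls. This sketch covers text only.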

stream_openai

zerograph.adapters.llm_stream.stream_openai

stream_openai(
    llm_callable: Callable,
    messages: list[dict],
    *,
    tools: list[dict] | None = None,
    **kwargs: Any
) -> Generator[str, None, dict]

Convenience generator that calls the OpenAI SDK in streaming mode and yields text deltas.

Use inside a generator node for "messages" stream mode::

def my_node(state):
    yield from stream_openai(
        lambda **kw: client.chat.completions.create(**kw),
        state["messages"],
    )

The generator yields text deltas and the return value is the state update.
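The split between yielded deltas and a returned state update relies on standard Python generator semantics: a generator's `return` value travels on `StopIteration.value`, and `yield from` evaluates to it. The sketch below shows this with plain Python only. `fake_stream` and `drive` are hypothetical stand-ins, and how ZeroGraph itself drives node generators is not shown here.

```python
def fake_stream():
    # Stand-in for stream_openai: yields deltas, then returns a state update.
    parts = []
    for piece in ["Hel", "lo"]:
        parts.append(piece)
        yield piece
    return {"messages": [{"role": "assistant", "content": "".join(parts)}]}


def drive(gen):
    """Collect yielded deltas and the generator's return value."""
    deltas = []
    while True:
        try:
            deltas.append(next(gen))
        except StopIteration as stop:
            # The generator's return value is carried on the exception.
            return deltas, stop.value


def node(state):
    # `yield from` forwards every delta and evaluates to the inner return value.
    update = yield from fake_stream()
    return update


deltas, update = drive(node({}))
```

In plain Python, the outer generator only passes the inner return value on if it returns the result of `yield from` explicitly, as `node` does here.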