
StateGraph and CompiledStateGraph

zerograph.graph.state.StateGraph

Bases: Generic[StateT, InputT, OutputT]

A graph whose nodes communicate by reading and writing to shared state.

Each node has signature State -> Partial[State]. State keys can be annotated with reducer functions via Annotated[type, reducer].

Usage:

from typing import Annotated, TypedDict
import operator
from zerograph import StateGraph, START, END

class MyState(TypedDict):
    messages: Annotated[list, operator.add]

graph = StateGraph(MyState)
graph.add_node("greet", lambda s: {"messages": ["Hello"]})
graph.add_edge(START, "greet")
graph.add_edge("greet", END)

app = graph.compile()
result = app.invoke({"messages": []})
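
The reducer behaviour described above can be pictured with a small stand-alone sketch. It is not ZeroGraph's implementation: `apply_update` is a hypothetical helper, the `topic` key is added only for illustration, and the rule that keys without a reducer are overwritten is an assumption.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class MyState(TypedDict):
    messages: Annotated[list, operator.add]  # merged with operator.add
    topic: str                               # no reducer declared


def apply_update(schema: type, state: dict, update: dict) -> dict:
    """Merge a node's partial update into the state (hypothetical helper)."""
    hints = get_type_hints(schema, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        # Annotated[...] exposes its extra arguments via __metadata__.
        metadata = getattr(hints.get(key), "__metadata__", ())
        reducer = next((m for m in metadata if callable(m)), None)
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], value)
        else:
            # Assumption: keys without a reducer are simply overwritten.
            merged[key] = value
    return merged


state = {"messages": ["Hi"], "topic": "intro"}
state = apply_update(MyState, state, {"messages": ["Hello"]})
print(state)
```

With `operator.add` as the reducer, a node that returns `{"messages": ["Hello"]}` appends to the existing list instead of replacing it.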

Parameters:

state_schema (type, required): TypedDict or dict type defining the graph state.

input_schema (type | None, default None): Optional separate schema for graph input. When None, state_schema is used.

output_schema (type | None, default None): Optional separate schema for graph output. When None, state_schema is used.

context_schema (type | None, default None): Optional schema for immutable context values injected into nodes.

__init__

__init__(
    state_schema: type,
    *,
    input_schema: type | None = None,
    output_schema: type | None = None,
    context_schema: type | None = None
) -> None

add_node

add_node(
    node: str | Callable,
    action: Callable | None = None,
    *,
    metadata: dict | None = None,
    input_schema: type | None = None,
    retry_policy: RetryPolicy | None = None,
    destinations: (
        dict[str, str] | tuple[str, ...] | None
    ) = None,
    timeout: (
        float | timedelta | TimeoutPolicy | None
    ) = None,
    cache_policy: Any | None = None,
    error_handler: str | None = None
) -> StateGraph

Add a node to the graph.

The action function receives the current state and returns a dict of updates to merge. A CompiledStateGraph can be passed as action to embed a subgraph.

Parameters:

node (str | Callable, required): Node name (str) or a callable, in which case the callable's __name__ is used as the node name.

action (Callable | None, default None): The function to execute. If node is a callable, this is set automatically.

metadata (dict | None, default None): Optional metadata dict attached to the node.

input_schema (type | None, default None): Override the input schema for this node.

retry_policy (RetryPolicy | None, default None): Automatic retry configuration.

destinations (dict[str, str] | tuple[str, ...] | None, default None): Pre-declared routing targets for Command.goto.

timeout (float | timedelta | TimeoutPolicy | None, default None): Timeout as a number of seconds, a timedelta, or a TimeoutPolicy.

cache_policy (Any | None, default None): Cache configuration for this node's results.

error_handler (str | None, default None): Name of another node to route to when this node raises an exception.

Returns:

StateGraph: self, for method chaining.
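
The naming rule for the node and action arguments can be sketched as follows. This is an illustration only: `resolve_node` is a hypothetical helper, and raising ValueError for a named node without an action is an assumption, not documented behaviour.

```python
from typing import Callable, Optional, Tuple, Union


def resolve_node(
    node: Union[str, Callable], action: Optional[Callable] = None
) -> Tuple[str, Callable]:
    """Return (name, action) following the rule described for add_node."""
    if callable(node):
        # A bare callable doubles as the action; its __name__ is the node name.
        return node.__name__, node
    if action is None:
        # Assumption: a node given by name needs an explicit action.
        raise ValueError(f"node {node!r} was given by name but has no action")
    return node, action


def greet(state: dict) -> dict:
    return {"messages": ["Hello"]}


print(resolve_node(greet)[0])             # greet
print(resolve_node("welcome", greet)[0])  # welcome
```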

add_edge

add_edge(
    start_key: str | list[str], end_key: str
) -> StateGraph

Add a directed edge.

Parameters:

start_key (str | list[str], required): Source node name, START, or a list of source nodes (creates a waiting / fan-in edge).

end_key (str, required): Target node name or END.

Returns:

StateGraph: self, for method chaining.
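
A waiting (fan-in) edge can be thought of as an edge that fires only once every listed source has finished. The sketch below models that reading; `edge_is_ready` and the node names are hypothetical, and the "all sources must finish" rule is an assumption about what "waiting" means here.

```python
from typing import List, Set, Union


def edge_is_ready(start_key: Union[str, List[str]], finished: Set[str]) -> bool:
    """True once every source node of the edge has finished."""
    sources = [start_key] if isinstance(start_key, str) else list(start_key)
    return all(source in finished for source in sources)


finished = {"fetch_a"}
print(edge_is_ready(["fetch_a", "fetch_b"], finished))  # False: fetch_b pending
finished.add("fetch_b")
print(edge_is_ready(["fetch_a", "fetch_b"], finished))  # True
```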

add_conditional_edges

add_conditional_edges(
    source: str,
    path: Callable,
    path_map: dict[Hashable, str] | list[str] | None = None,
) -> StateGraph

Add a conditional edge from source node.

Parameters:

source (str, required): Source node name.

path (Callable, required): Routing function (state) -> str | list[str | Send].

path_map (dict[Hashable, str] | list[str] | None, default None): Maps return values of path to node names.

Returns:

StateGraph: self, for method chaining.
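
How a routing result might be turned into target node names is sketched below. `resolve_targets`, `route`, and the node names are hypothetical. The sketch assumes that a dict path_map translates return values, a list path_map only restricts the allowed names, and no path_map means the return value is used as the node name directly; Send objects are left out.

```python
from typing import Any, Dict, Hashable, List, Optional, Union


def resolve_targets(
    result: Any,
    path_map: Optional[Union[Dict[Hashable, str], List[str]]] = None,
) -> List[str]:
    """Translate the return value of a path function into node names."""
    results = result if isinstance(result, list) else [result]
    targets = []
    for item in results:
        if isinstance(path_map, dict):
            targets.append(path_map[item])  # look up the mapped node name
        elif isinstance(path_map, list):
            if item not in path_map:
                raise ValueError(f"{item!r} is not a declared target")
            targets.append(item)
        else:
            targets.append(item)  # the value is already a node name
    return targets


def route(state: dict) -> str:
    return "retry" if state["errors"] else "done"


path_map = {"retry": "call_tool", "done": "summarize"}
print(resolve_targets(route({"errors": 2}), path_map))  # ['call_tool']
```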

add_sequence

add_sequence(
    nodes: Sequence[Callable | tuple[str, Callable]],
) -> StateGraph

Add a sequence of nodes with edges between them.

Parameters:

nodes (Sequence[Callable | tuple[str, Callable]], required): Sequence of callables or (name, callable) tuples.

Returns:

StateGraph: self, for method chaining.
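
The effect of add_sequence can be pictured as naming each item and linking consecutive nodes. The sketch below is not the library's code; `expand_sequence` is a hypothetical helper, and naming bare callables by __name__ is assumed to mirror add_node.

```python
from typing import Callable, List, Sequence, Tuple, Union

NodeItem = Union[Callable, Tuple[str, Callable]]


def expand_sequence(
    nodes: Sequence[NodeItem],
) -> Tuple[List[str], List[Tuple[str, str]]]:
    """Return the node names and the edges linking consecutive nodes."""
    names = []
    for item in nodes:
        if isinstance(item, tuple):
            name, _fn = item          # explicit (name, callable) pair
        else:
            name = item.__name__      # assumption: same rule as add_node
        names.append(name)
    edges = list(zip(names, names[1:]))
    return names, edges


def load(state: dict) -> dict:
    return {"raw": "data"}


def clean(state: dict) -> dict:
    return {"clean": state["raw"].upper()}


names, edges = expand_sequence([load, clean, ("save", lambda s: {})])
print(edges)  # [('load', 'clean'), ('clean', 'save')]
```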

set_node_defaults

set_node_defaults(
    *,
    retry_policy: RetryPolicy | None = None,
    timeout: (
        float | timedelta | TimeoutPolicy | None
    ) = None,
    cache_policy: Any | None = None,
    error_handler: str | None = None
) -> StateGraph

Set default policies for all existing nodes.

Only overwrites a node's setting if the node currently has the default (None) value for that field.
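
The "only fill in fields that are still None" rule can be illustrated with a stand-alone sketch. `NodeSpec` and `apply_defaults` are hypothetical names, not part of the library, and only two of the four fields are modelled.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class NodeSpec:
    """Hypothetical stand-in for a node's per-node settings."""
    name: str
    timeout: Optional[float] = None
    error_handler: Optional[str] = None


def apply_defaults(nodes: List[NodeSpec], **defaults) -> None:
    """Fill in a default only where the node still has None for that field."""
    for node in nodes:
        for field, value in defaults.items():
            if value is not None and getattr(node, field) is None:
                setattr(node, field, value)


nodes = [NodeSpec("fetch"), NodeSpec("parse", timeout=5.0)]
apply_defaults(nodes, timeout=30.0, error_handler="recover")
print(nodes[0].timeout, nodes[1].timeout)  # 30.0 5.0
```

Because only existing nodes are touched, nodes added after the call would not pick up these defaults.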

set_entry_point

set_entry_point(key: str) -> StateGraph

set_conditional_entry_point

set_conditional_entry_point(
    path: Callable, path_map: dict | list | None = None
) -> StateGraph

set_finish_point

set_finish_point(key: str) -> StateGraph

validate

validate(interrupt: list[str] | None = None) -> StateGraph

Validate graph integrity.

compile

compile(
    checkpointer: BaseCheckpointSaver | None = None,
    *,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    context: dict | None = None,
    cache: Any | None = None,
    store: Any | None = None,
    debug: bool = False
) -> CompiledStateGraph

Compile the StateGraph into an executable CompiledStateGraph.

Parameters:

checkpointer (BaseCheckpointSaver | None, default None): Checkpoint saver for persistence.

interrupt_before (list[str] | None, default None): Names of nodes to interrupt before they execute.

interrupt_after (list[str] | None, default None): Names of nodes to interrupt after they execute.

context (dict | None, default None): Immutable context values injected into nodes.

cache (Any | None, default None): Cache backend for node-level result caching.

store (Any | None, default None): Key-value store accessible from nodes.

debug (bool, default False): Enable debug stream mode.

Returns:

CompiledStateGraph: A compiled graph ready for invoke / stream.

get_graph

get_graph() -> str

Generate a Mermaid flowchart diagram of this graph.
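
For orientation, a Mermaid flowchart for a simple graph could be produced along these lines. The exact text that get_graph returns is not documented here, so the layout direction, the node labels, and the `to_mermaid` helper are all assumptions.

```python
from typing import Iterable, Tuple


def to_mermaid(edges: Iterable[Tuple[str, str]]) -> str:
    """Render directed edges as a top-down Mermaid flowchart."""
    lines = ["flowchart TD"]
    for source, target in edges:
        lines.append(f"    {source} --> {target}")
    return "\n".join(lines)


diagram = to_mermaid([("START", "greet"), ("greet", "END")])
print(diagram)
```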

zerograph.graph.state.CompiledStateGraph

Compiled graph that can be executed via invoke, stream, etc.

Created by calling StateGraph.compile(). Do not instantiate directly.

invoke

invoke(input: Any, config: dict | None = None) -> Any

Execute the graph synchronously.

Parameters:

input (Any, required): Initial state or state update.

config (dict | None, default None): Optional config dict, for example with configurable.thread_id.

Returns:

Any: Final state after graph execution completes.

stream

stream(
    input: Any,
    config: dict | None = None,
    *,
    stream_mode: str | list[str] = "updates"
) -> Any

Execute the graph and yield stream events.

Parameters:

input (Any, required): Initial state or state update.

config (dict | None, default None): Optional config dict.

stream_mode (str | list[str], default 'updates'): One of "updates", "values", "custom", "messages", "checkpoints", "tasks", "debug", or a list of these for multi-mode streaming.

Returns:

Any: Generator yielding stream events.
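
The difference between two of the stream modes can be sketched with a toy runner. The event shapes are not specified on this page, so this is an assumption: "updates" is taken to yield each node's partial update keyed by node name, and "values" the full state after each step. `run_steps` is a hypothetical helper that runs nodes in a fixed order and ignores reducers.

```python
from typing import Callable, Iterator, List, Tuple


def run_steps(
    nodes: List[Tuple[str, Callable[[dict], dict]]],
    state: dict,
    stream_mode: str = "updates",
) -> Iterator[dict]:
    """Run nodes in order and yield one event per step."""
    if stream_mode not in ("updates", "values"):
        raise ValueError(f"unsupported stream_mode in this sketch: {stream_mode}")
    state = dict(state)
    for name, fn in nodes:
        update = fn(state)
        state.update(update)  # plain overwrite; reducers are omitted here
        if stream_mode == "updates":
            yield {name: update}   # only what this node changed
        else:
            yield dict(state)      # snapshot of the whole state


nodes = [
    ("inc", lambda s: {"count": s["count"] + 1}),
    ("double", lambda s: {"count": s["count"] * 2}),
]
print(list(run_steps(nodes, {"count": 0}, "updates")))
print(list(run_steps(nodes, {"count": 0}, "values")))
```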

ainvoke async

ainvoke(input: Any, config: dict | None = None) -> Any

Execute the graph asynchronously.

Parameters:

input (Any, required): Initial state or state update.

config (dict | None, default None): Optional config dict.

Returns:

Any: Final state after graph execution completes.

astream async

astream(
    input: Any,
    config: dict | None = None,
    *,
    stream_mode: str | list[str] = "updates"
)

Execute the graph asynchronously and yield stream events.

get_state

get_state(
    config: dict | None = None, *, subgraphs: bool = False
) -> StateSnapshot

Get a snapshot of the current graph state.

Parameters:

config (dict | None, default None): Config dict with configurable.thread_id.

subgraphs (bool, default False): If True, include subgraph states.

Returns:

StateSnapshot: A StateSnapshot containing values, next nodes, metadata, etc.

get_state_history

get_state_history(
    config: dict | None = None, *, limit: int = 25
) -> list

Get the checkpoint snapshots for this thread, newest first, up to limit entries.

Parameters:

config (dict | None, default None): Config dict with configurable.thread_id.

limit (int, default 25): Maximum number of snapshots to return.

Returns:

list: List of StateSnapshot objects.

update_state

update_state(
    config: dict | None,
    values: Any,
    as_node: str | None = None,
) -> dict

Manually update the graph state.

Parameters:

config (dict | None, required): Config dict with configurable.thread_id.

values (Any, required): State values to update.

as_node (str | None, default None): Apply the update as if it came from this node; this affects which nodes run next.

Returns:

dict: Updated config dict.

batch

batch(
    inputs: list[Any], config: dict | None = None
) -> list[Any]

Execute the graph for multiple inputs sequentially.

abatch async

abatch(
    inputs: list[Any], config: dict | None = None
) -> list[Any]

Execute the graph for multiple inputs concurrently.
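
The contrast between batch (sequential) and abatch (concurrent) can be sketched with asyncio. The `invoke` and `ainvoke` functions below are hypothetical stand-ins for a compiled graph's methods, and using asyncio.gather for the concurrent case is an assumption about one possible approach, not a description of the library's internals.

```python
import asyncio
from typing import Any, List


def invoke(value: Any) -> dict:
    """Stand-in for a synchronous graph run."""
    return {"echo": value}


async def ainvoke(value: Any) -> dict:
    """Stand-in for an asynchronous graph run."""
    await asyncio.sleep(0.01)  # simulate I/O inside the graph
    return {"echo": value}


def batch_sketch(inputs: List[Any]) -> List[Any]:
    # One input after another, in order.
    return [invoke(item) for item in inputs]


async def abatch_sketch(inputs: List[Any]) -> List[Any]:
    # All runs are scheduled together; gather keeps results in input order.
    return list(await asyncio.gather(*(ainvoke(item) for item in inputs)))


sequential = batch_sketch([1, 2, 3])
concurrent = asyncio.run(abatch_sketch([1, 2, 3]))
print(sequential == concurrent)  # True
```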

get_graph

get_graph()

Return self for compatibility.