StateGraph and CompiledStateGraph
zerograph.graph.state.StateGraph
Bases: Generic[StateT, InputT, OutputT]
A graph whose nodes communicate by reading and writing to shared state.
Each node has signature State -> Partial[State].
State keys can be annotated with reducer functions via
Annotated[type, reducer].
Usage::

    from typing import Annotated, TypedDict
    import operator
    from ZeroGraph import StateGraph, START, END

    class MyState(TypedDict):
        # operator.add is the reducer: list updates are concatenated
        messages: Annotated[list, operator.add]

    graph = StateGraph(MyState)
    graph.add_node("greet", lambda s: {"messages": ["Hello"]})
    graph.add_edge(START, "greet")
    graph.add_edge("greet", END)
    app = graph.compile()
    result = app.invoke({"messages": []})

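To illustrate how a reducer declared via Annotated[type, reducer] could combine a node's partial update with the existing state, here is a minimal standard-library sketch. The helper `apply_update` is hypothetical and not part of zerograph. It also assumes that keys without a reducer are simply overwritten by the latest update.

```python
import operator
from typing import Annotated, Any, TypedDict, get_args, get_origin, get_type_hints


class MyState(TypedDict):
    messages: Annotated[list, operator.add]
    topic: str  # no reducer: assumed to be overwritten by the latest update


def apply_update(schema: type, state: dict[str, Any], update: dict[str, Any]) -> dict[str, Any]:
    """Merge a node's partial update into state (hypothetical helper)."""
    hints = get_type_hints(schema, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        hint = hints.get(key)
        reducer = None
        if get_origin(hint) is Annotated:
            # Annotated[type, reducer]: treat the first metadata item as the reducer.
            metadata = get_args(hint)[1:]
            if metadata and callable(metadata[0]):
                reducer = metadata[0]
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], value)
        else:
            merged[key] = value
    return merged


state = {"messages": ["Hi"], "topic": "greeting"}
state = apply_update(MyState, state, {"messages": ["Hello"]})
state = apply_update(MyState, state, {"topic": "farewell"})
```

With operator.add as the reducer, the list update is appended to the existing messages rather than replacing them, while the un-annotated key takes the new value.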
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| state_schema | type | TypedDict or dict type defining the graph state. | required |
| input_schema | type \| None | Optional separate schema for graph input. Defaults to state_schema. | None |
| output_schema | type \| None | Optional separate schema for graph output. Defaults to state_schema. | None |
| context_schema | type \| None | Optional schema for immutable context values injected into nodes. | None |

__init__

    __init__(
        state_schema: type,
        *,
        input_schema: type | None = None,
        output_schema: type | None = None,
        context_schema: type | None = None
    ) -> None

add_node

    add_node(
        node: str | Callable,
        action: Callable | None = None,
        *,
        metadata: dict | None = None,
        input_schema: type | None = None,
        retry_policy: RetryPolicy | None = None,
        destinations: (
            dict[str, str] | tuple[str, ...] | None
        ) = None,
        timeout: (
            float | timedelta | TimeoutPolicy | None
        ) = None,
        cache_policy: Any | None = None,
        error_handler: str | None = None
    ) -> StateGraph

Add a node to the graph.
The action function receives the current state and returns a dict
of updates to merge. A CompiledStateGraph can be passed as
action to embed a subgraph.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| node | str \| Callable | Node name (str) or a callable (uses | required |
| action | Callable \| None | The function to execute. If node is callable this is set automatically. | None |
| metadata | dict \| None | Optional metadata dict attached to the node. | None |
| input_schema | type \| None | Override the input schema for this node. | None |
| retry_policy | RetryPolicy \| None | Automatic retry configuration. | None |
| destinations | dict[str, str] \| tuple[str, ...] \| None | Pre-declared routing targets for | None |
| timeout | float \| timedelta \| TimeoutPolicy \| None | Timeout in seconds, | None |
| cache_policy | Any \| None | Cache configuration for this node's results. | None |
| error_handler | str \| None | Name of another node to route to on exception. | None |

Returns:

| Type | Description |
|---|---|
| StateGraph | self, for method chaining. |

add_edge

    add_edge(
        start_key: str | list[str], end_key: str
    ) -> StateGraph

Add a directed edge.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| start_key | str \| list[str] | Source node name, START, or a list of source nodes (creates a waiting / fan-in edge). | required |
| end_key | str | Target node name or END. | required |

Returns:

| Type | Description |
|---|---|
| StateGraph | self, for method chaining. |

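The waiting / fan-in behaviour of a list-valued start_key can be pictured with a small standalone sketch. It assumes that "waiting" means the target becomes runnable only once every listed source has completed. The function `ready_targets` and the node names are hypothetical and do not reflect zerograph internals.

```python
def ready_targets(
    waiting_edges: list[tuple[tuple[str, ...], str]],
    completed: set[str],
) -> list[str]:
    """Return targets whose source nodes have all completed (illustrative only)."""
    return [
        target
        for sources, target in waiting_edges
        if set(sources) <= completed  # every source must be finished
    ]


# One fan-in edge: "combine" waits for both "fetch_a" and "fetch_b".
waiting_edges = [(("fetch_a", "fetch_b"), "combine")]

after_one = ready_targets(waiting_edges, {"fetch_a"})
after_both = ready_targets(waiting_edges, {"fetch_a", "fetch_b"})
```

Under this assumption, the target is not scheduled while only one of its sources has finished.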
add_conditional_edges

    add_conditional_edges(
        source: str,
        path: Callable,
        path_map: dict[Hashable, str] | list[str] | None = None,
    ) -> StateGraph

Add a conditional edge from source node.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| source | str | Source node name. | required |
| path | Callable | Routing function | required |
| path_map | dict[Hashable, str] \| list[str] \| None | Maps return values of path to node names. | None |

Returns:

| Type | Description |
|---|---|
| StateGraph | self, for method chaining. |

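As a rough illustration of how a path function and path_map might interact, the sketch below resolves a routing decision in plain Python. The helper `resolve_route` is hypothetical. The handling of the list form (a list of allowed node names) and of None (the return value is used directly as the node name) are assumptions, since the reference only states that path_map maps return values of path to node names.

```python
from typing import Any, Callable, Hashable, Optional, Union

PathMap = Optional[Union[dict[Hashable, str], list[str]]]


def resolve_route(path: Callable[[Any], Hashable], path_map: PathMap, state: Any) -> str:
    """Turn the return value of `path` into a target node name (illustrative only)."""
    result = path(state)
    if isinstance(path_map, dict):
        # Dict form: look up the node name for the returned value.
        return path_map[result]
    if isinstance(path_map, list):
        # Assumed list form: the returned value must be one of the listed names.
        if result not in path_map:
            raise ValueError(f"unexpected route: {result!r}")
        return str(result)
    # Assumed None form: the returned value is already a node name.
    return str(result)


def route(state: dict) -> str:
    return "retry" if state["errors"] else "done"


via_dict = resolve_route(route, {"retry": "call_model", "done": "finish"}, {"errors": 1})
via_list = resolve_route(lambda s: "finish", ["call_model", "finish"], {})
via_none = resolve_route(route, None, {"errors": 0})
```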
add_sequence

    add_sequence(
        nodes: Sequence[Callable | tuple[str, Callable]],
    ) -> StateGraph

Add a sequence of nodes with edges between them.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| nodes | Sequence[Callable \| tuple[str, Callable]] | Sequence of callables or | required |

Returns:

| Type | Description |
|---|---|
| StateGraph | self, for method chaining. |

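A sequence of nodes with edges between them amounts to one node per item plus one edge per consecutive pair. The sketch below shows that expansion in isolation. The helper `expand_sequence` is hypothetical, and deriving a name from a bare callable's `__name__` is an assumption, not documented behaviour.

```python
from typing import Callable, Sequence, Union

NodeItem = Union[Callable, tuple[str, Callable]]


def expand_sequence(nodes: Sequence[NodeItem]) -> tuple[list[str], list[tuple[str, str]]]:
    """Return node names and the edges linking consecutive nodes (illustrative only)."""
    names: list[str] = []
    for item in nodes:
        if isinstance(item, tuple):
            name, _fn = item  # explicit (name, callable) pair
        else:
            name = item.__name__  # assumption: name inferred from the callable
        names.append(name)
    # Pair each node with its successor: a -> b, b -> c, ...
    edges = list(zip(names, names[1:]))
    return names, edges


def load(state: dict) -> dict:
    return {"raw": "data"}


def clean(state: dict) -> dict:
    return {"clean": True}


names, edges = expand_sequence([load, clean, ("save", lambda s: {})])
```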
set_node_defaults

    set_node_defaults(
        *,
        retry_policy: RetryPolicy | None = None,
        timeout: (
            float | timedelta | TimeoutPolicy | None
        ) = None,
        cache_policy: Any | None = None,
        error_handler: str | None = None
    ) -> StateGraph

Set default policies for all existing nodes.
Only overwrites a node's setting if the node currently has the default
(None) value for that field.
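The "only overwrite when the field is still None" rule can be sketched as follows. `NodeSpec` and `apply_defaults` are hypothetical stand-ins for whatever per-node record the library keeps; only the fill-if-None behaviour is taken from the description above.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class NodeSpec:
    """Hypothetical per-node settings record."""
    retry_policy: Optional[Any] = None
    timeout: Optional[float] = None


def apply_defaults(nodes: dict[str, NodeSpec], **defaults: Any) -> None:
    """Fill in defaults for fields that are still None; explicit values win."""
    for spec in nodes.values():
        for field_name, value in defaults.items():
            if value is not None and getattr(spec, field_name) is None:
                setattr(spec, field_name, value)


nodes = {"fast": NodeSpec(timeout=1.0), "slow": NodeSpec()}
apply_defaults(nodes, timeout=30.0)
```

Here the node that already had an explicit timeout keeps it, and only the node left at None receives the default.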
set_conditional_entry_point

    set_conditional_entry_point(
        path: Callable, path_map: dict | list | None = None
    ) -> StateGraph

compile

    compile(
        checkpointer: BaseCheckpointSaver | None = None,
        *,
        interrupt_before: list[str] | None = None,
        interrupt_after: list[str] | None = None,
        context: dict | None = None,
        cache: Any | None = None,
        store: Any | None = None,
        debug: bool = False
    ) -> CompiledStateGraph

Compile the StateGraph into an executable CompiledStateGraph.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| checkpointer | BaseCheckpointSaver \| None | Checkpoint saver for persistence. | None |
| interrupt_before | list[str] \| None | Node names to interrupt before execution. | None |
| interrupt_after | list[str] \| None | Node names to interrupt after execution. | None |
| context | dict \| None | Immutable context values injected into nodes. | None |
| cache | Any \| None | Cache backend for node-level result caching. | None |
| store | Any \| None | Key-value store accessible from nodes. | None |
| debug | bool | Enable debug stream mode. | False |

Returns:

| Type | Description |
|---|---|
| CompiledStateGraph | A compiled graph ready for |

zerograph.graph.state.CompiledStateGraph
Compiled graph that can be executed via invoke, stream, etc.
Created by calling StateGraph.compile(). Do not instantiate directly.
invoke
Execute the graph synchronously.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| input | Any | Initial state or state update. | required |
| config | dict \| None | Optional config dict with | None |

Returns:

| Type | Description |
|---|---|
| Any | Final state after graph execution completes. |

stream

    stream(
        input: Any,
        config: dict | None = None,
        *,
        stream_mode: str | list[str] = "updates"
    ) -> Any

Execute the graph and yield stream events.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| input | Any | Initial state or state update. | required |
| config | dict \| None | Optional config dict. | None |
| stream_mode | str \| list[str] | | 'updates' |

Returns:

| Type | Description |
|---|---|
| Any | Generator yielding stream events. |

ainvoke (async)
Execute the graph asynchronously.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| input | Any | Initial state or state update. | required |
| config | dict \| None | Optional config dict. | None |

Returns:

| Type | Description |
|---|---|
| Any | Final state after graph execution completes. |

astream (async)
Execute the graph asynchronously and yield stream events.
get_state

    get_state(
        config: dict | None = None, *, subgraphs: bool = False
    ) -> StateSnapshot

Get a snapshot of the current graph state.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| config | dict \| None | Config dict with | None |
| subgraphs | bool | If True, include subgraph states. | False |

Returns:

| Type | Description |
|---|---|
| StateSnapshot | A StateSnapshot containing values, next nodes, metadata, etc. |

get_state_history
Get all checkpoint snapshots for this thread, newest first.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| config | dict \| None | Config dict with | None |
| limit | int | Maximum number of snapshots to return. | 25 |

Returns:

| Type | Description |
|---|---|
| list | List of StateSnapshot objects. |

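The "newest first, at most limit entries" contract can be illustrated without the library. In this sketch `Snapshot` is a hypothetical stand-in for StateSnapshot, and ordering by a `step` counter is an assumption about how recency is determined.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Snapshot:
    """Hypothetical stand-in for a checkpoint snapshot."""
    step: int
    values: dict


def history(snapshots: list[Snapshot], limit: int = 25) -> list[Snapshot]:
    """Return up to `limit` snapshots, newest (highest step) first."""
    ordered = sorted(snapshots, key=lambda snap: snap.step, reverse=True)
    return ordered[:limit]


# Thirty checkpoints recorded in chronological order.
snapshots = [Snapshot(step=i, values={"count": i}) for i in range(30)]
recent = history(snapshots)
```

With the default limit of 25, the five oldest checkpoints are left out of the result.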
update_state
Manually update the graph state.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| config | dict \| None | Config dict with | required |
| values | Any | State values to update. | required |
| as_node | str \| None | Pretend the update came from this node (affects | None |

Returns:

| Type | Description |
|---|---|
| dict | Updated config dict. |

batch
Execute the graph for multiple inputs sequentially.
abatch (async)
Execute the graph for multiple inputs concurrently.
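The difference between sequential and concurrent batch execution can be sketched with the standard library alone. The functions `fake_invoke` and `fake_ainvoke` are hypothetical placeholders for a compiled graph's invoke and ainvoke, and using asyncio.gather for concurrency is an assumption about one possible approach, not a description of zerograph's implementation.

```python
import asyncio


def fake_invoke(value: int) -> dict:
    """Placeholder for a synchronous graph run."""
    return {"value": value * 2}


async def fake_ainvoke(value: int) -> dict:
    """Placeholder for an asynchronous graph run."""
    await asyncio.sleep(0.01)  # simulate I/O-bound work
    return {"value": value * 2}


def run_sequential(inputs: list[int]) -> list[dict]:
    # One input at a time, in order.
    return [fake_invoke(value) for value in inputs]


async def run_concurrent(inputs: list[int]) -> list[dict]:
    # All inputs are in flight at once; gather preserves input order.
    return list(await asyncio.gather(*(fake_ainvoke(value) for value in inputs)))


sequential = run_sequential([1, 2, 3])
concurrent = asyncio.run(run_concurrent([1, 2, 3]))
```

Both approaches return results in input order. The concurrent version overlaps the waiting time of the individual runs.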