跳转至

Channel 系统

Channel 是 ZeroGraph 状态管理的核心机制。每个状态字段对应一个 Channel 实例,负责管理该字段的读写和合并逻辑。

BaseChannel

zerograph.channels.base.BaseChannel

Bases: Generic[Value, Update, Checkpoint], ABC

Base class for all channels.

ValueType abstractmethod property

ValueType: Any

UpdateType abstractmethod property

UpdateType: Any

__init__

__init__(typ: Any, key: str = '') -> None

get abstractmethod

get() -> Value

update abstractmethod

update(values: Sequence[Update]) -> bool

from_checkpoint abstractmethod

from_checkpoint(checkpoint: Any) -> BaseChannel

checkpoint

checkpoint() -> Any

copy

copy() -> BaseChannel

is_available

is_available() -> bool

consume

consume() -> bool

finish

finish() -> bool

内置 Channel 类型

Channel 选择指南

Channel 类型 适用场景 典型用法
LastValue 默认行为,每个 key 只保留最后一个值 简单状态字段
BinaryOperatorAggregate 需要累加/合并多个节点的输出 Annotated[list, operator.add]、自定义 reducer
AnyValue 允许多次写入同一 key(不报错) context_schema 注入、内部机制
EphemeralValue 值只在一个超步中有效,之后自动清空 一次性信号、临时标记
Topic 发布/订阅模式,可累积消息列表 广播事件、消息总线
NamedBarrierValue 等待所有指定来源都写入后才可读 fan-in 同步、等待多路汇聚

LastValue

zerograph.channels.last_value.LastValue

Bases: Generic[Value], BaseChannel[Value, Value, Value]

Stores the last value received, at most one per step.

__init__

__init__(typ: Any, key: str = '') -> None

update

update(values: Sequence[Value]) -> bool

get

get() -> Value

BinaryOperatorAggregate

zerograph.channels.binop.BinaryOperatorAggregate

Bases: Generic[Value], BaseChannel[Value, Value, Value]

Stores the result of applying a binary operator (reducer) to values.

__init__

__init__(
    typ: type[Value],
    operator: Callable[[Value, Value], Value],
)

update

update(values: Sequence[Value]) -> bool

get

get() -> Value

AnyValue

zerograph.channels.any_value.AnyValue

Bases: Generic[Value], BaseChannel[Value, Value, Value]

Like LastValue but accepts multiple values per step without error.

Silently keeps the last value received. Useful when multiple nodes write the same value in a superstep (e.g., context injection).

__init__

__init__(typ: Any, key: str = '') -> None

update

update(values: Sequence[Value]) -> bool

get

get() -> Value

EphemeralValue

zerograph.channels.ephemeral_value.EphemeralValue

Bases: Generic[Value], BaseChannel[Value, Value, Value]

Stores value for one step only, clears after.

__init__

__init__(typ: Any, guard: bool = True) -> None

update

update(values: Sequence[Value]) -> bool

get

get() -> Value

Topic

zerograph.channels.topic.Topic

Bases: Generic[Value], BaseChannel[list[Value], Value, list[Value]]

Pub/sub topic channel for broadcasting messages.

__init__

__init__(
    typ: Any, accumulate: bool = False, key: str = ""
) -> None

update

update(values: Sequence[Value]) -> bool

get

get() -> list[Value]

consume

consume() -> bool

NamedBarrierValue

zerograph.channels.named_barrier.NamedBarrierValue

Bases: Generic[Value], BaseChannel[Value, Value, set]

Waits until all named values are received before becoming available.

__init__

__init__(typ: type, names: set) -> None

update

update(values: Sequence) -> bool

get

get()

消息 Channel

add_messages

zerograph.channels.messages.add_messages

add_messages(
    existing: Sequence, new_messages: Sequence
) -> list

Merge message lists: add new, update existing (by id), remove marked.

Messages are matched by their 'id' field. If a new message has the same id as an existing one, it replaces it. RemoveMessage entries remove by id. New messages without matching existing ids are appended.

RemoveMessage

zerograph.channels.messages.RemoveMessage

Marker to remove a message by ID.

__init__

__init__(id: str) -> None