Weave

Connections & Pipelines

This is the interoperability layer: how one agent's output plugs into another's input.

The connection protocol

ConnectionProtocol runs a four-step handshake for each handoff:

  1. Type match. The source output type must equal, or be declared compatible with, the target input type. Otherwise the protocol raises ConnectionIncompatibleError (fail fast).
  2. Shape validation. For structured_json, the payload is validated against the target's expected shape.
  3. Capability pre-check. Optional ranking by capability-tag affinity (the Router).
  4. Transform injection. Compatible-but-different types get an auto-injected transform.

from weave import ConnectionProtocol, PortSchema, DataType, Payload

proto = ConnectionProtocol()

# Static check (no data), validates a link:
proto.check(PortSchema.of(DataType.STRUCTURED_JSON), PortSchema.of(DataType.TEXT))

# Runtime handoff: transforms + validates a payload for the target port:
out = await proto.handoff(
    Payload.json({"a": 1}),
    PortSchema.of(DataType.STRUCTURED_JSON),   # source
    PortSchema.of(DataType.TEXT),              # target
)
out.value   # '{"a": 1}'

Transforms

Transforms convert between compatible types. They live in a lookup map keyed by (source, target). Deterministic ones need no LLM; semantic ones (for example text → structured_json) require a brain (an LLM) and fail fast without one.

Built-in: code→text, structured_json→text, document→text, stream→text, text→code, and text→structured_json (LLM-backed).

Register your own (Open/Closed, so no edits to existing logic):

from weave import register_transform, DataType, Payload

async def upper_text(payload, brain):
    return payload.with_value(payload.value.upper())

register_transform(DataType.TEXT, DataType.TEXT, upper_text)
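
The lookup-map idea can be sketched without Weave at all. The snippet below is a minimal, standalone illustration, not Weave's implementation: the names register, lookup and _TRANSFORMS are hypothetical, types are plain strings, and transforms are synchronous functions for brevity.

```python
import json
from typing import Any, Callable, Dict, Tuple

# Hypothetical stand-in for a transform registry; not Weave's actual internals.
Transform = Callable[[Any], Any]
_TRANSFORMS: Dict[Tuple[str, str], Transform] = {}


def register(source: str, target: str, fn: Transform) -> None:
    """Store a transform under its (source, target) key."""
    _TRANSFORMS[(source, target)] = fn


def lookup(source: str, target: str) -> Transform:
    """Return the transform for a type pair, failing fast if none exists."""
    try:
        return _TRANSFORMS[(source, target)]
    except KeyError:
        raise LookupError(f"no transform registered for {source} -> {target}") from None


# A deterministic transform: serialise a JSON-like value to text.
register("structured_json", "text", json.dumps)

if __name__ == "__main__":
    print(lookup("structured_json", "text")({"a": 1}))
```

Because new behaviour arrives as a new dictionary entry, adding a transform never touches the lookup logic, which is the Open/Closed property mentioned above.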

The Router (matcher)

Given a source agent, the Router returns compatible candidates ranked by capability-tag affinity (Jaccard overlap by default; inject score_fn for embeddings).

from weave import Router

matches = Router().match(source_agent, [candidate_a, candidate_b, candidate_c])
for m in matches:
    print(m.agent.name, m.score)   # highest affinity first; incompatible ones filtered out
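
For intuition about the default score, Jaccard overlap is the size of the intersection of two tag sets divided by the size of their union. The sketch below shows only that scoring and ordering step; jaccard and rank are hypothetical helper names, and the type-compatibility filtering that the Router also performs is left out.

```python
from typing import Dict, List, Set, Tuple


def jaccard(a: Set[str], b: Set[str]) -> float:
    """Jaccard overlap: |a & b| / |a | b|, defined as 0.0 when both sets are empty."""
    union = a | b
    if not union:
        return 0.0
    return len(a & b) / len(union)


def rank(source_tags: Set[str], candidates: Dict[str, Set[str]]) -> List[Tuple[str, float]]:
    """Score every candidate against the source tags, highest affinity first."""
    scored = [(name, jaccard(source_tags, tags)) for name, tags in candidates.items()]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)


if __name__ == "__main__":
    source = {"nlp", "summarize", "english"}
    candidates = {
        "candidate_a": {"nlp", "vision"},
        "candidate_b": {"nlp", "summarize"},
        "candidate_c": {"audio"},
    }
    for name, score in rank(source, candidates):
        print(name, round(score, 2))
```

Here candidate_b shares two of three tags with the source (score 2/3), candidate_a shares one of four (0.25), and candidate_c shares none (0.0).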

Pipelines

A Pipeline is an ordered chain. Every adjacent link is validated at construction, so an incompatible chain is rejected before any LLM call.

from weave import Pipeline

pipe = Pipeline(
    [cleaner, extractor, summarizer],
    llm="anthropic:claude-opus-4-8",   # used for any semantic transforms between hops
)
result = await pipe.run("raw input")   # accepts a Payload or a raw value; returns a Payload

Between hops, the pipeline calls protocol.handoff(...) to validate and transform the previous output into the next agent's input type.
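
Construction-time validation amounts to walking the chain pairwise and checking each link before anything runs. The following is a standalone sketch under stated assumptions: Stage, BRIDGEABLE, IncompatibleLink and validate_chain are hypothetical names, types are plain strings, and the set of bridgeable pairs simply mirrors the built-in transforms listed earlier, which may not be exactly how Weave declares compatibility.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Stage:
    name: str
    input_type: str
    output_type: str


# Assumption: a link is bridgeable when a transform exists for the pair.
BRIDGEABLE = {
    ("code", "text"), ("structured_json", "text"), ("document", "text"),
    ("stream", "text"), ("text", "code"), ("text", "structured_json"),
}


class IncompatibleLink(ValueError):
    """Raised when two adjacent stages cannot be connected."""


def validate_chain(stages: List[Stage]) -> None:
    """Check every adjacent pair; raise on the first link that cannot be bridged."""
    for prev, nxt in zip(stages, stages[1:]):
        same = prev.output_type == nxt.input_type
        if not same and (prev.output_type, nxt.input_type) not in BRIDGEABLE:
            raise IncompatibleLink(
                f"{prev.name} ({prev.output_type}) -> {nxt.name} ({nxt.input_type})"
            )


if __name__ == "__main__":
    chain = [
        Stage("cleaner", "text", "text"),
        Stage("extractor", "text", "structured_json"),
        Stage("summarizer", "text", "text"),
    ]
    validate_chain(chain)  # passes: structured_json -> text is bridgeable
    print("chain is valid")
```

Running the check in the constructor means a mis-wired chain fails immediately, before any stage has done work.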

Parallel (fan-out / fan-in)

Parallel is the sibling of Pipeline for fan-out / fan-in. It feeds one input to every branch concurrently (asyncio.gather), transforming the input to each branch's own port via the connection protocol, then merges the results with a combine strategy. Branch links are validated at construction (fail fast), just like a pipeline.

from weave import Parallel, DataType

# Default fan-in: {branch_name: value} as a structured_json payload.
fan = Parallel([sentiment, topics, summary], input=DataType.TEXT)
out = await fan.run("a customer review ...")
out.value   # {"sentiment": ..., "topics": ..., "summary": ...}

# Custom fan-in, merge however you like (here: join into one text):
fan = Parallel(
    [drafter_a, drafter_b],
    input=DataType.TEXT, output=DataType.TEXT,
    combine=lambda results: "\n---\n".join(p.value for _, p in results),
)
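
The fan-out / fan-in pattern itself is small enough to show with asyncio alone. This is a minimal sketch rather than Weave's code: fan_out and the two toy branches are hypothetical, branches are plain coroutine functions, and the per-branch input transform is omitted.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Branch = Callable[[str], Awaitable[Any]]


async def fan_out(branches: Dict[str, Branch], value: str) -> Dict[str, Any]:
    """Feed one input to every branch concurrently, then key the results by branch name."""
    names = list(branches)
    # gather returns results in the order the awaitables were passed in.
    results = await asyncio.gather(*(branches[name](value) for name in names))
    return dict(zip(names, results))


async def word_count(text: str) -> int:
    return len(text.split())


async def shout(text: str) -> str:
    return text.upper()


if __name__ == "__main__":
    merged = asyncio.run(fan_out({"word_count": word_count, "shout": shout}, "good value"))
    print(merged)
```

Keying the merged result by branch name is also why branch names need to be unique: two branches with the same name would overwrite each other in the dictionary.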

Both Pipeline and Parallel are BaseAgents. A pipeline's ports are its first agent's input and its last agent's output; a parallel's are its declared fan-out and fan-in ports. So either nests anywhere an agent does, with no wrappers. Compose any variant you like: series-of-parallels, parallel-of-series, or arbitrarily deep trees.

# series-of-parallels: a Parallel as a pipeline stage
pipe = Pipeline([clean, Parallel([analyze_a, analyze_b]), report])

# parallel-of-series: a Pipeline dropped straight in as a branch
fan = Parallel([Pipeline([clean, summarize]), classify])

Branches must have unique names (the default fan-in keys by name). If a branch raises, its error propagates as an AgentExecutionError (fail fast). For semantic fan-out transforms (say a branch whose input is structured_json), pass llm= so the injected text → structured_json transform has a brain.

By default all branches run at once. Cap the in-flight count with max_concurrency when branches are expensive (for instance many LLM-backed branches you don't want to fire simultaneously). Branches beyond the limit queue and run as slots free up:

fan = Parallel(branches, max_concurrency=4)   # at most 4 branches in flight
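
One common way to get this queueing behaviour is an asyncio.Semaphore wrapped around each branch. Whether Weave does it this way is an assumption; run_limited and demo below are hypothetical names used only to illustrate the technique.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Tuple


async def run_limited(
    factories: List[Callable[[], Awaitable[Any]]], max_concurrency: int
) -> List[Any]:
    """Run every job, but keep at most max_concurrency of them in flight at once."""
    sem = asyncio.Semaphore(max_concurrency)

    async def guarded(factory: Callable[[], Awaitable[Any]]) -> Any:
        async with sem:  # jobs beyond the limit wait here until a slot frees up
            return await factory()

    return await asyncio.gather(*(guarded(f) for f in factories))


async def demo(n: int, limit: int) -> Tuple[List[int], int]:
    """Run n toy jobs under a limit and report the peak number in flight."""
    in_flight = 0
    peak = 0

    async def job(i: int) -> int:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # stand-in for an expensive call
        in_flight -= 1
        return i

    results = await run_limited([lambda i=i: job(i) for i in range(n)], limit)
    return results, peak


if __name__ == "__main__":
    results, peak = asyncio.run(demo(10, 4))
    print(results, "peak in flight:", peak)
```

Results still come back in branch order, because gather preserves the order of its arguments regardless of which job finishes first.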

The local runner

LocalRunner runs a chain in-process and records a per-hop trace, which makes it the fastest way to understand and debug a chain.

from weave import LocalRunner

runner = LocalRunner(llm="anthropic:claude-opus-4-8")

# One agent:
out = await runner.run_agent(summarizer, "some text")

# A chain, with a trace:
trace = await runner.simulate([cleaner, extractor], "raw input")
for hop in trace.hops:           # Hop(agent, output, elapsed_ms)
    print(hop.agent, hop.output.value, hop.elapsed_ms)
print("final:", trace.output.value)
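
To make the trace concrete, here is a standalone sketch of per-hop recording. The Hop fields follow the Hop(agent, output, elapsed_ms) shape shown above; everything else is an assumption for illustration: the simulate function here is a hypothetical synchronous stand-in, stages are plain (name, function) pairs, and no handoff or transform happens between hops.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable, List, Tuple


@dataclass
class Hop:
    agent: str
    output: Any
    elapsed_ms: float


def simulate(stages: List[Tuple[str, Callable[[Any], Any]]], value: Any) -> List[Hop]:
    """Run stages in order, recording each stage's output and wall-clock time."""
    hops: List[Hop] = []
    for name, fn in stages:
        start = time.perf_counter()
        value = fn(value)  # each stage consumes the previous stage's output
        elapsed_ms = (time.perf_counter() - start) * 1000
        hops.append(Hop(name, value, elapsed_ms))
    return hops


if __name__ == "__main__":
    for hop in simulate([("strip", str.strip), ("upper", str.upper)], "  raw input "):
        print(hop.agent, repr(hop.output), f"{hop.elapsed_ms:.3f} ms")
```

Keeping every intermediate output is what makes a trace useful for debugging: a bad final result can be traced back to the first hop whose output looks wrong.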

Mental model

cleaner ──(text)──> extractor ──(structured_json)──> summarizer
            │                         │
       protocol.handoff          protocol.handoff
   (validate + transform)    (validate + transform)