Connections & Pipelines
This is the interoperability layer: how one agent's output plugs into another's input.
The connection protocol
ConnectionProtocol runs a four-step handshake for each handoff:
- Type match. The source output type must equal, or be declared compatible with, the target input type. Otherwise
ConnectionIncompatibleError(fail fast). - Shape validation. For
structured_json, the payload is validated against the target's expected shape. - Capability pre-check. Optional ranking by capability-tag affinity (the
Router). - Transform injection. Compatible-but-different types get an auto-injected transform.
from weaveflow import ConnectionProtocol, PortSchema, DataType, Payload
proto = ConnectionProtocol()
# Static check (no data), validates a link:
proto.check(PortSchema.of(DataType.STRUCTURED_JSON), PortSchema.of(DataType.TEXT))
# Runtime handoff: transforms + validates a payload for the target port:
out = await proto.handoff(
Payload.json({"a": 1}),
PortSchema.of(DataType.STRUCTURED_JSON), # source
PortSchema.of(DataType.TEXT), # target
)
out.value # '{"a": 1}'
Transforms
Transforms convert between compatible types. They live in a lookup map keyed by (source, target). Deterministic ones need no LLM; semantic ones (for example text → structured_json) require a brain and fail fast without one.
Built-in: code→text, structured_json→text, document→text, stream→text, text→code, and text→structured_json (LLM-backed).
Register your own (Open/Closed, so no edits to existing logic):
from weaveflow import register_transform, DataType, Payload
async def upper_text(payload, brain):
return payload.with_value(payload.value.upper())
register_transform(DataType.TEXT, DataType.TEXT, upper_text)
The Router (matcher)
Given a source agent, the Router returns compatible candidates ranked by capability-tag affinity (Jaccard overlap by default; inject score_fn for embeddings).
from weaveflow import Router
matches = Router().match(source_agent, [candidate_a, candidate_b, candidate_c])
for m in matches:
print(m.agent.name, m.score) # highest affinity first; incompatible ones filtered out
Pipelines
A Pipeline is an ordered chain. Every adjacent link is validated at construction, so an incompatible chain is rejected before any LLM call.
from weaveflow import Pipeline
pipe = Pipeline(
[cleaner, extractor, summarizer],
llm="anthropic:claude-opus-4-8", # used for any semantic transforms between hops
)
result = await pipe.run("raw input") # Payload | raw value in, Payload out
Between hops, the pipeline calls protocol.handoff(...) to validate and transform the previous output into the next agent's input type.
Parallel (fan-out / fan-in)
Parallel is the sibling of Pipeline for fan-out / fan-in. It feeds one input to every branch concurrently (asyncio.gather), transforming the input to each branch's own port via the connection protocol, then merges the results with a combine strategy. Branch links are validated at construction (fail fast), just like a pipeline.
from weaveflow import Parallel
# Default fan-in: {branch_name: value} as a structured_json payload.
fan = Parallel([sentiment, topics, summary], input=DataType.TEXT)
out = await fan.run("a customer review ...")
out.value # {"sentiment": ..., "topics": ..., "summary": ...}
# Custom fan-in, merge however you like (here: join into one text):
fan = Parallel(
[drafter_a, drafter_b],
input=DataType.TEXT, output=DataType.TEXT,
combine=lambda results: "\n---\n".join(p.value for _, p in results),
)
Both Pipeline and Parallel are BaseAgents. A pipeline's ports are its first agent's input and its last agent's output; a parallel's are its declared fan-out and fan-in ports. So either nests anywhere an agent does, with no wrappers. Compose any variant you like: series-of-parallels, parallel-of-series, or arbitrarily deep trees.
# series-of-parallels: a Parallel as a pipeline stage
pipe = Pipeline([clean, Parallel([analyze_a, analyze_b]), report])
# parallel-of-series: a Pipeline dropped straight in as a branch
fan = Parallel([Pipeline([clean, summarize]), classify])
Branches must have unique names (the default fan-in keys by name). If a branch raises, its error propagates as an AgentExecutionError (fail fast). For semantic fan-out transforms (say a branch whose input is structured_json), pass llm= so the injected text → structured_json transform has a brain.
By default all branches run at once. Cap the in-flight count with max_concurrency when branches are expensive (for instance many LLM-backed branches you don't want to fire simultaneously). Branches beyond the limit queue and run as slots free up:
fan = Parallel(branches, max_concurrency=4) # at most 4 branches in flight
The local runner
LocalRunner runs a chain in-process and records a per-hop trace, the fastest way to understand and debug a chain.
from weaveflow import LocalRunner
runner = LocalRunner(llm="anthropic:claude-opus-4-8")
# One agent:
out = await runner.run_agent(summarizer, "some text")
# A chain, with a trace:
trace = await runner.simulate([cleaner, extractor], "raw input")
for hop in trace.hops: # Hop(agent, output, elapsed_ms)
print(hop.agent, hop.output.value, hop.elapsed_ms)
print("final:", trace.output.value)
Mental model
cleaner ──(text)──> extractor ──(structured_json)──> summarizer
│ │
protocol.handoff protocol.handoff
(validate + transform) (validate + transform)