Public API Reference
Everything below is importable directly from weaveflow:
from weaveflow import (
    agent, BaseAgent, AgentContext,
    DataType, Payload, PortSchema, SchemaRegistry, validate,
    Pipeline, Parallel, LocalRunner,
    ConnectionProtocol, Router, register_transform,
    from_callable, from_langchain, from_crewai,
    Guardrails,
    Memory, ShortTermMemory, LongTermMemory,
    LLMAdapter, create_adapter, register_provider, supported_providers,
    Logger, logger, WeaveError, __version__,
)
Agents
agent(*, name, input, output, tags=(), llm=None, memory=None, guardrails=None)
Decorator that turns async def handler(ctx) into a BaseAgent. input/output accept a DataType or a PortSchema. llm accepts a "provider:model" string or an LLMAdapter. Returns a runnable agent.
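To make the decorator pattern concrete, here is a minimal standard-library sketch of how a keyword-only decorator can turn an async handler into an agent-like object. SketchContext, SketchAgent and sketch_agent are hypothetical stand-ins written for this example; they are not weaveflow's classes, and the real agent also accepts llm, memory and guardrails.

```python
import asyncio
import inspect
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class SketchContext:
    """Hypothetical stand-in for AgentContext; carries only the input."""
    input: Any


class SketchAgent:
    """Hypothetical stand-in for BaseAgent; not weaveflow's class."""

    def __init__(self, name, input, output, tags, handler):
        self.name = name
        self.input = input
        self.output = output
        self.tags = tuple(tags)
        self._handler = handler

    async def run(self, value):
        # Wrap the input in a context object and delegate to the handler.
        return await self._handler(SketchContext(input=value))

    def manifest(self):
        return {"name": self.name, "input": self.input,
                "output": self.output, "tags": list(self.tags)}


def sketch_agent(*, name, input, output, tags=()):
    def decorate(handler):
        # The reference describes the handler as `async def handler(ctx)`.
        if not inspect.iscoroutinefunction(handler):
            raise TypeError("handler must be declared with async def")
        return SketchAgent(name, input, output, tags, handler)
    return decorate


@sketch_agent(name="shout", input="text", output="text", tags=("demo",))
async def shout(ctx):
    return ctx.input.upper()


result = asyncio.run(shout.run("hi"))
```

After decoration, the name shout refers to the agent object rather than the function, which is why it exposes run and manifest.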
BaseAgent
Base class. Constructor: name, input_schema, output_schema, capability_tags=(), brain=None, memory=None, guardrails=None. Implement async def handle(self, ctx). Public methods: await run(payload) -> Payload, manifest() -> dict.
AgentContext
Injected into handle. Fields: input: Payload, brain: LLMAdapter | None, memory: Memory | None, logger. Methods: require_brain(), await complete(prompt, *, system=None, **opts) -> str, stream(prompt, *, system=None, **opts) (async iterator).
Types & schema
DataType
Enum: TEXT, STRUCTURED_JSON, IMAGE, CODE, AUDIO, DOCUMENT, EMBEDDING, STREAM.
Payload
Immutable envelope. Fields: type: DataType, value, metadata. Constructors: Payload.text(value, **meta), Payload.json(value, **meta). Methods: with_value(value), tagged(**meta).
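The "immutable envelope" idea can be illustrated with a frozen dataclass. The sketch below is an assumption about the general pattern, not weaveflow's Payload: SketchPayload is a hypothetical name, and type is a plain string here instead of a DataType.

```python
from dataclasses import dataclass, field, replace
from typing import Any, Mapping


@dataclass(frozen=True)
class SketchPayload:
    """Hypothetical immutable envelope; not weaveflow's Payload."""
    type: str
    value: Any
    metadata: Mapping[str, Any] = field(default_factory=dict)

    def with_value(self, value: Any) -> "SketchPayload":
        # Return a copy with a new value; the original is left untouched.
        return replace(self, value=value)

    def tagged(self, **meta: Any) -> "SketchPayload":
        # Return a copy whose metadata is the old metadata plus the new keys.
        return replace(self, metadata={**self.metadata, **meta})


original = SketchPayload(type="text", value="hello", metadata={"lang": "en"})
updated = original.with_value("HELLO").tagged(source="demo")
```

Because every method returns a new object, a payload can be passed between agents without one agent's changes leaking into another's view.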
PortSchema
Immutable port contract. Fields: type, shape, format, description. Build with PortSchema.of(type, *, shape=None, format=None, description=""). manifest() returns a serializable dict.
validate(payload, schema) -> Payload
Validates a payload against a PortSchema; returns it unchanged or raises SchemaValidationError. Completes in under 100 ms.
SchemaRegistry
Process-local registry: register(name, schema), get(name), names(), all().
LLM
create_adapter(spec, **options) -> LLMAdapter
spec is "provider:model" or an existing LLMAdapter (passed through).
register_provider(name, adapter_cls) / supported_providers() -> tuple[str, ...]
Register a custom provider / list the registered ones.
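The "provider:model" convention and the pass-through rule can be sketched with a small registry. Everything prefixed with Sketch or sketch_ is hypothetical and written only for this example; the exceptions raised here are plain built-ins, not weaveflow's UnknownProviderError.

```python
from typing import Dict, Tuple


class SketchAdapter:
    """Hypothetical adapter base; stands in for LLMAdapter."""

    def __init__(self, model: str, **options):
        self.model = model
        self.options = options


_PROVIDERS: Dict[str, type] = {}


def sketch_register_provider(name: str, adapter_cls: type) -> None:
    _PROVIDERS[name] = adapter_cls


def sketch_supported_providers() -> Tuple[str, ...]:
    return tuple(sorted(_PROVIDERS))


def sketch_create_adapter(spec, **options):
    # An existing adapter is passed through unchanged.
    if isinstance(spec, SketchAdapter):
        return spec
    # Split on the first colon only, so model names may contain colons.
    provider, sep, model = spec.partition(":")
    if not sep or not provider or not model:
        raise ValueError(f"expected 'provider:model', got {spec!r}")
    if provider not in _PROVIDERS:
        raise LookupError(f"unknown provider: {provider!r}")
    return _PROVIDERS[provider](model, **options)


class EchoAdapter(SketchAdapter):
    """Hypothetical provider used only to exercise the registry."""


sketch_register_provider("echo", EchoAdapter)
adapter = sketch_create_adapter("echo:tiny", temperature=0)
```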
LLMAdapter (ABC)
async complete(prompt, *, system=None, **opts) -> str, stream(prompt, *, system=None, **opts) (async iterator).
Memory
Memory (ABC)
remember(key, value), recall(key, default=None), recent(limit).
ShortTermMemory(max_items=50)
Bounded insertion-ordered buffer.
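A bounded insertion-ordered buffer can be modeled with collections.OrderedDict. This is a sketch of the data structure, not weaveflow's ShortTermMemory. Three details are assumptions: re-remembering a key moves it to the newest position, the oldest entry is evicted first, and recent returns (key, value) pairs newest first.

```python
from collections import OrderedDict


class SketchShortTermMemory:
    """Hypothetical bounded buffer; not weaveflow's ShortTermMemory."""

    def __init__(self, max_items=50):
        if max_items <= 0:
            raise ValueError("max_items must be positive")
        self._max_items = max_items
        self._items = OrderedDict()

    def remember(self, key, value):
        # Assumption: re-remembering a key moves it to the newest position.
        self._items.pop(key, None)
        self._items[key] = value
        # Evict the oldest entries once the bound is exceeded.
        while len(self._items) > self._max_items:
            self._items.popitem(last=False)

    def recall(self, key, default=None):
        return self._items.get(key, default)

    def recent(self, limit):
        # Assumption: newest entries first, as (key, value) pairs.
        newest_first = list(self._items.items())[::-1]
        return newest_first[:limit]


memory = SketchShortTermMemory(max_items=2)
memory.remember("a", 1)
memory.remember("b", 2)
memory.remember("c", 3)  # pushes "a" out of the buffer
```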
LongTermMemory()
Vector store. Adds remember_vector(key, value, embedding) and search(query, top_k=5) -> tuple[ScoredRecord, ...] (.record, .score).
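As a rough illustration of top-k vector search, the sketch below ranks stored embeddings by cosine similarity. It is not weaveflow's LongTermMemory. In particular, it assumes the query is already an embedding and that cosine similarity is the score, neither of which this reference states. SketchScored mirrors the documented .record and .score fields.

```python
import math
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class SketchScored:
    record: Any
    score: float


def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # a zero vector has no direction to compare
    return dot / (norm_a * norm_b)


class SketchVectorMemory:
    """Hypothetical in-memory vector store; not weaveflow's LongTermMemory."""

    def __init__(self):
        self._rows = {}

    def remember_vector(self, key, value, embedding):
        self._rows[key] = (value, tuple(embedding))

    def search(self, query_embedding, top_k=5):
        scored = [
            SketchScored(record=(key, value), score=cosine(query_embedding, emb))
            for key, (value, emb) in self._rows.items()
        ]
        # Highest similarity first, then keep the best top_k results.
        scored.sort(key=lambda item: item.score, reverse=True)
        return tuple(scored[:top_k])


store = SketchVectorMemory()
store.remember_vector("x", "first", [1.0, 0.0])
store.remember_vector("y", "second", [0.0, 1.0])
hits = store.search([1.0, 0.0], top_k=1)
```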
Guardrails
Guardrails
Chainable bundle: .pre(hook), .post(hook), .on_error(hook). pre/post: Payload -> Payload. on_error: Exception -> None. Runners: run_pre, run_post, run_on_error.
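The chaining style described above usually comes from each registration method returning self. The following sketch shows that pattern with synchronous hooks and plain strings in place of Payload objects. SketchGuardrails is a hypothetical stand-in, and whether the real hooks are sync or async is not stated here.

```python
class SketchGuardrails:
    """Hypothetical chainable hook bundle; not weaveflow's Guardrails."""

    def __init__(self):
        self._pre, self._post, self._on_error = [], [], []

    def pre(self, hook):
        self._pre.append(hook)
        return self  # returning self is what makes the calls chainable

    def post(self, hook):
        self._post.append(hook)
        return self

    def on_error(self, hook):
        self._on_error.append(hook)
        return self

    def run_pre(self, payload):
        # Each hook receives the previous hook's result.
        for hook in self._pre:
            payload = hook(payload)
        return payload

    def run_post(self, payload):
        for hook in self._post:
            payload = hook(payload)
        return payload

    def run_on_error(self, exc):
        # Error hooks observe the exception and return nothing.
        for hook in self._on_error:
            hook(exc)


seen = []
rails = SketchGuardrails().pre(str.strip).post(str.upper).on_error(seen.append)
cleaned = rails.run_pre("  hello ")
finished = rails.run_post(cleaned)
rails.run_on_error(ValueError("boom"))
```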
Connection & runtime
ConnectionProtocol
check(source_schema, target_schema) (raises ConnectionIncompatibleError), await handoff(payload, source_schema, target_schema, *, brain=None) -> Payload.
Router(score_fn=jaccard)
match(source_agent, candidates) -> tuple[Match, ...] (Match.agent, Match.score).
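The default score_fn is named jaccard, which conventionally means intersection size divided by union size. The sketch below ranks candidates that way. What the Router actually compares is not stated in this reference, so scoring over capability tags is an assumption, and sketch_match is a hypothetical helper that works on plain tag sets rather than agents.

```python
def jaccard(a, b):
    left, right = set(a), set(b)
    union = left | right
    if not union:
        return 0.0  # two empty sets share nothing to compare
    return len(left & right) / len(union)


def sketch_match(source_tags, candidates, score_fn=jaccard):
    """Rank candidates (name -> tags) by similarity to source_tags."""
    scored = [(name, score_fn(source_tags, tags))
              for name, tags in candidates.items()]
    # Best score first; ties keep their original order.
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return tuple(scored)


ranked = sketch_match(
    {"nlp", "summarize"},
    {
        "translator": {"nlp", "translate"},
        "summarizer": {"nlp", "summarize"},
        "resizer": {"image"},
    },
)
```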
register_transform(source, target, async_fn)
Register a (Payload, brain|None) -> Payload transform.
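One way to picture the transform registry is a dictionary keyed by (source, target) that a handoff step consults when the two ports differ. The sketch below uses plain values and type-name strings instead of Payload and DataType. The sketch_ names are hypothetical, and the lookup behavior is an assumption, not weaveflow's ConnectionProtocol.

```python
import asyncio

_TRANSFORMS = {}


def sketch_register_transform(source, target, async_fn):
    _TRANSFORMS[(source, target)] = async_fn


async def sketch_handoff(value, source, target, *, brain=None):
    # Matching ports need no conversion.
    if source == target:
        return value
    try:
        transform = _TRANSFORMS[(source, target)]
    except KeyError:
        raise LookupError(f"no transform from {source} to {target}") from None
    return await transform(value, brain)


async def text_to_json(value, brain):
    # brain is unused here; it is accepted to match the (value, brain) shape.
    return {"text": value}


sketch_register_transform("text", "structured_json", text_to_json)
converted = asyncio.run(sketch_handoff("hi", "text", "structured_json"))
```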
Interop bridges
Wrap a foreign-framework agent as a Weave BaseAgent (duck-typed; no framework import). All accept name, input=TEXT, output=TEXT, prepare (weave_value -> foreign_input), extract (foreign_output -> weave_value), and tags. See the Interop guide.
from_callable(fn, *, name, ...): wrap any sync/async value -> value function.
from_langchain(runnable, *, name="langchain", ...): LangChain/LangGraph Runnable (ainvoke/invoke).
from_crewai(crew, *, name="crewai", ...): CrewAI kickoff/kickoff_async.
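Wrapping "any sync/async function" usually comes down to calling the function and awaiting the result only if it is awaitable. The sketch below shows that idea together with the prepare and extract hooks. sketch_from_callable is a hypothetical helper that returns a bare coroutine function; the real bridges return a BaseAgent.

```python
import asyncio
import inspect


def sketch_from_callable(fn, *, prepare=None, extract=None):
    """Wrap a sync or async value -> value function behind one async entry point."""

    async def run(value):
        foreign_input = prepare(value) if prepare else value
        result = fn(foreign_input)
        # Async functions return an awaitable; sync functions return the value.
        if inspect.isawaitable(result):
            result = await result
        return extract(result) if extract else result

    return run


async def slow_reverse(text):
    await asyncio.sleep(0)  # stands in for real asynchronous work
    return text[::-1]


sync_wrapped = sketch_from_callable(str.upper)
async_wrapped = sketch_from_callable(
    slow_reverse,
    prepare=str.strip,               # weave value -> foreign input
    extract=lambda out: out + "!",   # foreign output -> weave value
)

upper_result = asyncio.run(sync_wrapped("abc"))
reversed_result = asyncio.run(async_wrapped(" abc "))
```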
Pipeline(agents, *, name="pipeline", tags=(), llm=None, protocol=None)
Ordered chain. await run(initial) -> Payload; property agents. Validates links at construction. Is a BaseAgent (ports = first agent's input / last agent's output), so it nests inside another Pipeline or as a Parallel branch.
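The sketch below illustrates the two ideas in this entry: checking each link when the chain is built, and exposing the first input and last output as the chain's own ports. SketchStep and SketchPipeline are hypothetical. The check here demands identical type names, which is a simplification, since the real protocol may accept a link that a registered transform can bridge.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass(frozen=True)
class SketchStep:
    name: str
    input: str
    output: str
    fn: Callable[[Any], Awaitable[Any]]


class SketchPipeline:
    """Hypothetical ordered chain; not weaveflow's Pipeline."""

    def __init__(self, steps):
        steps = tuple(steps)
        if not steps:
            raise ValueError("a pipeline needs at least one step")
        # Fail at construction time, before anything runs.
        for left, right in zip(steps, steps[1:]):
            if left.output != right.input:
                raise TypeError(
                    f"{left.name} outputs {left.output}, "
                    f"but {right.name} expects {right.input}"
                )
        self.steps = steps
        self.input = steps[0].input     # chain's input port
        self.output = steps[-1].output  # chain's output port

    async def run(self, value):
        for step in self.steps:
            value = await step.fn(value)
        return value


async def split_words(text):
    return text.split()


async def count_items(items):
    return len(items)


pipeline = SketchPipeline([
    SketchStep("split", "text", "structured_json", split_words),
    SketchStep("count", "structured_json", "structured_json", count_items),
])
word_count = asyncio.run(pipeline.run("validate links at construction"))
```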
Parallel(branches, *, input=TEXT, output=STRUCTURED_JSON, combine=None, name="parallel", tags=(), max_concurrency=None, llm=None, protocol=None)
Fan-out/fan-in BaseAgent. Runs branches concurrently, transforms the input to each branch's port, then merges via combine (default: {name: value} as structured_json). max_concurrency caps in-flight branches (default unbounded; must be positive). await run(initial) -> Payload; property branches. Validates fan-out at construction; nests inside a Pipeline.
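Fan-out with a concurrency cap is commonly built from asyncio.gather and an asyncio.Semaphore. The sketch below shows that pattern and the default {name: value} merge. sketch_fan_out is a hypothetical function rather than weaveflow's Parallel, and it skips the per-branch input transform and the custom combine hook.

```python
import asyncio


async def sketch_fan_out(branches, value, *, max_concurrency=None):
    """Run every branch on the same input and merge results as {name: value}."""
    if max_concurrency is not None and max_concurrency <= 0:
        raise ValueError("max_concurrency must be positive")
    # No semaphore means no cap on in-flight branches.
    semaphore = (asyncio.Semaphore(max_concurrency)
                 if max_concurrency is not None else None)

    async def run_one(name, fn):
        if semaphore is None:
            return name, await fn(value)
        async with semaphore:  # at most max_concurrency branches run at once
            return name, await fn(value)

    pairs = await asyncio.gather(
        *(run_one(name, fn) for name, fn in branches.items())
    )
    return dict(pairs)


async def upper(text):
    return text.upper()


async def length(text):
    return len(text)


merged = asyncio.run(
    sketch_fan_out({"upper": upper, "length": length}, "abc", max_concurrency=1)
)
```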
LocalRunner(*, llm=None)
await run_agent(agent, value) -> Payload, await simulate(agents, value) -> TraceResult (.output, .hops of Hop(agent, output, elapsed_ms)).
Errors
WeaveError (the base, with code, message, docs_url) and subclasses: SchemaValidationError, ConnectionIncompatibleError, AdapterError, AdapterNotInstalledError, UnknownProviderError, AgentExecutionError, GuardrailRejectionError. Codes are WEAVE_*.
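A shared base class lets callers catch every library error in one except clause and still read a stable code. The sketch below shows that shape with hypothetical classes. The code strings are invented for illustration and only follow the WEAVE_* prefix; they are not weaveflow's actual codes.

```python
class SketchWeaveError(Exception):
    """Hypothetical base error; not weaveflow's WeaveError."""

    code = "WEAVE_UNKNOWN"  # invented value, for illustration only

    def __init__(self, message, *, docs_url=None):
        super().__init__(message)
        self.message = message
        self.docs_url = docs_url


class SketchSchemaError(SketchWeaveError):
    code = "WEAVE_SCHEMA_INVALID"  # invented value, for illustration only


def describe(exc):
    return f"[{exc.code}] {exc.message}"


try:
    raise SketchSchemaError("expected text, got image")
except SketchWeaveError as exc:
    # Catching the base class also catches every subclass.
    summary = describe(exc)
```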
Logger
Logger singleton (logger). debug/info/warn/error(message, **context), configure(env=None, file_path=None), is_logging_enabled(level), with_correlation(id). Dev: all levels; production: warn/error only. Output is structured JSON with a correlation id.
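The level rule and the structured output can be sketched in a few lines. The record field names and the environment strings below are assumptions made for this example, and the sketch_ functions are hypothetical; the real Logger's output format may differ.

```python
import json

_LEVELS = ("debug", "info", "warn", "error")


def sketch_is_logging_enabled(level, env):
    if level not in _LEVELS:
        raise ValueError(f"unknown level: {level!r}")
    # Production keeps warn/error only; any other env keeps every level.
    if env == "production":
        return level in ("warn", "error")
    return True


def sketch_format(level, message, correlation_id, **context):
    record = {
        "level": level,
        "message": message,
        "correlation_id": correlation_id,
        "context": context,  # nested so context keys cannot shadow the fields above
    }
    return json.dumps(record, sort_keys=True)


line = sketch_format("warn", "slow handoff", "req-42", elapsed_ms=180)
```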