Weave

Public API Reference

Everything below is importable directly from weaveflow:

from weaveflow import (
    agent, BaseAgent, AgentContext,
    DataType, Payload, PortSchema, SchemaRegistry, validate,
    Pipeline, Parallel, LocalRunner,
    ConnectionProtocol, Router, register_transform,
    from_callable, from_langchain, from_crewai,
    Guardrails,
    Memory, ShortTermMemory, LongTermMemory,
    LLMAdapter, create_adapter, register_provider, supported_providers,
    Logger, logger, WeaveError, __version__,
)

Agents

agent(*, name, input, output, tags=(), llm=None, memory=None, guardrails=None)

Decorator that turns async def handler(ctx) into a BaseAgent. input/output accept a DataType or a PortSchema. llm accepts a "provider:model" string or an LLMAdapter. Returns a runnable agent.

BaseAgent

Base class. Constructor: name, input_schema, output_schema, capability_tags=(), brain=None, memory=None, guardrails=None. Implement async def handle(self, ctx). Public methods: await run(payload) -> Payload, manifest() -> dict.

AgentContext

Injected into handle. Fields: input: Payload, brain: LLMAdapter | None, memory: Memory | None, logger. Methods: require_brain(), await complete(prompt, *, system=None, **opts) -> str, stream(prompt, *, system=None, **opts) (async iterator).

Types & schema

DataType

Enum: TEXT, STRUCTURED_JSON, IMAGE, CODE, AUDIO, DOCUMENT, EMBEDDING, STREAM.

Payload

Immutable envelope. Fields: type: DataType, value, metadata. Constructors: Payload.text(value, **meta), Payload.json(value, **meta). Methods: with_value(value), tagged(**meta).

PortSchema

Immutable port contract. Fields: type, shape, format, description. Build with PortSchema.of(type, *, shape=None, format=None, description=""). manifest() returns a serializable dict.

validate(payload, schema) -> Payload

Validates a payload against a PortSchema; returns it unchanged or raises SchemaValidationError. <100ms.

SchemaRegistry

Process-local registry: register(name, schema), get(name), names(), all().

LLM

create_adapter(spec, **options) -> LLMAdapter

spec is "provider:model" or an existing LLMAdapter (passed through).

register_provider(name, adapter_cls) / supported_providers() -> tuple[str, ...]

Register a custom provider / list the registered ones.

LLMAdapter (ABC)

async complete(prompt, *, system=None, **opts) -> str, stream(prompt, *, system=None, **opts) (async iterator).

Memory

Memory (ABC)

remember(key, value), recall(key, default=None), recent(limit).

ShortTermMemory(max_items=50)

Bounded insertion-ordered buffer.

LongTermMemory()

Vector store. Adds remember_vector(key, value, embedding) and search(query, top_k=5) -> tuple[ScoredRecord, ...] (.record, .score).

Guardrails

Guardrails

Chainable bundle: .pre(hook), .post(hook), .on_error(hook). pre/post: Payload -> Payload. on_error: Exception -> None. Runners: run_pre, run_post, run_on_error.

Connection & runtime

ConnectionProtocol

check(source_schema, target_schema) (raises ConnectionIncompatibleError), await handoff(payload, source_schema, target_schema, *, brain=None) -> Payload.

Router(score_fn=jaccard)

match(source_agent, candidates) -> tuple[Match, ...] (Match.agent, Match.score).

register_transform(source, target, async_fn)

Register a (Payload, brain|None) -> Payload transform.

Interop bridges

Wrap a foreign-framework agent as a Weave BaseAgent (duck-typed; no framework import). All accept name, input=TEXT, output=TEXT, prepare (weave_value -> foreign_input), extract (foreign_output -> weave_value), and tags. See the Interop guide.

  • from_callable(fn, *, name, ...): wrap any sync/async value -> value function.
  • from_langchain(runnable, *, name="langchain", ...): LangChain/LangGraph Runnable (ainvoke/invoke).
  • from_crewai(crew, *, name="crewai", ...): CrewAI kickoff/kickoff_async.

Pipeline(agents, *, name="pipeline", tags=(), llm=None, protocol=None)

Ordered chain. await run(initial) -> Payload; property agents. Validates links at construction. Is a BaseAgent (ports = first agent's input / last agent's output), so it nests inside another Pipeline or as a Parallel branch.

Parallel(branches, *, input=TEXT, output=STRUCTURED_JSON, combine=None, name="parallel", tags=(), max_concurrency=None, llm=None, protocol=None)

Fan-out/fan-in BaseAgent. Runs branches concurrently, transforms the input to each branch's port, then merges via combine (default: {name: value} as structured_json). max_concurrency caps in-flight branches (default unbounded; must be positive). await run(initial) -> Payload; property branches. Validates fan-out at construction; nests inside a Pipeline.

LocalRunner(*, llm=None)

await run_agent(agent, value) -> Payload, await simulate(agents, value) -> TraceResult (.output, .hops of Hop(agent, output, elapsed_ms)).

Errors

WeaveError (the base, with code, message, docs_url) and subclasses: SchemaValidationError, ConnectionIncompatibleError, AdapterError, AdapterNotInstalledError, UnknownProviderError, AgentExecutionError, GuardrailRejectionError. Codes are WEAVE_*.

Logger

Logger singleton (logger). debug/info/warn/error(message, **context), configure(env=None, file_path=None), is_logging_enabled(level), with_correlation(id). Dev: all levels; production: warn/error only. Output is structured JSON with a correlation id.