Agents
An agent is a self-contained capability with a typed input port, a typed output port, and capability tags. Its internals (brain, memory, prompts, tools) are private.
Two ways to define an agent
1. The @agent decorator (ergonomic)
from weaveflow import agent, DataType

@agent(
    name="summarizer",
    input=DataType.TEXT,
    output=DataType.TEXT,
    tags=["summarization", "english"],
    llm="anthropic:claude-opus-4-8",  # optional brain
)
async def summarizer(ctx):
    return await ctx.complete(f"Summarize:\n{ctx.input.value}")
input / output accept a DataType (shorthand) or a full PortSchema when you need a shape or format:
import json

from weaveflow import agent, PortSchema, DataType

@agent(
    name="extractor",
    input=DataType.TEXT,
    output=PortSchema.of(DataType.STRUCTURED_JSON, shape={"name": "str", "age": "int"}),
    tags=["extraction"],
    llm="openai:gpt-4o",
)
async def extractor(ctx):
    # Ask the model for JSON, then parse it so the handler returns a dict.
    raw = await ctx.complete(f"Extract name and age as JSON:\n{ctx.input.value}")
    return json.loads(raw)
2. Subclass BaseAgent (full control + DI)
Use this when you want constructor injection, shared state setup, or class-based organization.
from weaveflow import BaseAgent, DataType, PortSchema

class Reverser(BaseAgent):
    def __init__(self):
        super().__init__(
            name="reverser",
            input_schema=PortSchema.of(DataType.TEXT),
            output_schema=PortSchema.of(DataType.TEXT),
            capability_tags=("text",),
        )

    async def handle(self, ctx):
        return ctx.input.value[::-1]
Both produce the same kind of object: anything runnable with .run(payload) and composable in a Pipeline.
The context (ctx)
The handler receives an AgentContext:
| Attribute / method | Purpose |
|---|---|
| ctx.input | the input Payload (.value, .type, .metadata) |
| ctx.brain | the LLMAdapter, or None |
| ctx.memory | the injected Memory, or None |
| ctx.logger | the structured logger |
| await ctx.complete(prompt, system=None, **opts) | one-line LLM call |
| ctx.stream(prompt, ...) | async token stream |
| ctx.require_brain() | returns the brain or raises an actionable error |
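The difference between ctx.brain and ctx.require_brain() can be illustrated with a small stand-in. This is only a sketch: SketchContext and MissingBrainError are hypothetical names invented for the example, and the real AgentContext may raise a different exception type with different wording.

```python
from dataclasses import dataclass
from typing import Any, Optional


class MissingBrainError(RuntimeError):
    """Hypothetical error type; the real exception class may differ."""


@dataclass
class SketchContext:
    """Hypothetical stand-in for AgentContext, reduced to the brain attribute."""

    brain: Optional[Any] = None

    def require_brain(self) -> Any:
        # Return the brain if one is configured, otherwise fail with a
        # message that tells the caller how to fix the problem.
        if self.brain is None:
            raise MissingBrainError(
                "No LLM is configured for this agent; pass llm=... when defining it."
            )
        return self.brain


with_brain = SketchContext(brain="fake-llm-adapter")
without_brain = SketchContext()

try:
    without_brain.require_brain()
    message = None
except MissingBrainError as exc:
    message = str(exc)
```

Reading ctx.brain directly leaves the None check to the handler, while require_brain() turns a missing brain into an immediate, descriptive failure.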
Return values
A handler may return:
- a raw value (e.g. str, dict), which Weaveflow wraps in a Payload of the output type, or
- a Payload, used as-is.
Either way the result is validated against output_schema before it leaves the agent.
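The wrapping rule can be sketched in a few lines. The Payload dataclass and the coerce_to_payload helper below are hypothetical stand-ins written for this example, not Weaveflow's actual classes or functions, and the type is modeled as a plain string for brevity.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Payload:
    """Hypothetical stand-in for Weaveflow's Payload; the real class may differ."""

    value: Any
    type: str
    metadata: dict = field(default_factory=dict)


def coerce_to_payload(result: Any, output_type: str) -> Payload:
    """Wrap a raw handler result, or pass an existing Payload through unchanged."""
    if isinstance(result, Payload):
        return result
    return Payload(value=result, type=output_type)


# A raw string is wrapped in a Payload of the declared output type.
wrapped = coerce_to_payload("a short summary", "text")

# An existing Payload is returned as-is (same object, no copy).
passthrough = coerce_to_payload(wrapped, "text")
```

Returning a Payload directly is useful when the handler needs to set metadata itself; returning a raw value keeps simple handlers short.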
The execution lifecycle
run(payload) performs, in order:
1. validate(input) against input_schema. Fail fast.
2. guardrails.run_pre(input).
3. handle(ctx), your logic.
4. Coerce the return into a Payload.
5. validate(output) against output_schema.
6. guardrails.run_post(output).
On exception, on_error hooks fire, the failure is logged, and a typed error is re-raised (never swallowed). See Guardrails.
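The lifecycle can be approximated with a self-contained sketch. Everything here is an assumption made for illustration: Payload, AgentError, validate, and this run function are stand-ins, the schema check only compares a type string, guardrails are modeled as plain callables, and the handler receives the payload directly rather than a full context.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Optional, Sequence


@dataclass
class Payload:
    """Hypothetical stand-in for Weaveflow's Payload."""

    value: Any
    type: str
    metadata: dict = field(default_factory=dict)


class AgentError(Exception):
    """Hypothetical typed error re-raised when any lifecycle step fails."""


def validate(payload: Payload, expected_type: str) -> None:
    # Stand-in schema check: only the type tag is compared.
    if payload.type != expected_type:
        raise ValueError(f"expected {expected_type!r}, got {payload.type!r}")


async def run(
    handler: Callable[[Payload], Awaitable[Any]],
    payload: Payload,
    input_type: str,
    output_type: str,
    pre: Sequence[Callable[[Payload], None]] = (),
    post: Sequence[Callable[[Payload], None]] = (),
    on_error: Optional[Callable[[Exception], None]] = None,
) -> Payload:
    try:
        validate(payload, input_type)            # 1. fail fast on bad input
        for guard in pre:                        # 2. pre-guardrails
            guard(payload)
        result = await handler(payload)          # 3. user logic
        if not isinstance(result, Payload):      # 4. coerce raw values
            result = Payload(value=result, type=output_type)
        validate(result, output_type)            # 5. check the output
        for guard in post:                       # 6. post-guardrails
            guard(result)
        return result
    except Exception as exc:
        if on_error is not None:
            on_error(exc)                        # hook fires before re-raising
        raise AgentError(f"agent failed: {exc}") from exc


async def reverse(payload: Payload) -> str:
    return payload.value[::-1]


ok = asyncio.run(run(reverse, Payload("abc", "text"), "text", "text"))

errors = []
try:
    # Wrong input type: validation fails before the handler is called.
    asyncio.run(run(reverse, Payload("abc", "image"), "text", "text", on_error=errors.append))
    failed = False
except AgentError:
    failed = True
```

Because input validation runs first, the handler never sees a payload of the wrong type, and the caller always gets either a validated Payload or a typed error.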
Manifest
Every agent emits a portable manifest (used by weaveflow package and the catalog):
summarizer.manifest()
# {
#   "name": "summarizer",
#   "input_schema": {"type": "text", ...},
#   "output_schema": {"type": "text", ...},
#   "capability_tags": ["summarization", "english"],
# }