Pipelines
A pipeline is a set of persistent agents that react to events automatically — no user interaction required after setup. Describe what you want in plain language and Wactorz builds and wires the agents for you.
Overview
Pipelines are created by PlannerAgent, which is spawned automatically whenever MainActor classifies a request as PIPELINE. By default the planner queries your Home Assistant entities, generates a plan, and stores it as a pending proposal; after approval it spawns the required agents and registers a rule that persists across restarts. Prefix the request with pipeline!, coordinate!, or @planner! to bypass approval and execute immediately.
You say: "notify me on Discord when the front door opens"
MainActor    → classifies as PIPELINE
             → spawns PlannerAgent
PlannerAgent → queries home-assistant-agent for entity list
             → LLM generates a plan (JSON array of agent specs)
             → returns a pending proposal for approval
             → after approval, spawns agents from plan
             → saves rule to MainActor's registry
             → exits
Agents run indefinitely, reacting to events via MQTT
Creating a pipeline
Just describe what you want. The intent classifier recognises pipeline requests from natural language:
"send me a Discord message if the lamp is on"
"turn on the hallway light when motion is detected"
"notify me on Telegram if someone is seen on the webcam"
"turn off all lights every day at midnight"
"if the temperature goes above 28 degrees, turn on the fan"
The default flow is a dry-run proposal. Reply yes or approve, or run /plans approve <id>, to spawn the agents. Use a bypass prefix (pipeline! <task>, coordinate! <task>, or @planner! <task>) only when you want to skip approval.
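As a rough illustration of the two paths described above, the sketch below separates a bypass prefix from the task text. The prefixes come from this page; parse_request and ParsedRequest are hypothetical names, and this is not necessarily how MainActor's intent classifier does it.

```python
from typing import NamedTuple

# Bypass prefixes as listed in this section.
BYPASS_PREFIXES = ("pipeline!", "coordinate!", "@planner!")


class ParsedRequest(NamedTuple):
    bypass_approval: bool  # True when the request should skip the dry-run proposal
    task: str              # The request text with any prefix removed


def parse_request(message: str) -> ParsedRequest:
    """Split an optional bypass prefix from the task (hypothetical helper)."""
    text = message.strip()
    lowered = text.lower()
    for prefix in BYPASS_PREFIXES:
        if lowered.startswith(prefix):
            return ParsedRequest(True, text[len(prefix):].strip())
    return ParsedRequest(False, text)
```

A request without a prefix comes back with bypass_approval set to False, which corresponds to the default dry-run proposal.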
If you have a Discord webhook stored, the planner injects it automatically. Store one with:
/webhook discord https://discord.com/api/webhooks/...
💡 Entity resolution — The planner fetches all 280+ entities from your HA instance before generating code, so you can refer to devices by their friendly name — "the living room lamp", "front door sensor" — and the planner maps them to real entity IDs.
Canonical patterns
PlannerAgent uses seven canonical wiring patterns. Every pipeline request maps to one of these.
Pattern 1 — HA sensor → HA action
A Home Assistant sensor triggers a Home Assistant service call — e.g. motion sensor turns on a light, door sensor locks a switch, temperature sensor activates AC.
Because HA state is nested inside new_state.state and the ha_actuator agent can only filter top-level payload keys, this pattern requires a two-agent setup:
Agent 1 (dynamic)  name: <slug>-state-filter
    subscribe to homeassistant/state_changes/#
    filter by entity_id in payload
    check new_state["state"] against condition
    if met: publish custom/triggers/<slug> {"triggered": true}
Agent 2 (ha_actuator)  name: <slug>-actuator
    mqtt_topics: ["custom/triggers/<slug>"]
    detection_filter: {"triggered": true}
    actions: [HA service call with real entity_id]
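The sketch below illustrates why the two-agent split is needed. It assumes the state-change payload carries entity_id at the top level and the state under new_state, as in the Pattern 2 example. Both function names are hypothetical, and filter_matches only approximates the top-level matching described for ha_actuator.

```python
from typing import Optional


def state_matches(payload: dict, entity_id: str, wanted_state: str) -> bool:
    """Agent 1's check: the right entity, and the nested state equals the wanted value."""
    if payload.get("entity_id") != entity_id:
        return False
    new_state = payload.get("new_state") or {}
    return new_state.get("state") == wanted_state


def filter_matches(payload: dict, detection_filter: Optional[dict]) -> bool:
    """Top-level-only comparison; an empty or null filter matches every payload."""
    if not detection_filter:
        return True
    return all(payload.get(key) == value for key, value in detection_filter.items())


# Example HA event: the state is nested, so a top-level filter cannot see it.
ha_event = {"entity_id": "binary_sensor.hallway_motion", "new_state": {"state": "on"}}
```

With this event, filter_matches(ha_event, {"state": "on"}) is False because "state" is not a top-level key, whereas the flattened {"triggered": True} message published by Agent 1 matches the actuator's filter directly.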
Example
"turn on the hallway light when motion is detected in the hallway"
Pattern 2 — HA sensor → notification
A Home Assistant state change triggers a notification — Discord, Telegram, or any HTTP webhook. This is the simplest real-world pattern: one dynamic agent handles both the state subscription and the HTTP call.
Agent 1 (dynamic)  name: <slug>-notify
    import httpx

    async def setup(agent):
        async def on_state(payload):
            # Ignore every entity except the one this pipeline watches
            if payload.get("entity_id") != "sensor.front_door":
                return
            # The state value is nested under new_state
            if payload.get("new_state", {}).get("state") != "on":
                return
            # webhook_url is the stored Discord webhook injected by the planner
            async with httpx.AsyncClient() as c:
                await c.post(webhook_url, json={"content": "Front door opened!"})
        agent.subscribe("homeassistant/state_changes/#", on_state)
Examples
"send me a Discord message if the lamp is on"
"notify me on Telegram when the washing machine finishes"
ℹ Wildcard subscription: Always subscribe to homeassistant/state_changes/# and filter by entity_id in the payload. This works regardless of whether HA_STATE_BRIDGE_PER_ENTITY is on or off.
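To show why the wildcard form covers both bridge modes, here is a minimal sketch of MQTT topic matching combined with the entity_id filter. It assumes, without confirmation from this page, that events arrive on the parent topic homeassistant/state_changes when HA_STATE_BRIDGE_PER_ENTITY is off and on homeassistant/state_changes/<entity_id> when it is on. topic_matches and wants_event are hypothetical helpers, not Wactorz APIs, and the matcher ignores the special rules for topics that start with $.

```python
SUBSCRIPTION = "homeassistant/state_changes/#"


def topic_matches(pattern: str, topic: str) -> bool:
    """Simplified MQTT matching: '+' is one level, '#' is the rest (including none)."""
    pattern_levels = pattern.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(pattern_levels):
        if level == "#":
            return True  # Multi-level wildcard also matches the parent topic
        if index >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[index]:
            return False
    return len(pattern_levels) == len(topic_levels)


def wants_event(topic: str, payload: dict, entity_id: str) -> bool:
    """Accept an event only if the topic matches and the payload names our entity."""
    return topic_matches(SUBSCRIPTION, topic) and payload.get("entity_id") == entity_id
```

Because the entity check reads the payload rather than the topic, the same handler behaves identically whichever topic layout the bridge uses.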
Pattern 3 — Webcam / camera detection → HA action
Object detection on a live camera stream triggers a Home Assistant service call — e.g. person detected unlocks the door, cat detected turns on the pet feeder.
Camera stream URLs are resolved from Home Assistant before the plan is generated and injected into the prompt. MJPEG proxy URLs (/api/camera_proxy_stream/…) require a Bearer token; the planner sets OPENCV_FFMPEG_CAPTURE_OPTIONS so OpenCV passes the header automatically.
Agent 1 (dynamic)  name: <slug>-camera-detect
    setup():   set OPENCV_FFMPEG_CAPTURE_OPTIONS with HA Bearer token
               load YOLO model, open stream with exact URL from CAMERA STREAM URLS
    process(): capture frame, run inference
               publish custom/detections/<slug>
               {"detected": bool, "target": "person", "objects": [...]}
    poll_interval: 1s
    install: ultralytics, opencv-python
Agent 2 (ha_actuator)  name: <slug>-actuator
    mqtt_topics: ["custom/detections/<slug>"]
    detection_filter: {"detected": true}
    actions: [HA service call]
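The detection message that links the two agents can be sketched without any camera or model. The function below assumes the inference step has already produced a list of class labels for one frame; build_detection_payload is a hypothetical helper, and only the three payload keys come from the spec above.

```python
import json
from typing import List


def build_detection_payload(labels: List[str], target: str = "person") -> dict:
    """Turn the labels found in one frame into the custom/detections/<slug> message."""
    objects = sorted(set(labels))  # De-duplicate so the payload stays small
    return {"detected": target in objects, "target": target, "objects": objects}


# Labels as they might come out of one inference pass (made-up values).
frame_labels = ["dog", "person", "person"]
message = json.dumps(build_detection_payload(frame_labels))
```

Keeping detected as a top-level boolean is what allows the ha_actuator filter {"detected": true} to match the message without inspecting the objects list.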
Example
"unlock the front door when a person is detected on the front camera"
Pattern 4 — Webcam / camera detection → notification
Same detection agent as Pattern 3, but the second agent sends a notification instead of calling an HA service.
Agent 1 (dynamic)  same as Pattern 3 camera agent
Agent 2 (dynamic)  name: <slug>-notify
    setup(): subscribe to custom/detections/<slug>
             when detected=true: POST to webhook
Example
"notify me on Discord if a person is seen on the front camera"
Pattern 5 — Scheduled trigger → HA action or notification
A scheduled trigger fires a Home Assistant service call (or notification) at a fixed time or interval. The trigger always uses type: scheduled, never a dynamic agent polling datetime.now().
Agent 1 (scheduled)  name: <slug>-trigger
    schedule: {"type": "daily", "at": "17:00"}
           or {"type": "weekly", "at": "07:00", "days": ["mon","tue","wed","thu","fri"]}
           or {"type": "interval", "seconds": 1800}
           or {"type": "once", "at": "<ISO8601-timestamp>"}
    publish_topic: 'schedule/<slug>-trigger/fired'
Agent 2 (ha_actuator OR dynamic)  name: <slug>-action
    subscribes to 'schedule/<slug>-trigger/fired'
    ha_actuator: detection_filter null, actions = [HA service call]
    dynamic: setup() agent.subscribe(...), callback does HTTP/work
Examples
"turn off all lights every day at midnight"
"turn on the coffee maker at 07:30 on weekdays"
"remind me tomorrow at 9am to call the dentist"
⚠ Never poll for clock time: the planner prompt explicitly forbids while True: sleep(60) waiting for a time. Always use type: scheduled.
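To make the four schedule shapes concrete, the following sketch computes when each one would next fire. It is only an interpretation of the spec fields shown above, not the scheduler Wactorz ships; next_fire is a hypothetical function, times are treated as naive local time, and "at" is assumed to be HH:MM for daily and weekly schedules.

```python
from datetime import datetime, timedelta

# Same order as datetime.weekday(): Monday is 0.
DAY_NAMES = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"]


def next_fire(schedule: dict, now: datetime) -> datetime:
    """Return the next fire time for a schedule spec (illustrative only)."""
    kind = schedule["type"]
    if kind == "interval":
        return now + timedelta(seconds=schedule["seconds"])
    if kind == "once":
        return datetime.fromisoformat(schedule["at"])
    if kind not in ("daily", "weekly"):
        raise ValueError(f"unknown schedule type: {kind!r}")

    hour, minute = (int(part) for part in schedule["at"].split(":"))
    allowed = schedule.get("days", DAY_NAMES) if kind == "weekly" else DAY_NAMES
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    # Walk forward one day at a time until a permitted day in the future is found.
    for _ in range(8):
        if candidate > now and DAY_NAMES[candidate.weekday()] in allowed:
            return candidate
        candidate += timedelta(days=1)
    raise ValueError(f"schedule never fires: {schedule!r}")
```

For example, a weekday schedule at 07:00 evaluated on a Friday at 08:00 skips the weekend and lands on the following Monday at 07:00.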
Pattern 6 — MQTT sensor + condition → HA action
Combines multiple MQTT data sources, evaluates a condition across them, and triggers an HA action when the condition is met. Used for "if X and Y then Z" style rules.
Agent 1 (dynamic)  name: <slug>-monitor
    async def setup(agent):
        # Last known value from each source
        agent.state['lamp_on'] = False
        agent.state['temp'] = 0

        async def on_temp(payload):
            agent.state['temp'] = payload.get('temp', 0)
            await check_and_trigger()

        async def on_lamp(payload):
            agent.state['lamp_on'] = payload.get('state') == 'on'
            await check_and_trigger()

        # Re-evaluated after every update from either source
        async def check_and_trigger():
            if agent.state['lamp_on'] and agent.state['temp'] > 20:
                await agent.publish('custom/triggers/<slug>', {'triggered': True})

        agent.subscribe('custom/sensors/temp_humidity', on_temp)
        agent.subscribe('lamp/status', on_lamp)
Agent 2 (ha_actuator)  name: <slug>-actuator
    mqtt_topics: ["custom/triggers/<slug>"]
    detection_filter: {"triggered": true}
    actions: [HA service call]
Example
"if the lamp is on and the temperature goes above 20, turn off the lamp"
Pattern 7 — One-shot camera snapshot
Fetches a single still image from a camera — e.g. "take a snapshot of the office camera". Use this instead of Pattern 3 when the task needs a single image, not a continuous detection loop.
Snapshot URLs are resolved from Home Assistant before the plan is generated. The URL (/api/camera_proxy/{entity_id}) requires an Authorization: Bearer header; always read the token from os.environ['HA_TOKEN'].
Agent (dynamic)  name: <slug>-snapshot
    setup(agent) or process(agent):
        import httpx, os
        headers = {"Authorization": f"Bearer {os.environ['HA_TOKEN']}"}
        async with httpx.AsyncClient() as client:
            resp = await client.get("<url-from-CAMERA-SNAPSHOT-URLS>", headers=headers)
            image_bytes = resp.content
        # process image_bytes (save to disk, run YOLO once, etc.)
    install: httpx
If the result feeds an HA action (e.g. "if a desk is visible, turn on the light"), publish the detection result to a topic and pair with an ha_actuator (same second-agent wiring as Pattern 3).
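The pieces around the HTTP call can be sketched separately from the request itself. The helpers below are hypothetical: snapshot_request assumes you know the Home Assistant base URL (in a real pipeline the planner injects the full URL instead), and save_snapshot shows one way to handle the "save to disk" case.

```python
import os
from pathlib import Path
from typing import Dict, Tuple


def snapshot_request(base_url: str, entity_id: str) -> Tuple[str, Dict[str, str]]:
    """Build the snapshot URL and auth header; the token is read from HA_TOKEN."""
    token = os.environ["HA_TOKEN"]  # Raises KeyError if the token is not set
    url = f"{base_url.rstrip('/')}/api/camera_proxy/{entity_id}"
    return url, {"Authorization": f"Bearer {token}"}


def save_snapshot(image_bytes: bytes, path: str) -> int:
    """Write the image to disk, creating parent folders; returns bytes written."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    return target.write_bytes(image_bytes)
```

The returned URL and headers can be passed straight to client.get(url, headers=headers) as in the agent spec above, and resp.content handed to save_snapshot.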
Examples
"take a snapshot of the garden camera and save it to /tmp/garden.jpg"
"check if the office camera shows anyone sitting at a desk"
Managing pipelines
Every active pipeline is saved as a rule in MainActor's pipeline-rules registry, and each spawned agent is also added to the spawn registry. Rules persist across restarts — agents are automatically re-spawned when Wactorz starts.
List active rules
/rules
Active pipeline rules (2):
🟢 [a1b2c3d4] — send me a Discord message if the lamp is on
agents : lamp-on-discord-notify
created : 2026-03-24 19:36
🟢 [e5f6a7b8] — turn on hallway light when motion detected
agents : motion-hallway-state-filter, motion-hallway-actuator
created : 2026-03-25 11:14
To delete a rule: /rules delete <rule_id>
Delete a rule
/rules delete a1b2c3d4
This deletes the rule, stops all agents associated with it, and removes those agents from the spawn registry, so they are not re-spawned on the next restart.
Check agent status
@lamp-on-discord-notify {"action": "status"}
Memory and webhooks
The planner reads from MainActor's memory when generating pipeline code. Two things are particularly useful to store before requesting a pipeline:
Webhook URLs
Store notification destinations once — the planner injects them automatically into every generated notification agent.
/webhook discord https://discord.com/api/webhooks/...
/webhook telegram https://api.telegram.org/bot.../sendMessage
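The two destinations expect different request bodies, which a generated notification agent has to account for. The sketch below is a hypothetical helper, not planner output: Discord webhooks take a content field, while Telegram's sendMessage also needs a chat_id, which is an assumption here because this page does not say where Wactorz stores it.

```python
from typing import Optional, Tuple


def build_notification(kind: str, url: str, text: str,
                       chat_id: Optional[str] = None) -> Tuple[str, dict]:
    """Return (url, json_body) for the given destination kind."""
    if kind == "discord":
        return url, {"content": text}
    if kind == "telegram":
        if chat_id is None:
            raise ValueError("telegram notifications need a chat_id")
        return url, {"chat_id": chat_id, "text": text}
    raise ValueError(f"unsupported webhook kind: {kind!r}")


async def send(kind: str, url: str, text: str, chat_id: Optional[str] = None) -> int:
    """POST the notification and return the HTTP status code."""
    import httpx  # Imported lazily so the builder above works without httpx installed

    target, body = build_notification(kind, url, text, chat_id)
    async with httpx.AsyncClient() as client:
        response = await client.post(target, json=body)
    return response.status_code
```

Separating the body builder from the network call keeps the destination-specific logic easy to check on its own.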
User facts
MainActor extracts facts from conversation automatically (HA URLs, entity IDs, preferences). You can also manage them explicitly:
/memory — view stored facts
/memory forget ha_url — remove a specific fact
/memory clear — wipe everything
How it works internally
When MainActor classifies a message as PIPELINE it spawns a short-lived PlannerAgent that:
- Sends a list_entities task to home-assistant-agent and waits for the full entity list (up to 60 s inside the planner; MainActor waits up to 180 s for the planner result)
- Builds a prompt containing the entity list, available agent types, canonical patterns, stored webhook URLs, and the user's request
- Calls the LLM once to produce a JSON plan: an array of agent specs, each with name, description, and spawn_config (dynamic, ha_actuator, or scheduled)
- In the default dry-run path, returns the plan to MainActor as a pending proposal without spawning
- After approval, spawns each agent from the plan via the matching agent type
- Calls MainActor.save_pipeline_rule() with the list of agent names
- Fires a background _bootstrap_ha_entity_states() task, which extracts HA entity IDs from the plan and asks home-assistant-agent to re-publish their current state to homeassistant/state_changes/{entity_id} over MQTT. This lets freshly-spawned agents evaluate the current state immediately, rather than waiting for the next real HA state change to arrive.
- Exits; the spawned agents run indefinitely from this point
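As an illustration of the plan produced by the LLM step, the sketch below parses a plan and checks the three fields named above. The exact layout inside spawn_config is an assumption: the "type" key, the shape of the actions entry, and the agent names are hypothetical, and validate_plan is not a Wactorz function.

```python
import json

ALLOWED_TYPES = {"dynamic", "ha_actuator", "scheduled"}
REQUIRED_KEYS = ("name", "description", "spawn_config")


def validate_plan(raw: str) -> list:
    """Parse a JSON plan and check each agent spec for the documented fields."""
    plan = json.loads(raw)
    if not isinstance(plan, list):
        raise ValueError("plan must be a JSON array of agent specs")
    for index, spec in enumerate(plan):
        missing = [key for key in REQUIRED_KEYS if key not in spec]
        if missing:
            raise ValueError(f"spec {index} is missing {missing}")
        config = spec["spawn_config"]
        if not isinstance(config, dict) or config.get("type") not in ALLOWED_TYPES:
            raise ValueError(f"spec {index} has an unknown agent type")
    return plan


# A Pattern 5 style plan; field names inside spawn_config are illustrative.
EXAMPLE_PLAN = json.dumps([
    {
        "name": "lights-off-trigger",
        "description": "Fires every day at midnight",
        "spawn_config": {
            "type": "scheduled",
            "schedule": {"type": "daily", "at": "00:00"},
            "publish_topic": "schedule/lights-off-trigger/fired",
        },
    },
    {
        "name": "lights-off-action",
        "description": "Turns off the lights when the trigger fires",
        "spawn_config": {
            "type": "ha_actuator",
            "mqtt_topics": ["schedule/lights-off-trigger/fired"],
            "detection_filter": None,
            "actions": [{"service": "light.turn_off", "entity_id": "all"}],
        },
    },
])
```

The names returned by a check like this are the kind of list that would be passed on to MainActor.save_pipeline_rule().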
⚠ Planner hallucination: If the planner's response looks like a tool call (e.g. <tool_call>agent.send_to...</tool_call>) with an instant perfect response, it is the LLM hallucinating, not a real agent interaction. Real pipeline creation takes 5–15 seconds and produces log lines showing agents being compiled and registered.
Agent types the planner can generate
| Type | Description | When used |
|---|---|---|
| dynamic | Full Python code string compiled at runtime by DynamicAgent | Any custom logic: MQTT subscriptions, HTTP calls, camera inference, timers |
| ha_actuator | Declarative: MQTT topic + detection filter + list of HA service calls | When the only job is to call an HA service when a payload matches |
| scheduled | Declarative: schedule spec + publish topic | Clock-time, recurring, interval, or one-time triggers |