Agent-native event bus.
Connect agents, humans, and systems via typed events. Every event carries tenant, trace, causality, and context — not just bytes.
Installation
pip install agentmesh-bus # zero-dep core
pip install "agentmesh-bus[redis]" # Redis Streams transport
pip install "agentmesh-bus[nats]" # NATS JetStream transport
pip install "agentmesh-bus[kafka]" # Kafka transport (aiokafka)
pip install "agentmesh-bus[sqlite]" # SQLite event store
pip install "agentmesh-bus[otel]" # OpenTelemetry metrics + traces
pip install "agentmesh-bus[all]" # everything
Quickstart
import asyncio
from agentmesh import AgentMesh, AgentEvent
async def main():
    mesh = AgentMesh()
    await mesh.start()
    # Subscribe with a wildcard: catches order.created, order.updated, ...
    @mesh.subscribe("order.*")
    async def handle(e: AgentEvent) -> None:
        print(f"[{e.tenant_id}] {e.event_type}: {e.data}")
    # Publish: tenant-scoped, trace-correlated, causally linked
    await mesh.publish("order.created",
        data={"order_id": "ORD-001", "amount": 299.99},
        publisher_id="billing-agent",
        session_id="sess-001", run_id="run-001",
        tenant_id="acme",
    )
    await asyncio.sleep(0.1)  # give the async handler a chance to run
    print(mesh.stats())
    await mesh.close()
asyncio.run(main())
Why AgentMesh
Kafka and Redis move bytes. AgentMesh moves meaning. When you need production scale, run AgentMesh on a Kafka transport; the API stays the same either way.
| | Kafka / Redis | AgentMesh |
|---|---|---|
| Tenant isolation | Manual bolt-on | ✓ Built-in on every event |
| Trace propagation | Manual | ✓ trace_id propagates automatically |
| Causality chain | None | ✓ caused_by_event_id |
| Human publishers | No concept | ✓ publisher_type="human" |
| Policy enforcement | External middleware | ✓ agentplane integration |
| Zero-dep start | Needs JVM / server | ✓ pip install, run |
| Typed event taxonomy | Raw bytes / strings | ✓ 14 categories, 100+ types |
| Always persistent | Config required | ✓ JSONL by default |
Event Envelope
Every event — regardless of type — carries this standard envelope:
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
@dataclass
class AgentEvent:
    # Identity
    event_id: str                    # UUID, also the idempotency key
    event_type: str                  # "order.created"
    topic: str                       # "acme:order.created"
    schema_version: str              # "1.0"
    timestamp: float                 # unix epoch seconds
    # Execution tree
    session_id: str                  # top-level agent session
    run_id: str                      # current operation
    parent_run_id: str | None        # nesting
    caused_by_event_id: str | None   # causality chain
    # OTel propagation
    trace_id: str | None
    span_id: str | None
    # Source
    publisher_id: str                # agent_id or user_id
    publisher_type: str              # "agent" | "human" | "system"
    tenant_id: str | None
    agent_id: str | None
    agent_name: str | None
    # Routing
    delivery_mode: str               # "broadcast" | "exclusive"
    ttl_s: float | None
    # Payload
    data: dict[str, Any]
    tags: list[str]
    metadata: dict[str, Any]
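To show how the execution-tree and OTel fields could link one event to the next, here is a plain-Python sketch. It is a hypothetical illustration, not agentmesh's implementation. `Event` is a trimmed stand-in for `AgentEvent`, and `derive` and `causal_chain` are made-up helper names. The sketch assumes a follow-up event inherits `session_id` and `trace_id` from its cause, nests its run under the cause's `run_id`, and points `caused_by_event_id` at the cause.

```python
from __future__ import annotations

import time
import uuid
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Event:
    # Hypothetical stand-in for AgentEvent with only the fields this sketch uses
    event_type: str
    session_id: str
    run_id: str
    data: dict[str, Any] = field(default_factory=dict)
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = field(default_factory=time.time)
    parent_run_id: str | None = None
    caused_by_event_id: str | None = None
    trace_id: str | None = None


def derive(cause: Event, event_type: str, run_id: str, data: dict[str, Any]) -> Event:
    """Hypothetical helper: build a follow-up event linked to its cause."""
    return Event(
        event_type=event_type,
        session_id=cause.session_id,        # stay in the same top-level session
        run_id=run_id,                      # a new operation
        data=data,
        parent_run_id=cause.run_id,         # nest under the causing run
        caused_by_event_id=cause.event_id,  # causality link
        trace_id=cause.trace_id,            # one trace end to end
    )


def causal_chain(events: list[Event], last: Event) -> list[str]:
    """Follow caused_by_event_id links from `last` back to the root, root first."""
    by_id = {e.event_id: e for e in events}
    chain: list[str] = []
    current: Event | None = last
    while current is not None:
        chain.append(current.event_type)
        parent_id = current.caused_by_event_id
        current = by_id.get(parent_id) if parent_id else None
    return chain[::-1]


order = Event("order.created", "sess-001", "run-001", {"order_id": "ORD-001"}, trace_id="trace-1")
reserved = derive(order, "inventory.reserved", "run-002", {"order_id": "ORD-001"})
charged = derive(reserved, "payment.charged", "run-003", {"amount": 299.99})
print(causal_chain([order, reserved, charged], charged))
```

Walking `caused_by_event_id` backwards like this reconstructs an order → inventory → billing chain from stored events alone.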
Topics & Wildcards
Topics follow category.entity.verb naming. NATS-style wildcards are supported:
| Pattern | Matches | Not |
|---|---|---|
| order.created | Exact match | order.updated |
| order.* | order.created, order.updated | order.item.created |
| order.> | order.created, order.item.created, order.item.v.updated | payment.created |
| *.created | order.created, payment.created | order.updated |
| > | Everything | (none) |
| acme:order.* | acme:order.created (ACME only) | siemens:order.created |
| acme:> | All ACME topics | Any siemens: topic |
Use a tenant_id: prefix (for example acme:) for hard tenant isolation. The mesh enforces that publishers cannot publish into another tenant's namespace.
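The matching rules in the table above can be written as a short function. This is a hypothetical sketch, not agentmesh's matcher. It follows NATS semantics: `*` matches exactly one token, and `>` matches one or more trailing tokens. It also assumes something the Quickstart suggests but does not state outright. A pattern without a tenant prefix (like `order.*`) matches topics from any tenant, while a prefixed pattern only matches its own tenant.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Hypothetical NATS-style matcher for 'tenant:category.entity.verb' topics.

    '*' matches exactly one token; '>' (last token only) matches one or more.
    A pattern with a tenant prefix only matches that tenant; a pattern without
    one is assumed to match topics of any tenant.
    """
    p_tenant, _, p_body = pattern.rpartition(":")
    t_tenant, _, t_body = topic.rpartition(":")
    if p_tenant and p_tenant != t_tenant:
        return False
    p_tokens, t_tokens = p_body.split("."), t_body.split(".")
    for i, token in enumerate(p_tokens):
        if token == ">":
            # Full wildcard: must be the final pattern token and cover >= 1 topic token
            return i == len(p_tokens) - 1 and len(t_tokens) > i
        if i >= len(t_tokens) or (token != "*" and token != t_tokens[i]):
            return False
    return len(p_tokens) == len(t_tokens)


# Rows of the table above, as (pattern, topic, expected)
ROWS = [
    ("order.created", "order.created", True),
    ("order.created", "order.updated", False),
    ("order.*", "order.updated", True),
    ("order.*", "order.item.created", False),
    ("order.>", "order.item.v.updated", True),
    ("order.>", "payment.created", False),
    ("*.created", "payment.created", True),
    ("*.created", "order.updated", False),
    (">", "acme:order.created", True),
    ("acme:order.*", "acme:order.created", True),
    ("acme:order.*", "siemens:order.created", False),
    ("acme:>", "acme:order.item.created", True),
    ("acme:>", "siemens:order.created", False),
]
for pattern, topic, expected in ROWS:
    assert topic_matches(pattern, topic) is expected, (pattern, topic)
```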
Publisher Types
Three types of publishers, all first-class:
Agent
publisher_type="agent": automated and high-frequency. Most events in production come from agents.
Human
publisher_type="human": deliberate and awaited. Human decisions are audited and governed like any other event.
System
publisher_type="system": infrastructure. The mesh itself, agentplane, and AgentGuard publish as system.
Consumer Groups
Consumer groups enable competing consumers — only one subscriber in the group processes each event:
# Both workers subscribe to the same topic + group
# (charge_card stands in for your own billing coroutine)
@mesh.subscribe("payment.initiated", group="billing-workers")
async def worker_1(e: AgentEvent) -> None:
    await charge_card(e.data)
@mesh.subscribe("payment.initiated", group="billing-workers")
async def worker_2(e: AgentEvent) -> None:
    await charge_card(e.data)
# Each payment event is processed by exactly ONE worker
# Round-robin load balancing across group members
Use group= for exclusive delivery within a group.
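The following sketch shows one way round-robin delivery within a group could work. It is hypothetical: `GroupDispatcher` is a made-up name, there is one instance per topic, and handlers are synchronous for brevity, whereas agentmesh's are async. The sketch also assumes that subscribers without a group each receive every event (broadcast), mirroring the envelope's `"broadcast" | "exclusive"` delivery modes.

```python
from __future__ import annotations

from collections import defaultdict
from typing import Any, Callable

Handler = Callable[[Any], None]


class GroupDispatcher:
    """Hypothetical per-topic dispatcher.

    Subscribers without a group each get every event (broadcast); subscribers
    that share a group take turns, so each event reaches one member (round-robin).
    """

    def __init__(self) -> None:
        self._broadcast: list[Handler] = []
        self._groups: dict[str, list[Handler]] = defaultdict(list)
        self._next: dict[str, int] = defaultdict(int)  # rotation position per group

    def subscribe(self, handler: Handler, group: str | None = None) -> None:
        if group is None:
            self._broadcast.append(handler)
        else:
            self._groups[group].append(handler)

    def dispatch(self, event: Any) -> None:
        for handler in self._broadcast:
            handler(event)
        for group, members in self._groups.items():
            member = members[self._next[group] % len(members)]
            self._next[group] += 1
            member(event)


# 4 payment events, 2 workers in one group, plus an ungrouped audit subscriber
seen: dict[str, list[int]] = {"worker_1": [], "worker_2": [], "audit": []}
dispatcher = GroupDispatcher()
dispatcher.subscribe(lambda e: seen["worker_1"].append(e["n"]), group="billing-workers")
dispatcher.subscribe(lambda e: seen["worker_2"].append(e["n"]), group="billing-workers")
dispatcher.subscribe(lambda e: seen["audit"].append(e["n"]))
for n in range(4):
    dispatcher.dispatch({"n": n})
print(seen)
```

Each worker ends up with every other event, while the audit subscriber sees all four.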
Replay
Every event is persisted before delivery. Replay any topic from any point in time:
import time
# Replay all events
async for event in mesh.replay("order.created"):
    print(event.data)
# Replay the last 24 hours
async for event in mesh.replay("order.created", since=time.time() - 86400):
    print(event.data)
# Replay a specific window (start_ts / end_ts are unix timestamps;
# reconcile stands in for your own coroutine)
async for event in mesh.replay("payment.charged",
                               since=start_ts, until=end_ts):
    await reconcile(event)
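Replaying from the default JSONL store amounts to scanning stored events and filtering by topic and time window. The sketch below rests on assumptions: the record layout (one JSON object per line with `topic` and `timestamp` keys), inclusive `since`/`until` bounds, and the standalone `replay` function are all hypothetical, not agentmesh's `JsonlStore` format.

```python
from __future__ import annotations

import io
import json
from typing import Any, Iterator, TextIO


def replay(store: TextIO, topic: str, since: float = 0.0,
           until: float | None = None) -> Iterator[dict[str, Any]]:
    """Hypothetical replay over a JSONL store: one JSON event per line, file order."""
    for line in store:
        if not line.strip():
            continue  # tolerate blank lines
        event = json.loads(line)
        if event["topic"] != topic:
            continue  # exact topic match only in this sketch
        ts = event["timestamp"]
        if ts >= since and (until is None or ts <= until):
            yield event


# In-memory stand-in for an events.jsonl file
records = [
    {"topic": "order.created", "timestamp": 100.0, "data": {"n": 1}},
    {"topic": "payment.charged", "timestamp": 150.0, "data": {"n": 2}},
    {"topic": "order.created", "timestamp": 200.0, "data": {"n": 3}},
    {"topic": "order.created", "timestamp": 300.0, "data": {"n": 4}},
]
jsonl = "\n".join(json.dumps(r) for r in records) + "\n"

everything = [e["data"]["n"] for e in replay(io.StringIO(jsonl), "order.created")]
window = [e["data"]["n"] for e in replay(io.StringIO(jsonl), "order.created",
                                          since=150.0, until=250.0)]
print(everything, window)  # [1, 3, 4] [3]
```

A linear scan is fine for a sketch. A production store would more likely index by topic and timestamp so that a narrow window does not read the whole file.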
Dead Letter Queue
# Configure per topic
mesh.configure_topic("payment.initiated",
    dlq=True,               # park events that exhaust their retries
    max_retries=3,
    retry_backoff_ms=500,   # wait between retries
)
# Fire-and-forget topics don't need a DLQ
mesh.configure_topic("system.heartbeat", dlq=False)
# Inspect + retry
async for dead in mesh.dlq("payment.initiated"):
    print(f"Failed {dead.attempts}x: {dead.error}")
    await mesh.retry(dead)
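The retry-then-dead-letter flow can be sketched with asyncio. This is a hypothetical illustration of the technique, not agentmesh's delivery loop, and several parts of it are assumptions. `max_retries` is read as retries after the first attempt (1 + `max_retries` attempts total). The backoff doubles from `retry_backoff_ms`. `deliver_with_retry` and this `DeadEvent` shape are made up, modelled only on the `attempts`/`error` fields used above.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class DeadEvent:
    # Hypothetical shape, mirroring the attempts/error fields used above
    event: dict[str, Any]
    attempts: int
    error: str


async def deliver_with_retry(
    event: dict[str, Any],
    handler: Callable[[dict[str, Any]], Awaitable[None]],
    dlq: list[DeadEvent],
    max_retries: int = 3,
    retry_backoff_ms: int = 500,
) -> bool:
    """Run handler; on failure retry up to max_retries times with exponential
    backoff, then append the event to dlq. Returns True if delivered."""
    attempts = 0
    while True:
        attempts += 1
        try:
            await handler(event)
            return True
        except Exception as exc:
            if attempts > max_retries:
                dlq.append(DeadEvent(event, attempts, repr(exc)))
                return False
            # 500 ms, 1 s, 2 s, ... with the default backoff
            await asyncio.sleep(retry_backoff_ms / 1000 * 2 ** (attempts - 1))


calls = 0

async def flaky(event: dict[str, Any]) -> None:
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("card processor unavailable")

async def declined(event: dict[str, Any]) -> None:
    raise ValueError("card declined")

async def demo() -> tuple[bool, bool, list[DeadEvent]]:
    dlq: list[DeadEvent] = []
    first = await deliver_with_retry({"id": 1}, flaky, dlq, retry_backoff_ms=1)
    second = await deliver_with_retry({"id": 2}, declined, dlq, retry_backoff_ms=1)
    return first, second, dlq

first_ok, second_ok, dead = asyncio.run(demo())
for d in dead:
    print(f"Failed {d.attempts}x: {d.error}")
```

The flaky handler succeeds on its third attempt. The always-failing one lands in the DLQ after four attempts.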
Server-side Filters
Evaluated before delivery — only matching events reach your handler:
# Only high-value orders
@mesh.subscribe("order.created",
                filter={"data.amount": {"$gt": 1000}, "tenant_id": "acme"})
async def handle_big_orders(e: AgentEvent) -> None:
    await notify_account_manager(e)
# Supported operators: $gt, $lt, $gte, $lte, $in, $ne
# Dot-notation for nested fields: "data.customer.tier"
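The filter rules can be sketched in plain Python, using the operators listed above and dot-notation paths. This is a hypothetical evaluator, not agentmesh's. Events are plain dicts here, conditions are ANDed together, and, as an assumption, a missing field never matches (even for `$ne`).

```python
from __future__ import annotations

import operator
from typing import Any, Callable

# Operators listed above; "$in" checks membership in the given list
OPS: dict[str, Callable[[Any, Any], bool]] = {
    "$gt": operator.gt,
    "$lt": operator.lt,
    "$gte": operator.ge,
    "$lte": operator.le,
    "$ne": operator.ne,
    "$in": lambda value, options: value in options,
}
_MISSING = object()


def resolve(event: dict[str, Any], path: str) -> Any:
    """Follow a dot-notation path such as "data.customer.tier"."""
    current: Any = event
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return _MISSING
        current = current[key]
    return current


def matches(event: dict[str, Any], flt: dict[str, Any]) -> bool:
    """Every field condition must hold; a bare value means equality."""
    for path, condition in flt.items():
        value = resolve(event, path)
        if value is _MISSING:
            return False  # assumption: a missing field never matches
        if isinstance(condition, dict):
            if not all(OPS[op](value, operand) for op, operand in condition.items()):
                return False
        elif value != condition:
            return False
    return True


big_orders = {"data.amount": {"$gt": 1000}, "tenant_id": "acme"}
print(matches({"tenant_id": "acme", "data": {"amount": 5000}}, big_orders))  # True
print(matches({"tenant_id": "acme", "data": {"amount": 999}}, big_orders))   # False
```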
Idempotency
Set a deterministic event_id to make publish idempotent — safe to retry on network failure:
import uuid
order_id = "ORD-001"
# Deterministic ID: the same order always produces the same event_id
order_event_id = str(uuid.uuid5(uuid.NAMESPACE_URL, f"order.created:{order_id}"))
await mesh.publish("order.created",
    data={"order_id": order_id},
    publisher_id="shop-agent",
    session_id="s1", run_id="r1",
    event_id=order_event_id,  # ← publish 10x within dedup_window_s (24h default), delivered 1x
)
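Deduplication keyed on `event_id` within `dedup_window_s` (24h by default, per the API reference) can be sketched as follows. `DedupWindow` and `order_event_id` are hypothetical names, and the eviction strategy is an assumption. The injectable clock lets the example step past the window without waiting a day.

```python
from __future__ import annotations

import time
import uuid
from typing import Callable


class DedupWindow:
    """Hypothetical dedup check: an event_id seen within window_s is a duplicate."""

    def __init__(self, window_s: float = 86400.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.window_s = window_s
        self._clock = clock
        self._first_seen: dict[str, float] = {}

    def is_duplicate(self, event_id: str) -> bool:
        now = self._clock()
        # Forget ids older than the window (a linear sweep keeps the sketch short)
        self._first_seen = {k: t for k, t in self._first_seen.items()
                            if now - t < self.window_s}
        if event_id in self._first_seen:
            return True
        self._first_seen[event_id] = now
        return False


def order_event_id(order_id: str) -> str:
    # uuid5 is deterministic: same namespace + name -> same UUID every time
    return str(uuid.uuid5(uuid.NAMESPACE_URL, f"order.created:{order_id}"))


fake_now = [0.0]
dedup = DedupWindow(window_s=86400.0, clock=lambda: fake_now[0])
event_id = order_event_id("ORD-001")
delivered = sum(not dedup.is_duplicate(event_id) for _ in range(10))
fake_now[0] = 86400.0 + 1  # a day later the id has aged out of the window
delivered_again = not dedup.is_duplicate(event_id)
print(delivered, delivered_again)  # 1 True
```

This example also shows the limit of idempotent publish: a retry that arrives after the window has expired is delivered again.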
Pause / Resume
# Pause during deployment — queue events, don't deliver
await mesh.pause("payment.initiated")
# Deploy new billing logic here...
# Resume — flush queued events to subscribers
await mesh.resume("payment.initiated")
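A minimal model of pause and resume: buffer while paused, then flush in publish order on resume. `PausableTopic` is a hypothetical name, and this is not agentmesh's implementation. One design choice is worth noting. The topic stays paused until the backlog is empty, so an event published mid-flush queues behind the backlog instead of overtaking it.

```python
from __future__ import annotations

import asyncio
from collections import deque
from typing import Any, Awaitable, Callable


class PausableTopic:
    """Hypothetical per-topic gate: deliver while running, buffer while paused."""

    def __init__(self, handler: Callable[[Any], Awaitable[None]]) -> None:
        self._handler = handler
        self._paused = False
        self._backlog: deque[Any] = deque()

    async def pause(self) -> None:
        self._paused = True

    async def resume(self) -> None:
        # Stay paused while flushing so events published mid-flush queue up
        # behind the backlog instead of jumping ahead of it.
        while self._backlog:
            await self._handler(self._backlog.popleft())
        self._paused = False

    async def publish(self, event: Any) -> None:
        if self._paused:
            self._backlog.append(event)
        else:
            await self._handler(event)


async def demo() -> tuple[list[str], list[str]]:
    received: list[str] = []

    async def handler(event: str) -> None:
        received.append(event)

    topic = PausableTopic(handler)
    await topic.publish("pay-1")
    await topic.pause()
    await topic.publish("pay-2")
    await topic.publish("pay-3")
    while_paused = list(received)
    await topic.resume()
    return while_paused, received


while_paused, received = asyncio.run(demo())
print(while_paused, received)  # ['pay-1'] ['pay-1', 'pay-2', 'pay-3']
```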
Human-in-the-Loop
# Human reviewer subscribes and responds
@mesh.subscribe("human.approval.requested")
async def reviewer(e: AgentEvent) -> None:
action = e.data["action"]
amount = e.data["amount"]
request_id = e.data["_request_id"]
print(f"⚠ Review required: {action} ${amount:,.2f}")
# In production: send to Slack/email, wait for click
await mesh.publish(f"_reply.{request_id}",
data={"approved": True, "approver": "alice@acme.com"},
publisher_id="alice", publisher_type="human",
session_id=e.session_id, run_id=e.run_id,
)
# Agent awaits human decision
response = await mesh.request(
"human.approval.requested",
data={"action": "wire_transfer", "amount": 50_000},
publisher_id="billing-agent",
session_id="sess-001", run_id="run-001",
timeout_s=300.0, # 5 minutes
fallback=None, # None = timeout → denied
)
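Request/reply with a timeout comes down to correlating a reply with its request ID and racing it against a timer. The sketch below is hypothetical: `ReplyRouter` and its methods are made-up names, and a direct `reply()` call stands in for publishing on `_reply.{request_id}`. It shows the two outcomes above, a reviewer answering in time and a timeout returning the fallback.

```python
from __future__ import annotations

import asyncio
import uuid
from typing import Any


class ReplyRouter:
    """Hypothetical request/reply correlation: one Future per outstanding request_id."""

    def __init__(self) -> None:
        self._pending: dict[str, asyncio.Future[Any]] = {}

    def begin(self) -> tuple[str, asyncio.Future[Any]]:
        request_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        return request_id, future

    def reply(self, request_id: str, data: Any) -> None:
        # Stands in for a publish on f"_reply.{request_id}"
        future = self._pending.pop(request_id, None)
        if future is not None and not future.done():
            future.set_result(data)

    async def wait(self, request_id: str, future: asyncio.Future[Any],
                   timeout_s: float, fallback: Any = None) -> Any:
        try:
            return await asyncio.wait_for(future, timeout_s)
        except asyncio.TimeoutError:
            self._pending.pop(request_id, None)  # a late reply is now ignored
            return fallback


async def demo() -> tuple[Any, Any]:
    router = ReplyRouter()
    loop = asyncio.get_running_loop()
    # Case 1: the reviewer answers after 10 ms
    rid, fut = router.begin()
    loop.call_later(0.01, router.reply, rid, {"approved": True, "approver": "alice@acme.com"})
    answered = await router.wait(rid, fut, timeout_s=1.0)
    # Case 2: nobody answers, so the caller gets the fallback
    rid2, fut2 = router.begin()
    timed_out = await router.wait(rid2, fut2, timeout_s=0.01, fallback=None)
    return answered, timed_out


answered, timed_out = asyncio.run(demo())
print(answered, timed_out)
```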
Transport: In-Process (default)
Zero external dependencies. Uses asyncio queues. Works within a single process. Perfect for development and single-service deployments.
mesh = AgentMesh() # InProcessTransport by default
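"Uses asyncio queues" suggests a fan-out design along the lines of this sketch. It is hypothetical: `QueueTransport` is a made-up name, not agentmesh's `InProcessTransport`. Each subscriber gets its own queue and worker task, `publish` only enqueues, and `close` drains the queues before stopping the workers. In a design like this, handlers run only when the event loop next gets control. That would explain why the Quickstart sleeps briefly before reading stats.

```python
from __future__ import annotations

import asyncio
import logging
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[None]]


class QueueTransport:
    """Hypothetical in-process fan-out: one asyncio.Queue and worker task per subscriber."""

    def __init__(self) -> None:
        self._queues: list[asyncio.Queue[Any]] = []
        self._workers: list[asyncio.Task[None]] = []

    def subscribe(self, handler: Handler) -> None:
        queue: asyncio.Queue[Any] = asyncio.Queue()
        self._queues.append(queue)
        self._workers.append(asyncio.create_task(self._run(queue, handler)))

    async def publish(self, event: Any) -> None:
        for queue in self._queues:
            queue.put_nowait(event)  # returns immediately; delivery happens later

    async def _run(self, queue: asyncio.Queue[Any], handler: Handler) -> None:
        while True:
            event = await queue.get()
            try:
                await handler(event)
            except Exception:
                logging.exception("handler failed")  # keep the worker alive
            finally:
                queue.task_done()

    async def close(self) -> None:
        for queue in self._queues:
            await queue.join()  # wait until every queued event was handled
        for worker in self._workers:
            worker.cancel()
        await asyncio.gather(*self._workers, return_exceptions=True)


async def demo() -> tuple[int, list[int], list[int]]:
    got_a: list[int] = []
    got_b: list[int] = []

    async def sub_a(e: int) -> None:
        got_a.append(e)

    async def sub_b(e: int) -> None:
        got_b.append(e)

    transport = QueueTransport()
    transport.subscribe(sub_a)
    transport.subscribe(sub_b)
    for n in range(3):
        await transport.publish(n)
    before = len(got_a)  # nothing delivered yet: publish only enqueued
    await transport.close()
    return before, got_a, got_b


before, got_a, got_b = asyncio.run(demo())
print(before, got_a, got_b)
```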
Transport: Redis
pip install "agentmesh-bus[redis]"
from agentmesh.transport.redis import RedisTransport
mesh = AgentMesh(transport=RedisTransport("redis://localhost:6379"))
Transport: Kafka
pip install "agentmesh-bus[kafka]"
from agentmesh.transport.kafka import KafkaTransport
mesh = AgentMesh(transport=KafkaTransport(
brokers=["kafka:9092"],
))
OpenTelemetry — Grafana, Prometheus, Datadog, CloudWatch
AgentMesh emits standard OTel spans and metrics. Wire once to your OTel Collector — all backends receive the same data.
pip install "agentmesh-bus[otel]" opentelemetry-sdk opentelemetry-exporter-otlp-proto-grpc opentelemetry-exporter-prometheus
Grafana / Prometheus
from opentelemetry import metrics, trace
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from prometheus_client import start_http_server
# Metrics → Prometheus → Grafana
start_http_server(port=9464)  # serves :9464/metrics for Prometheus to scrape
reader = PrometheusMetricReader()
provider = MeterProvider(metric_readers=[reader])
metrics.set_meter_provider(provider)
# Traces → Tempo → Grafana
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
)
trace.set_tracer_provider(tracer_provider)
# agentmesh picks up the global providers automatically
from agentmesh import AgentMesh
mesh = AgentMesh(otel_enabled=True)
Datadog
import os
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(BatchSpanProcessor(
    OTLPSpanExporter(
        endpoint="https://trace.agent.datadoghq.com",
        headers={"dd-api-key": os.environ["DD_API_KEY"]},  # gRPC metadata keys must be lowercase
    )
))
trace.set_tracer_provider(tracer_provider)
AWS CloudWatch
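from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
# Point at the AWS Distro for OpenTelemetry (ADOT) Collector
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(BatchSpanProcessor(
    OTLPSpanExporter(endpoint="http://localhost:4317")  # ADOT collector sidecar
))
trace.set_tracer_provider(tracer_provider)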
Metrics emitted
agentmesh.events.published # Counter {topic, tenant_id, publisher_type}
agentmesh.events.delivered # Counter {topic, tenant_id, subscriber_id}
agentmesh.events.failed # Counter {topic, tenant_id, error}
agentmesh.delivery.latency_ms # Histogram {topic}
Integration: agentplane
from agentplane import PolicyEngine, Policy, Selector, AllowlistRule
from agentmesh import AgentMesh
engine = PolicyEngine()
engine.add_policy(Policy(
    id="mesh.policy",
    selector=Selector(tenants=["acme"]),
    blocking=[AllowlistRule(tools=["order.*", "payment.*"])],
))
# Policy checked before every publish and delivery
mesh = AgentMesh(policy_engine=engine)
Integration: agenthooks
from agenthooks import HookRegistry
from agentmesh import AgentMesh
from agentmesh.integrations.agenthooks import register_mesh_hooks
registry = HookRegistry()
mesh = AgentMesh()
register_mesh_hooks(mesh, registry)
# Hook points: agentmesh.before_publish, agentmesh.after_publish,
#              agentmesh.before_deliver, agentmesh.after_deliver
@registry.implement("agentmesh.before_publish")
async def enrich(ctx):
    return ctx.enrich("region", "eu-west-1")
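Hooks registered on one point can be chained, each receiving the context the previous one returned. The sketch below shows that idea. It is hypothetical and does not use agenthooks: `HookContext`, `HookChain`, and the assumption that `enrich(key, value)` returns a copy with one extra metadata entry are all made up for illustration, and the real `ctx` object may behave differently.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field, replace
from typing import Any, Awaitable, Callable


@dataclass(frozen=True)
class HookContext:
    # Hypothetical context; the real agenthooks ctx object may differ
    topic: str
    data: dict[str, Any]
    metadata: dict[str, Any] = field(default_factory=dict)

    def enrich(self, key: str, value: Any) -> HookContext:
        # Return a copy with one extra metadata entry; the original is unchanged
        return replace(self, metadata={**self.metadata, key: value})


Hook = Callable[[HookContext], Awaitable[HookContext]]


class HookChain:
    """Run the hooks for a hook point in registration order, threading ctx through."""

    def __init__(self) -> None:
        self._hooks: dict[str, list[Hook]] = {}

    def implement(self, point: str) -> Callable[[Hook], Hook]:
        def register(fn: Hook) -> Hook:
            self._hooks.setdefault(point, []).append(fn)
            return fn
        return register

    async def run(self, point: str, ctx: HookContext) -> HookContext:
        for hook in self._hooks.get(point, []):
            ctx = await hook(ctx)
        return ctx


chain = HookChain()

@chain.implement("agentmesh.before_publish")
async def add_region(ctx: HookContext) -> HookContext:
    return ctx.enrich("region", "eu-west-1")

@chain.implement("agentmesh.before_publish")
async def add_source(ctx: HookContext) -> HookContext:
    return ctx.enrich("source", "shop")

before = HookContext(topic="order.created", data={"order_id": "ORD-001"})
after = asyncio.run(chain.run("agentmesh.before_publish", before))
print(after.metadata)  # {'region': 'eu-west-1', 'source': 'shop'}
```

Returning a new context instead of mutating the old one means a failing hook cannot leave a half-enriched event behind.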
14 Event Categories
The taxonomy was researched across LangGraph, CrewAI, AutoGen, Anthropic, OpenAI, MCP, the A2A protocol, the OTel GenAI semantic conventions, and Semantic Kernel.
| Category | Description | Frameworks |
|---|---|---|
| session.* | Agent session lifecycle | All |
| llm.* | LLM inference + streaming | Anthropic, OpenAI, LangGraph |
| tool.* | Tool/function execution | All |
| memory.* | Memory read/write | CrewAI, OTel |
| retrieval.* | RAG / knowledge search | LangGraph, CrewAI, OTel |
| human.* | Human-in-the-loop | CrewAI, AutoGen, MCP, A2A |
| agent.* | Reasoning + planning | CrewAI, Anthropic (extended thinking) |
| multiagent.* | Agent-to-agent coordination | A2A, AutoGen, LangGraph |
| connection.* | External service connections | MCP, A2A |
| resource.* | MCP resources | MCP |
| guardrail.* | Safety + policy enforcement | CrewAI, Semantic Kernel, AgentGuard |
| flow.* | Orchestration + workflow | CrewAI, LangGraph |
| system.* | Infrastructure + ops | All |
| push.* | Async webhook delivery | A2A |
API Reference
AgentMesh
class AgentMesh:
    def __init__(self,
            transport: Transport | None = None,   # default: InProcessTransport
            store: EventStore | None = None,       # default: JsonlStore
            store_path: str = "~/.agentmesh/events.jsonl",
            dedup_window_s: float = 86400.0,       # 24h dedup window
            policy_engine: Any | None = None,
            hook_registry: Any | None = None,
            otel_enabled: bool = True,
    ) -> None: ...
    async def start(self) -> None: ...
    async def close(self) -> None: ...
    async def publish(self, topic: str, data: dict,
            publisher_id: str, session_id: str, run_id: str,
            publisher_type: str = "agent",
            event_id: str | None = None,
            tenant_id: str | None = None,
            caused_by_event_id: str | None = None,
            trace_id: str | None = None,
            tags: list[str] | None = None,
            ttl_s: float | None = None,
    ) -> AgentEvent: ...
    def subscribe(self, topic: str,
            group: str | None = None,
            filter: dict | None = None,
    ) -> Callable: ...  # use as decorator
    def unsubscribe(self, topic: str, handler: Callable) -> None: ...
    async def request(self, topic: str, data: dict,
            publisher_id: str, session_id: str, run_id: str,
            timeout_s: float = 30.0,
            fallback: Any = None,
    ) -> Any: ...
    async def replay(self, topic: str,
            since: float = 0.0,
            until: float | None = None,
    ) -> AsyncIterator[AgentEvent]: ...
    def configure_topic(self, topic: str,
            dlq: bool = True, max_retries: int = 3,
            retry_backoff_ms: int = 500,
            ttl_s: float | None = None,
    ) -> None: ...
    async def pause(self, topic: str) -> None: ...
    async def resume(self, topic: str) -> None: ...
    def stats(self) -> dict: ...
    async def dlq(self, topic: str) -> AsyncIterator[DeadEvent]: ...
    async def retry(self, dead: DeadEvent) -> None: ...
Examples
| Example | What it shows | File |
|---|---|---|
| Hello Mesh | Simplest possible example: publish one event, subscribe, print. | examples/01_hello_mesh.py |
| Consumer Groups | 4 payment events, 2 workers; each event is handled by exactly one worker. | examples/02_consumer_groups.py |
| Human-in-the-Loop | An agent requests approval, a mock human reviews, and the mesh routes the response. | examples/03_human_in_the_loop.py |
| Replay | Publish 5 events, restart the mesh, replay the full history from JSONL. | examples/04_replay.py |
| Multi-Agent Workflow | order → inventory → billing → notification using causality chains. | examples/05_multi_agent_workflow.py |
More examples on agent-examples.
pip install agentmesh-bus
python examples/01_hello_mesh.py
The Stack
agentmesh is the connective tissue. Every other layer emits into it or reacts to it.
| Package | Layer | Role |
|---|---|---|
| agentmesh | event bus | connects agents, humans, systems |
| agentplane | control plane | runtime policy, versioning, escalation |
| agenthooks | extensibility | hookpoints, customer hooks |
| AgentGuard | safety | injection, PII, jailbreak |
| agentregistry | discovery | publish, version, deploy agents |
| agenteval | quality | golden, adversarial, policy tests |
| agentobserve | observability | unified dashboard |