Part 1: User Guide
graphcrew -- User Guide & Feature Reference #
1.1 Introduction & Overview #
What is graphcrew? #
graphcrew is a Python library built on LangGraph for building production-grade, supervisor-led multi-agent systems. It provides the interfaces, orchestration scaffolding, and configuration system that developers need to compose LLM-powered agents into coordinated workflows.
What it is NOT #
graphcrew is not an application. It does not ship with a web server, CLI tool, or runnable program. It is a library that provides:
- Abstract base classes (BaseAgent, SessionManager, SlotPersistence, EvictionStrategy, TokenCounter, RateLimiter, KnowledgeLoader)
- Two built-in agents (SupervisorAgent and SessionManagerAgent)
- Declarative YAML configuration with Pydantic validation
- Graph builder utilities that wire agents into LangGraph StateGraph instances
- Pluggable middleware, persistence, and observability layers
You build your own agents, wire them into a graph, and run the graph using LangGraph's execution engine.
Who is it for? #
Developers building multi-agent LLM applications who need:
- Constrained dynamic routing (agents choose the next agent at runtime, validated against a topology)
- Tiered context slots with token budgets and automatic eviction
- A ReAct knowledge engine with per-agent knowledge modules loaded from the filesystem
- Production concerns out of the box: structured logging, rate limiting, circuit breakers, multi-tenant isolation, OpenTelemetry tracing
Core Value Propositions #
| Capability | Description |
|---|---|
| Declarative YAML config | Define agents, topology, context budgets, retry/timeout policies, and knowledge settings in a single YAML file. Pydantic validates everything at load time. |
| Constrained dynamic routing | Agents choose next_agent at runtime. The topology restricts which transitions are legal. Invalid routes fall back to default_return. |
| Tiered slot persistence | Context slots have TURN, SESSION, and LONG_TERM tiers. Eviction strategies (LRU, FIFO, priority, demotion) keep slots within token budgets. Pluggable SlotPersistence backends persist SESSION and LONG_TERM slots. |
| ReAct knowledge engine | Per-agent react_config.yaml declares actions and knowledge slots. Markdown files are loaded on-demand during the thought/action/observation loop. Token budgets cap loaded knowledge. |
1.2 Installation & Quick Start #
Installation #
# With pip
pip install graphcrew
# With uv
uv add graphcrew
Dependencies #
| Package | Version |
|---|---|
| langgraph | >= 1.0, < 2.0 |
| langchain-core | >= 0.3, < 1.0 |
| pydantic | >= 2.0, < 3.0 |
| pyyaml | >= 6.0, < 7.0 |
| structlog | >= 24.0, < 26.0 |
| tenacity | >= 8.0, < 10.0 |
Optional extras:
# OpenTelemetry instrumentation
pip install graphcrew[otel]
Python Version #
Python >= 3.11 is required.
Minimal Example #
1. Define your config (config.yaml):
agents:
  - name: supervisor
    type: supervisor
    description: Routes user requests to the appropriate agent
  - name: greeter
    type: custom
    description: Greets users and tracks greeting count
topology:
  edges:
    - from_agent: supervisor
      to_agent: greeter
    - from_agent: greeter
      to_agent: supervisor
  default_return: supervisor
context:
  shared_slots:
    max_tokens: 4096
  eviction_strategy: lru
session:
  enabled: false
2. Create a custom agent (agent.py):
from graphcrew import AgentResponse, BaseAgent, OrchestratorState
from langchain_core.runnables import RunnableConfig
class GreeterAgent(BaseAgent):
    async def process(
        self,
        state: OrchestratorState,
        config: RunnableConfig | None = None,
    ) -> AgentResponse:
        count = state.get("context_slots", {}).get("greeting_count", 0) + 1
        last_msg = state["messages"][-1].content if state["messages"] else ""
        content = f"Hello! This is greeting #{count}. You said: {last_msg}"
        return AgentResponse(
            agent_name=self.name,
            content=content,
            next_agent="supervisor",
            slots_update={"greeting_count": count},
        )
3. Build and run the graph (main.py):
import asyncio
# langchain_openai is not a graphcrew dependency; install it separately (pip install langchain-openai)
from langchain_openai import ChatOpenAI
from graphcrew import (
    AgentConfig,
    SupervisorAgent,
    TopologyResolver,
    create_initial_state,
    load_config,
)
from graphcrew.routing.graph_builder import build_hub_spoke_graph
from agent import GreeterAgent
async def main():
    config = load_config("config.yaml")
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    topology = TopologyResolver(config=config.topology)
    agent_configs = {a.name: a for a in config.agents}
    supervisor = SupervisorAgent(
        config=agent_configs["supervisor"],
        llm=llm,
        topology=topology,
    )
    greeter = GreeterAgent(config=agent_configs["greeter"])
    graph = build_hub_spoke_graph(supervisor, {"greeter": greeter}, topology)
    state = create_initial_state("Hello!", session_id="s1", user_id="alice")
    result = await graph.ainvoke(state)
    print(result["messages"][-1].content)
asyncio.run(main())
For a complete runnable example, see examples/simple_chatbot/.
1.3 Core Concepts #
Agents #
Every agent extends BaseAgent (defined in agents/base.py). The only method you must implement is process():
class BaseAgent(ABC):
    def __init__(
        self,
        config: AgentConfig,
        template_manager: ReactTemplateManager | None = None,
        react_executor: ReactExecutor | None = None,
        middleware: Sequence[AgentMiddleware] | None = None,
        *,
        redact_pii: bool = False,
    ) -> None: ...
    @abstractmethod
    async def process(
        self,
        state: OrchestratorState,
        config: RunnableConfig | None = None,
    ) -> AgentResponse: ...
    async def invoke(
        self,
        state: OrchestratorState,
        config: RunnableConfig | None = None,
    ) -> dict[str, Any]: ...
- process() is your agent logic. Return an AgentResponse.
- invoke() is called by LangGraph. It runs middleware, calls process(), builds the state update dict, and sets current_agent for routing.
AgentResponse is a frozen dataclass:
@dataclass(frozen=True, slots=True)
class AgentResponse:
    agent_name: str
    content: str
    next_agent: str | None = None  # None = terminate (no handoff)
    slots_update: dict[str, Any] = field(default_factory=dict)
    slots_tier: dict[str, SlotTier] = field(default_factory=dict)
    token_usage: dict[str, int] | None = None
The library ships two built-in agents:
- SupervisorAgent -- Routes to worker agents via a single-pass LLM call or a ReAct loop.
- SessionManagerAgent -- ReAct agent that exposes SessionManager methods as actions.
OrchestratorState #
OrchestratorState is a TypedDict shared across all graph nodes:
class OrchestratorState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    context_slots: Annotated[dict[str, Any], _merge_dicts]
    current_agent: str
    knowledge_context: Annotated[dict[str, Any], _merge_dicts]
    session_id: NotRequired[str]
    user_id: NotRequired[str]
    request_id: NotRequired[str]
- messages uses LangGraph's add_messages reducer (appends and deduplicates by ID).
- context_slots and knowledge_context use the _merge_dicts reducer for safe parallel fan-in.
- current_agent is set by BaseAgent.invoke() -- to next_agent when routing, or to the agent's own name as the END sentinel.
- request_id is auto-generated as a UUID4 by create_initial_state().
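The fan-in behaviour of a dict-merging reducer can be illustrated with a small standalone sketch. The function below is a hypothetical stand-in for graphcrew's private _merge_dicts, whose implementation is not shown in this guide; it assumes a shallow merge in which later updates win.

```python
from typing import Any


def merge_dicts(left: dict[str, Any], right: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical reducer: shallow merge where keys in `right` win."""
    merged = dict(left)  # copy so the existing state is never mutated
    merged.update(right)
    return merged


# Two branches each return a partial update for context_slots.
base = {"greeting_count": 1}
branch_a = {"user_name": "Alice"}
branch_b = {"greeting_count": 2}

# The reducer folds each partial update into the accumulated value.
state = merge_dicts(merge_dicts(base, branch_a), branch_b)
print(state)
```

Because each branch returns only the keys it changed, unrelated keys written by other branches survive the merge.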
Create initial state with:
state = create_initial_state(
"Hello!",
session_id="s1", # default: "default"
user_id="alice", # default: "anonymous"
current_agent="supervisor", # default: "supervisor"
)
Topology #
TopologyResolver (defined in routing/topology.py) validates routing decisions against a directed edge graph:
topology = TopologyResolver(config=config.topology)
topology.is_allowed("supervisor", "greeter") # True if edge exists
topology.get_allowed_targets("supervisor") # Sorted list, always includes default_return
topology.validate(["supervisor", "greeter"]) # Returns warnings for unreachable agents
The topology is defined in YAML:
topology:
  edges:
    - from_agent: supervisor
      to_agent: worker_a
    - from_agent: supervisor
      to_agent: worker_b
    - from_agent: worker_a
      to_agent: supervisor
    - from_agent: worker_b
      to_agent: supervisor
  default_return: supervisor
Context Slots #
Context slots are schemaless key-value pairs with token budgets and tiered persistence. They are managed by SlotManager (defined in context/manager.py):
- SlotTier.TURN -- Discarded at end of turn.
- SlotTier.SESSION -- Persisted for the session via SlotPersistence.
- SlotTier.LONG_TERM -- Persisted across sessions, keyed by user_id.
Agents write slots through AgentResponse.slots_update and slots_tier. BaseAgent.invoke() merges these into state["context_slots"].
Knowledge Modules #
The knowledge system (in knowledge/) provides per-agent markdown files loaded on-demand during ReAct execution. Each agent has:
- llm_resources/<agent_name>/react_config.yaml -- Declares actions, slots, and their dependencies.
- llm_resources/<agent_name>/slots/*.md -- Markdown knowledge files.
- llm_resources/<agent_name>/react_core.md -- Core prompt template with {variable} placeholders.
ReactTemplateManager loads and validates the config, resolves slot dependencies, and assembles prompts. FileSystemKnowledgeLoader reads files with path containment enforcement.
ReAct Execution #
ReactExecutor (in react/executor.py) implements a multi-pass thought/action/observation loop:
- Build prompt from core template + dynamic context (previous thoughts, observations, loaded knowledge).
- Call LLM.
- Parse response -- either XML (<thought>...</thought><action type="...">...</action>) or structured output (ReactStructuredResponse).
- Execute the action handler.
- Collect observation, loop until a handler raises TerminalActionResult or max_passes is reached.
Sessions #
SessionManager (in session/interface.py) is an ABC with three abstract methods:
- save_messages(session_id, messages) -- Append messages.
- get_messages(session_id, limit, user_id) -- Return most recent messages.
- clear_session(session_id) -- Remove all data for a session.
Optional methods (these raise NotImplementedError unless overridden): save_summary, get_summary, store_memory, recall_memory, search_memories, search_memories_by_user, get_messages_paginated, update_memory, delete_memory, list_memories.
InMemorySessionManager ships for dev/testing with all 13 methods implemented.
1.4 Feature Reference #
1. Declarative YAML Configuration #
Define your entire orchestrator in a single YAML file. All fields are validated by Pydantic with frozen=True and extra="forbid" (typos raise ValidationError immediately).
When to use: Every project. YAML config is the entry point for defining agents, topology, context budgets, and operational parameters.
Loading config:
from graphcrew import load_config, load_config_from_string, load_config_from_dict
# From a file (accepts str or Path)
config = load_config("config.yaml")
# From a YAML string
config = load_config_from_string(yaml_str)
# From a dict
config = load_config_from_dict({"agents": [...], "topology": {...}})
Complete YAML example with all sections:
agents:
  - name: supervisor
    type: supervisor
    description: Routes requests to the right agent
    model: gpt-4o # optional LLM model override
    knowledge_enabled: true # enable knowledge module system
  - name: researcher
    type: custom
    description: Performs research tasks
topology:
  edges:
    - from_agent: supervisor
      to_agent: researcher
    - from_agent: researcher
      to_agent: supervisor
  default_return: supervisor
context:
  shared_slots:
    max_tokens: 4096 # token budget for the slot pool (must be > 0)
  tiered_slots:
    turn:
      max_tokens: 4096
    session:
      max_tokens: 4096
    long_term:
      max_tokens: 4096
  eviction_strategy: lru # lru | fifo | priority | demotion
session:
  enabled: true
knowledge:
  enabled: true
  base_dir: llm_resources # root directory for agent knowledge folders
  max_total_tokens: 8192 # token budget for loaded knowledge (must be > 0)
retry:
  initial_interval: 0.5 # seconds before first retry (must be > 0)
  backoff_factor: 2.0 # exponential base multiplier
  max_interval: 128.0 # maximum seconds between retries
  max_attempts: 3 # total attempts including first (1 = no retry)
  jitter: true # random jitter to prevent thundering herd
timeout:
  llm_call_seconds: 30.0 # per-call LLM timeout
  file_io_seconds: 5.0 # per-file knowledge load timeout
  persistence_seconds: 10.0 # per-call slot persistence timeout
  total_request_seconds: 120.0 # total timeout across all ReAct passes
Cross-validation: The OrchestratorConfig model validator ensures all topology edges and default_return reference agent names defined in the agents list.
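As a rough illustration of what such a cross-check involves, the sketch below compares the names referenced by a topology against the declared agents. It is not graphcrew's validator: find_unknown_agents is a hypothetical helper, and plain lists and tuples stand in for the Pydantic models.

```python
def find_unknown_agents(
    agent_names: list[str],
    edges: list[tuple[str, str]],
    default_return: str,
) -> list[str]:
    """Return names referenced by the topology but missing from `agent_names`."""
    known = set(agent_names)
    referenced = {default_return}
    for from_agent, to_agent in edges:
        referenced.update((from_agent, to_agent))
    return sorted(referenced - known)


agents = ["supervisor", "greeter"]
edges = [
    ("supervisor", "greeter"),
    ("greeter", "supervisor"),
    ("supervisor", "billing"),  # "billing" is never declared as an agent
]
missing = find_unknown_agents(agents, edges, default_return="supervisor")
print(missing)
```

A validator built on this idea would raise at load time whenever the returned list is non-empty, so a misspelled agent name never reaches graph construction.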
2. Custom Agents #
Extend BaseAgent and implement process() to create any agent.
When to use: Every project. All worker agents are custom agents.
from graphcrew import AgentConfig, AgentResponse, BaseAgent, OrchestratorState, SlotTier
from langchain_core.runnables import RunnableConfig
class SummarizerAgent(BaseAgent):
    async def process(
        self,
        state: OrchestratorState,
        config: RunnableConfig | None = None,
    ) -> AgentResponse:
        messages = state["messages"]
        # Your summarization logic here
        summary = f"Summarized {len(messages)} messages."
        return AgentResponse(
            agent_name=self.name,
            content=summary,
            next_agent=None,  # None = terminate, graph ends
            slots_update={"last_summary": summary},
            slots_tier={"last_summary": SlotTier.SESSION},
        )
# Instantiate with a config
agent = SummarizerAgent(
    config=AgentConfig(name="summarizer", type="custom", description="Summarizes conversations")
)
Agent name validation: Names must be 1 to 64 characters, start with a letter, match [a-zA-Z][a-zA-Z0-9_-]*, and cannot be __end__ or __start__ (LangGraph reserved).
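The naming rules can be expressed as a short standalone check. This is a sketch of the stated rules only; is_valid_agent_name is a hypothetical helper, not the library's validator, which runs inside the Pydantic model.

```python
import re

# Pattern and reserved names taken from the rules stated above.
_NAME_PATTERN = re.compile(r"[a-zA-Z][a-zA-Z0-9_-]*")
_RESERVED_NAMES = {"__end__", "__start__"}


def is_valid_agent_name(name: str) -> bool:
    """Check length, reserved names, and the allowed character pattern."""
    if name in _RESERVED_NAMES:
        return False
    if not 1 <= len(name) <= 64:
        return False
    return _NAME_PATTERN.fullmatch(name) is not None


for candidate in ["supervisor", "worker-a_1", "1agent", "__end__", ""]:
    print(repr(candidate), is_valid_agent_name(candidate))
```

The reserved names would already fail the pattern because they start with an underscore; the explicit check simply makes the intent visible.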
3. Supervisor Routing #
SupervisorAgent routes to worker agents. It supports two modes:
- Simple mode (no ReactTemplateManager) -- Single-pass LLM call. Attempts structured output (with_structured_output) first, then falls back to regex word-boundary matching.
- ReAct mode (with ReactTemplateManager) -- Multi-pass ReAct loop with knowledge loading and a decide terminal action.
When to use: Every project needs a supervisor (or a custom routing agent).
from graphcrew import (
SupervisorAgent,
AgentConfig,
TopologyResolver,
RetryConfig,
TimeoutConfig,
SemaphoreRateLimiter,
MetricsMiddleware,
InMemoryMetricsCollector,
)
collector = InMemoryMetricsCollector()
supervisor = SupervisorAgent(
config=AgentConfig(name="supervisor", type="supervisor", description="Routes requests"),
llm=llm,
topology=topology,
retry_config=RetryConfig(max_attempts=3),
timeout_config=TimeoutConfig(llm_call_seconds=30.0),
rate_limiter=SemaphoreRateLimiter(max_concurrency=5),
use_structured_output=True,
middleware=[MetricsMiddleware(collector)],
redact_pii=True,
)
Self-loop detection: if the supervisor routes to itself, RoutingSelfLoopError is raised.
4. Hub-Spoke Topology #
All worker agents route back to the supervisor unconditionally. The supervisor routes to workers via conditional edges.
When to use: Standard supervisor-led architectures where all coordination goes through a central supervisor.
from graphcrew.routing.graph_builder import build_hub_spoke_graph
graph = build_hub_spoke_graph(
supervisor=supervisor,
agents={"greeter": greeter, "researcher": researcher},
topology=topology,
)
result = await graph.ainvoke(create_initial_state("Hello"))
topology:
  edges:
    - from_agent: supervisor
      to_agent: greeter
    - from_agent: supervisor
      to_agent: researcher
    - from_agent: greeter
      to_agent: supervisor
    - from_agent: researcher
      to_agent: supervisor
  default_return: supervisor
5. Mesh Topology #
Workers can route directly to other workers without going through the supervisor.
When to use: When agents need peer-to-peer communication (e.g., a billing agent hands off directly to a support agent).
from graphcrew.routing.graph_builder import build_topology_graph
graph = build_topology_graph(
agents={
"supervisor": supervisor,
"billing": billing,
"support": support,
},
topology=topology,
entry_point="supervisor",
)
topology:
  edges:
    - from_agent: supervisor
      to_agent: billing
    - from_agent: supervisor
      to_agent: support
    - from_agent: billing
      to_agent: support # direct worker-to-worker
    - from_agent: support
      to_agent: supervisor
    - from_agent: billing
      to_agent: supervisor
  default_return: supervisor
build_conditional_edge() is also available for manual graph wiring:
from graphcrew.routing.graph_builder import build_conditional_edge
routing_fn = build_conditional_edge(topology, "supervisor")
# Returns END when state["current_agent"] == "supervisor" (termination sentinel)
# Falls back to topology.default_return for unknown targets
6. Context Slots #
Schemaless key-value context with token budgets and tiered persistence.
When to use: When agents need to share structured data (user preferences, intermediate results, counters) across turns or sessions.
from graphcrew import SlotManager, SlotTier, ContextConfig
manager = SlotManager(config=ContextConfig())
# Shared slots (default tier: TURN)
manager.set_shared("user_name", "Alice")
manager.set_shared_tiered("preference", "dark_mode", tier=SlotTier.SESSION)
# Per-agent slots
manager.set_agent("researcher", "search_query", "quantum computing")
# Promote a slot to a higher tier
manager.promote_slot("user_name", SlotTier.LONG_TERM)
# Serialization
data = manager.to_dict() # {"shared": {...}, "agents": {...}}
manager.from_dict(data) # Restore from dict
# Lifecycle
async with SlotManager(config=ContextConfig()) as mgr:
    mgr.set_shared("key", "value")
Agents write slots via AgentResponse:
return AgentResponse(
agent_name=self.name,
content="Done",
slots_update={"result": computed_value},
slots_tier={"result": SlotTier.SESSION},
)
7. Eviction Strategies #
When a slot pool exceeds its max_tokens budget, slots are evicted automatically.
When to use: When you need control over which slots are removed under memory pressure.
| Strategy | Behavior |
|---|---|
| LRUEviction | Evicts least recently accessed slots (default) |
| FIFOEviction | Evicts oldest created slots |
| PriorityEviction | Evicts lowest priority slots |
| DemotionEviction | Demotes TURN -> SESSION -> LONG_TERM before removing |
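To make budget-driven eviction concrete, here is a minimal sketch of the LRU idea: drop the least recently accessed slots until the pool fits its token budget. SlotSketch and evict_lru are hypothetical names for illustration and do not mirror graphcrew's Slot or LRUEviction classes.

```python
from dataclasses import dataclass


@dataclass
class SlotSketch:
    key: str
    tokens: int
    last_access: int  # logical clock; larger means more recently used


def evict_lru(
    slots: list[SlotSketch], max_tokens: int
) -> tuple[list[SlotSketch], list[SlotSketch]]:
    """Evict least recently accessed slots until the total fits the budget."""
    kept = sorted(slots, key=lambda s: s.last_access)  # oldest access first
    evicted: list[SlotSketch] = []
    total = sum(s.tokens for s in kept)
    while kept and total > max_tokens:
        victim = kept.pop(0)
        evicted.append(victim)
        total -= victim.tokens
    return kept, evicted


slots = [
    SlotSketch("a", tokens=50, last_access=1),
    SlotSketch("b", tokens=30, last_access=3),
    SlotSketch("c", tokens=40, last_access=2),
]
kept, evicted = evict_lru(slots, max_tokens=80)
print([s.key for s in kept], [s.key for s in evicted])
```

FIFO and priority eviction follow the same loop and differ only in the sort key (creation time or priority instead of last access).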
context:
  eviction_strategy: demotion # or lru, fifo, priority
Custom eviction strategies extend EvictionStrategy:
from graphcrew import EvictionStrategy, Slot
class CustomEviction(EvictionStrategy):
    def select(self, slots: list[Slot], count: int) -> list[Slot]:
        # Return up to `count` slots to evict
        return sorted(slots, key=lambda s: s.priority)[:count]
8. Slot Persistence #
SlotPersistence is an ABC for persisting SESSION and LONG_TERM slots to durable storage.
When to use: When context must survive process restarts (production deployments).
from graphcrew import SlotPersistence, Slot
class PostgresSlotPersistence(SlotPersistence):
    async def load_session_slots(self, session_id: str) -> list[Slot]: ...
    async def save_session_slots(self, session_id: str, slots: list[Slot]) -> None: ...
    async def delete_session_slots(self, session_id: str) -> None: ...
    async def load_long_term_slots(self, user_id: str) -> list[Slot]: ...
    async def save_long_term_slots(self, user_id: str, slots: list[Slot]) -> None: ...
    async def delete_long_term_slots(self, user_id: str) -> None: ...
The library ships InMemorySlotPersistence for dev/testing:
from graphcrew import InMemorySlotPersistence, SlotManager, ContextConfig
persistence = InMemorySlotPersistence()
manager = SlotManager(config=ContextConfig(), persistence=persistence)
await manager.hydrate(session_id="s1", user_id="alice")
# ... run your graph ...
await manager.flush(session_id="s1", user_id="alice")
await manager.clear_turn_slots()
SlotManager.hydrate() loads persisted slots at graph start. SlotManager.flush() persists SESSION and LONG_TERM slots at graph end.
9. Knowledge Modules #
Per-agent markdown knowledge files loaded on-demand during ReAct execution.
When to use: When agents need access to structured reference information (API docs, routing rules, policies) that should be loaded selectively to stay within token budgets.
Directory structure:
llm_resources/
  supervisor/
    react_config.yaml
    react_core.md
    slots/
      routing_rules.md
      escalation_policy.md
react_config.yaml:
max_passes: 5
actions:
  request_slot:
    description: Load a knowledge slot for reference
    parameters:
      slot: Name of the slot to load
    required:
      - slot
  decide:
    description: Make a final routing decision
    parameters:
      next_agent: The agent to route to
      content: Summary of reasoning
    required:
      - next_agent
slots:
  routing_rules:
    description: Rules for routing requests to agents
    modules:
      - slots/routing_rules.md
    triggers: When deciding which agent should handle the request
    max_tokens: 2000
  escalation_policy:
    description: Escalation rules
    modules:
      - slots/escalation_policy.md
    dependencies:
      - routing_rules
ReactTemplateManager validates the config at construction, including slot dependency cycle detection. ReactConfigSchema uses a merged validate_slot_dependencies validator that checks both reference validity and circular dependencies via DFS.
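The two checks named above (unknown references and cycles) can be sketched with a standard three-color depth-first search. This is an illustrative reimplementation under the assumption that slots are given as a plain mapping from slot name to dependency names; it is not the library's validator, and the error messages are invented.

```python
def validate_slot_dependencies(slots: dict[str, list[str]]) -> None:
    """Raise ValueError on unknown references or circular dependencies."""
    # Pass 1: every dependency must name a declared slot.
    for name, deps in slots.items():
        for dep in deps:
            if dep not in slots:
                raise ValueError(f"slot {name!r} depends on unknown slot {dep!r}")

    # Pass 2: DFS with three colors; reaching a GRAY node means a back edge.
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {name: WHITE for name in slots}

    def visit(name: str, path: list[str]) -> None:
        color[name] = GRAY
        for dep in slots[name]:
            if color[dep] == GRAY:
                cycle = " -> ".join(path + [name, dep])
                raise ValueError(f"circular slot dependency: {cycle}")
            if color[dep] == WHITE:
                visit(dep, path + [name])
        color[name] = BLACK

    for name in slots:
        if color[name] == WHITE:
            visit(name, [])


# Mirrors the react_config.yaml above: escalation_policy depends on routing_rules.
validate_slot_dependencies({"routing_rules": [], "escalation_policy": ["routing_rules"]})
print("dependencies are valid")
```

Running both passes in one validator means a config with a misspelled dependency fails with a reference error rather than a confusing traversal error.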
10. ReAct Execution Engine #
Multi-pass thought/action/observation loop implemented by ReactExecutor.
When to use: When agents need multi-step reasoning with access to knowledge modules and custom actions.
from graphcrew import (
ReactExecutor,
ReactTemplateManager,
FileSystemKnowledgeLoader,
RetryConfig,
TimeoutConfig,
)
loader = FileSystemKnowledgeLoader(base_dir="llm_resources")
template_manager = ReactTemplateManager(
agent_name="supervisor",
base_dir="llm_resources",
loader=loader,
max_loaded_tokens=8192,
)
executor = ReactExecutor(
agent_name="supervisor",
llm=llm,
template_manager=template_manager,
action_handlers={"request_slot": slot_handler, "decide": decide_handler},
max_passes=5,
retry_config=RetryConfig(max_attempts=3),
timeout_config=TimeoutConfig(llm_call_seconds=30.0, total_request_seconds=120.0),
max_history_tokens=2000,
rate_limiter=limiter,
use_structured_output=True,
)
result = await executor.execute(user_input="Hello", context={"key": "value"}, config=config)
Key features:
- History truncation: max_history_tokens keeps only the most recent (thought, observation) pairs within a token budget.
- Cancellation: Pass cancel_event=asyncio.Event() to abort long-running loops. Raises ReactCancellationError.
- Total timeout: total_request_seconds wraps the entire pass loop.
- Structured output: When use_structured_output=True, the executor uses ReactStructuredResponse via with_structured_output(). Falls back to XML parsing if unsupported. The capability is cached after the first probe.
- Reserved state keys: _user_input, _pass_number, _previous_thoughts, _observations, _loaded_slots, _loaded_slot_contents are reserved. ReservedStateKeyError is raised if context or handlers attempt to overwrite them.
- Token usage tracking: Accumulated across all passes, returned in the terminal result as _token_usage.
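History truncation of this kind can be sketched by walking the (thought, observation) pairs from newest to oldest and stopping once the budget is spent. The helper below is hypothetical, and it counts tokens by whitespace splitting purely as an assumption; the executor's own counting is not described in this guide.

```python
from typing import Callable


def truncate_history(
    pairs: list[tuple[str, str]],
    max_tokens: int,
    count_tokens: Callable[[str], int] = lambda text: len(text.split()),
) -> list[tuple[str, str]]:
    """Keep the most recent pairs whose combined cost fits `max_tokens`."""
    kept: list[tuple[str, str]] = []
    used = 0
    for thought, observation in reversed(pairs):  # newest first
        cost = count_tokens(thought) + count_tokens(observation)
        if used + cost > max_tokens:
            break
        kept.append((thought, observation))
        used += cost
    kept.reverse()  # restore chronological order
    return kept


history = [("a b c", "d e"), ("f g", "h"), ("i", "j")]
recent = truncate_history(history, max_tokens=5)
print(recent)
```

Stopping at the first pair that does not fit keeps the retained history contiguous, so the model never sees an observation without the pass that preceded it.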
Action handlers implement the ActionHandler protocol:
class ActionHandler(Protocol):
    async def __call__(
        self,
        params: dict[str, Any],
        state: dict[str, Any],
        config: RunnableConfig | None = None,
    ) -> tuple[str, dict[str, Any]]: ...
Return (observation, state_updates). Raise TerminalActionResult(result_dict) to end the loop.
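The handler contract can be exercised without an LLM. In the sketch below, a fixed plan of (action, params) pairs stands in for the model's decisions, TerminalResult is a hypothetical stand-in for TerminalActionResult, and the handlers omit the config parameter for brevity; none of this is graphcrew code.

```python
import asyncio
from typing import Any


class TerminalResult(Exception):
    """Hypothetical stand-in for TerminalActionResult."""

    def __init__(self, result: dict[str, Any]) -> None:
        super().__init__("terminal action")
        self.result = result


async def request_slot(params: dict[str, Any], state: dict[str, Any]):
    # Normal handler: returns (observation, state_updates).
    loaded = state.get("loaded", []) + [params["slot"]]
    return f"loaded {params['slot']}", {"loaded": loaded}


async def decide(params: dict[str, Any], state: dict[str, Any]):
    # Terminal handler: ends the loop by raising.
    raise TerminalResult({"next_agent": params["next_agent"]})


async def run_loop(plan, handlers, max_passes: int = 5) -> dict[str, Any]:
    """`plan` replaces the LLM: one (action, params) pair per pass."""
    state: dict[str, Any] = {}
    observations: list[str] = []
    for action, params in plan[:max_passes]:
        try:
            observation, updates = await handlers[action](params, state)
        except TerminalResult as done:
            return {**done.result, "observations": observations}
        observations.append(observation)
        state.update(updates)
    return {"next_agent": None, "observations": observations}


plan = [
    ("request_slot", {"slot": "routing_rules"}),
    ("decide", {"next_agent": "greeter"}),
]
handlers = {"request_slot": request_slot, "decide": decide}
result = asyncio.run(run_loop(plan, handlers))
print(result)
```

Using an exception for the terminal case keeps the normal return type uniform: every non-terminal handler returns the same two-element tuple.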
11. Session Management #
SessionManager ABC with three abstract methods plus optional methods for summaries, memories, and pagination.
When to use: When you need conversation history, summaries, or long-term memory.
from graphcrew import InMemorySessionManager, SessionMessage, Memory, Summary
session_mgr = InMemorySessionManager()
# Save messages
await session_mgr.save_messages("s1", [
SessionMessage(role="user", content="Hello", session_id="s1", user_id="alice"),
])
# Get messages (limit must be >= 1)
messages = await session_mgr.get_messages("s1", limit=50, user_id="alice")
# Store and recall memories
await session_mgr.store_memory("s1", Memory(
session_id="s1", key="preference", value="dark mode",
importance=0.8, user_id="alice",
), user_id="alice")
memory = await session_mgr.recall_memory("s1", "preference")
# Search memories (sorted by importance descending, expired memories excluded)
results = await session_mgr.search_memories("s1", "preference", limit=5)
# Cross-session search by user
results = await session_mgr.search_memories_by_user("alice", "preference", limit=10)
# Pagination
page = await session_mgr.get_messages_paginated("s1", cursor=None, limit=20)
# page.items, page.next_cursor, page.has_more
# Update memory (only content fields: value, importance, source_agent, categories, metadata, expires_at)
updated = await session_mgr.update_memory("s1", memory.id, {"importance": 0.9}, user_id="alice")
# Delete memory (with ownership check)
deleted = await session_mgr.delete_memory("s1", memory.id, user_id="alice")
# Lifecycle
await session_mgr.clear_session("s1") # Clears data but retains session lock
await session_mgr.close() # Sets _closed flag, clears all data
Capability discovery:
caps = session_mgr.get_capabilities()
# InMemorySessionManager returns all 13 methods
Async context manager:
async with InMemorySessionManager() as mgr:
    await mgr.save_messages("s1", messages)
SessionMessage.to_langchain() converts to LangChain message types: "user" -> HumanMessage, "assistant"/"ai" -> AIMessage, "system" -> SystemMessage.
12. Multi-Tenant Isolation #
TenantIsolationMiddleware prevents agents from modifying identity fields (session_id, user_id).
When to use: Multi-tenant deployments where agents must not leak data across tenants.
from graphcrew import TenantIsolationMiddleware
middleware = TenantIsolationMiddleware()
agent = MyAgent(
config=agent_config,
middleware=[middleware],
)
The middleware captures session_id and user_id in before_invoke, then checks both response.slots_update and direct state mutations in after_invoke. Raises TenantIsolationError on identity field changes.
BaseAgent.invoke() also independently logs a WARNING (identity_field_mutated) if process() mutates identity fields in state.
13. Agent Middleware #
AgentMiddleware provides lifecycle hooks around agent execution.
When to use: Cross-cutting concerns like logging, authentication, metrics, circuit breaking, tenant isolation.
from graphcrew import AgentMiddleware, OrchestratorState, AgentResponse
from langchain_core.runnables import RunnableConfig
class LoggingMiddleware(AgentMiddleware):
    async def before_invoke(
        self,
        agent_name: str,
        state: OrchestratorState,
        config: RunnableConfig | None = None,
    ) -> OrchestratorState:
        print(f"Before: {agent_name}")
        return state
    async def after_invoke(
        self,
        agent_name: str,
        state: OrchestratorState,
        response: AgentResponse,
        config: RunnableConfig | None = None,
    ) -> AgentResponse:
        print(f"After: {agent_name} -> {response.next_agent}")
        return response
    async def on_error(
        self,
        agent_name: str,
        state: OrchestratorState,
        error: Exception,
        config: RunnableConfig | None = None,
    ) -> None:
        print(f"Error: {agent_name}: {error}")
Execution order (onion model):
- before_invoke: first -> last
- after_invoke: last -> first
- on_error: last -> first (fire-and-forget, failures logged but never mask the original exception)
Middleware can modify state (in before_invoke) and response (in after_invoke).
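The onion ordering is easy to see with two recording middleware and a stub agent. The classes below are simplified illustrations with reduced signatures, not the AgentMiddleware interface itself.

```python
import asyncio

calls: list[str] = []


class Recorder:
    """Simplified middleware that records when each hook runs."""

    def __init__(self, name: str) -> None:
        self.name = name

    async def before_invoke(self, state: dict) -> dict:
        calls.append(f"before:{self.name}")
        return state

    async def after_invoke(self, response: dict) -> dict:
        calls.append(f"after:{self.name}")
        return response


async def invoke(middleware: list[Recorder], state: dict) -> dict:
    for mw in middleware:  # before hooks: first -> last
        state = await mw.before_invoke(state)
    calls.append("process")  # the agent's own logic runs in the middle
    response = {"content": "ok"}
    for mw in reversed(middleware):  # after hooks: last -> first
        response = await mw.after_invoke(response)
    return response


asyncio.run(invoke([Recorder("outer"), Recorder("inner")], {}))
print(calls)
```

The first middleware in the list therefore wraps everything else, which is why ordering matters for concerns such as metrics or circuit breaking.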
Built-in middleware:
| Middleware | Purpose |
|---|---|
| MetricsMiddleware | Records invocation duration, token usage, errors |
| TenantIsolationMiddleware | Prevents identity field mutation |
| CircuitBreakerMiddleware | CLOSED -> OPEN -> HALF_OPEN state machine |
| MemoryInjectionMiddleware | Injects relevant memories into knowledge_context |
| OpenTelemetryMiddleware | Creates OTel spans around invocations |
All built-in middleware use contextvars.ContextVar for per-invocation state, making them safe for concurrent use across agents.
14. Metrics Collection #
MetricsMiddleware records invocation duration, token usage, and errors via a MetricsCollector protocol.
When to use: When you need visibility into agent performance and error rates.
from graphcrew import MetricsMiddleware, InMemoryMetricsCollector
collector = InMemoryMetricsCollector()
metrics_mw = MetricsMiddleware(collector)
agent = MyAgent(config=agent_config, middleware=[metrics_mw])
MetricsCollector protocol:
class MetricsCollector(Protocol):
    def record_invocation(self, record: InvocationRecord) -> None: ...
    def record_error(self, record: ErrorRecord) -> None: ...
InMemoryMetricsCollector ships for dev/testing. For production, implement MetricsCollector with Prometheus, Datadog, or your preferred metrics backend.
Duration is measured via time.monotonic() for clock-skew safety.
15. Circuit Breaker #
CircuitBreakerMiddleware implements a CLOSED -> OPEN -> HALF_OPEN -> CLOSED state machine.
When to use: When you need to fail fast during sustained failures to avoid wasting LLM spend.
from graphcrew import CircuitBreakerMiddleware
cb = CircuitBreakerMiddleware(
failure_threshold=5, # failures before opening
recovery_timeout_seconds=60, # seconds before half-open probe
)
agent = MyAgent(config=agent_config, middleware=[cb])
When the circuit is open, CircuitBreakerOpenError is raised immediately without calling the agent.
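The state machine behind a circuit breaker can be sketched in a few lines. CircuitBreakerSketch is a hypothetical class: it reuses the two parameter names shown above but makes its own assumptions about transitions (for example, that a failed half-open probe reopens the circuit), which this guide does not specify for the real middleware.

```python
import time
from typing import Callable


class CircuitBreakerSketch:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout_seconds = recovery_timeout_seconds
        self._clock = clock
        self._failures = 0
        self._opened_at = 0.0
        self.state = "CLOSED"

    def allow(self) -> bool:
        """Return True if a call may proceed; may move OPEN -> HALF_OPEN."""
        if self.state == "OPEN":
            if self._clock() - self._opened_at >= self.recovery_timeout_seconds:
                self.state = "HALF_OPEN"  # let one probe call through
                return True
            return False
        return True

    def record_success(self) -> None:
        self._failures = 0
        self.state = "CLOSED"

    def record_failure(self) -> None:
        self._failures += 1
        if self.state == "HALF_OPEN" or self._failures >= self.failure_threshold:
            self.state = "OPEN"
            self._opened_at = self._clock()


now = [0.0]  # fake clock so the example does not need to sleep
cb = CircuitBreakerSketch(failure_threshold=2, clock=lambda: now[0])
cb.record_failure()
cb.record_failure()       # threshold reached: circuit opens
blocked = cb.allow()      # still inside the recovery window
now[0] = 61.0
probe = cb.allow()        # window elapsed: half-open probe allowed
state_during_probe = cb.state
cb.record_success()       # probe succeeded: circuit closes
print(blocked, probe, state_during_probe, cb.state)
```

Injecting the clock keeps the sketch testable; a middleware would call allow() in before_invoke and the record methods in after_invoke and on_error.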
16. Memory Injection #
MemoryInjectionMiddleware automatically searches session and cross-session memories and injects them into state["knowledge_context"]["_injected_memories"].
When to use: When agents should have access to relevant long-term context from previous interactions.
from graphcrew import MemoryInjectionMiddleware, InMemorySessionManager
session_mgr = InMemorySessionManager()
memory_mw = MemoryInjectionMiddleware(session_mgr, limit=5)
supervisor = SupervisorAgent(
config=agent_config,
llm=llm,
topology=topology,
middleware=[memory_mw],
)
Search strategy:
- Session-scoped: search_memories(session_id, query) using the last message as query.
- Cross-session: search_memories_by_user(user_id, query) only when user_id is set and not "anonymous".
- Deduplicate by Memory.key (higher importance wins), sort by importance descending, cap at limit.
NotImplementedError from either search is silently ignored. Any other exception is logged as a warning and injection is skipped. The middleware never blocks agent execution.
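The merge step of the search strategy (deduplicate by key, sort by importance, cap at limit) might look like the following. MemorySketch and merge_memories are hypothetical names used only for illustration; graphcrew's Memory model has more fields.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MemorySketch:
    key: str
    value: str
    importance: float


def merge_memories(
    session_hits: list[MemorySketch],
    user_hits: list[MemorySketch],
    limit: int,
) -> list[MemorySketch]:
    """Deduplicate by key (higher importance wins), rank, then cap."""
    best: dict[str, MemorySketch] = {}
    for memory in [*session_hits, *user_hits]:
        current = best.get(memory.key)
        if current is None or memory.importance > current.importance:
            best[memory.key] = memory
    ranked = sorted(best.values(), key=lambda m: m.importance, reverse=True)
    return ranked[:limit]


session_hits = [
    MemorySketch("preference", "dark mode", 0.5),
    MemorySketch("language", "en", 0.9),
]
user_hits = [
    MemorySketch("preference", "light mode", 0.8),
    MemorySketch("timezone", "UTC", 0.2),
]
injected = merge_memories(session_hits, user_hits, limit=2)
print([(m.key, m.value) for m in injected])
```

Deduplicating before the cap ensures that two copies of the same key never crowd out a distinct memory.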
17. Rate Limiting #
RateLimiter ABC with acquire(), release(), and close() methods. Called before every LLM invocation.
When to use: When you need to control LLM API call concurrency or add delays between requests.
SemaphoreRateLimiter -- Single-tenant semaphore-based limiter:
from graphcrew import SemaphoreRateLimiter
limiter = SemaphoreRateLimiter(max_concurrency=5, delay_seconds=0.1)
supervisor = SupervisorAgent(
config=agent_config, llm=llm, topology=topology,
rate_limiter=limiter,
)
PerTenantRateLimiter -- Per-tenant semaphore isolation with LRU eviction:
from graphcrew import PerTenantRateLimiter
limiter = PerTenantRateLimiter(
max_concurrency=5,
delay_seconds=0.0,
max_tenants=10_000,
)
# In your request handler:
limiter.set_tenant("tenant-abc")
await limiter.acquire()
try:
    # ... LLM call ...
    pass
finally:
    await limiter.release()
set_tenant() must be called before acquire() or RuntimeError is raised. When max_tenants is reached, the least-recently-used tenant's limiter is evicted.
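Bounded per-tenant state with LRU eviction is commonly built on an ordered mapping. The sketch below shows that idea with a hypothetical TenantSemaphores registry; it is not PerTenantRateLimiter, and it leaves out delays, set_tenant(), and what happens to in-flight holders of an evicted semaphore.

```python
import asyncio
from collections import OrderedDict


class TenantSemaphores:
    """Keeps at most `max_tenants` semaphores, evicting the least recently used."""

    def __init__(self, max_concurrency: int, max_tenants: int) -> None:
        self._max_concurrency = max_concurrency
        self._max_tenants = max_tenants
        self._semaphores: OrderedDict[str, asyncio.Semaphore] = OrderedDict()

    def get(self, tenant_id: str) -> asyncio.Semaphore:
        if tenant_id in self._semaphores:
            self._semaphores.move_to_end(tenant_id)  # mark as most recently used
        else:
            if len(self._semaphores) >= self._max_tenants:
                self._semaphores.popitem(last=False)  # drop least recently used
            self._semaphores[tenant_id] = asyncio.Semaphore(self._max_concurrency)
        return self._semaphores[tenant_id]

    def tenants(self) -> list[str]:
        return list(self._semaphores)


registry = TenantSemaphores(max_concurrency=5, max_tenants=2)
registry.get("tenant-a")
registry.get("tenant-b")
registry.get("tenant-a")  # refreshes tenant-a
registry.get("tenant-c")  # registry is full, so tenant-b is evicted
print(registry.tenants())
```

The cap bounds memory when tenant IDs are unbounded, at the cost of resetting the concurrency counter for a tenant that returns after eviction.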
All rate limiters support async with for resource cleanup:
async with SemaphoreRateLimiter(max_concurrency=5) as limiter:
    # ...
    pass
18. Streaming #
SupervisorAgent.astream_process() supports streaming in simple mode (without ReAct).
When to use: Real-time response streaming (e.g., server-sent events in a web app).
async for chunk_meta, accumulated in supervisor.astream_process(state, config):
    if chunk_meta["chunk_type"] == "token":
        print(chunk_meta["delta"], end="", flush=True)
    elif chunk_meta["chunk_type"] == "final":
        print(f"\nRouting to: {chunk_meta['next_agent']}")
astream_with_timeout() in core/llm_utils.py provides the underlying streaming primitive:
from graphcrew import astream_with_timeout
async for chunk in astream_with_timeout(llm, messages, timeout_seconds=30.0, config=config):
    print(chunk.content, end="")
Streaming is not supported in ReAct mode (XML parsing needs the full response). astream_process() raises NotImplementedError when a ReactExecutor is configured.
19. Structured Output #
Opt-in structured output for ReAct responses using Pydantic models.
When to use: When your LLM supports with_structured_output() and you want to avoid XML parsing.
executor = ReactExecutor(
agent_name="supervisor",
llm=llm,
template_manager=template_manager,
action_handlers=handlers,
use_structured_output=True,
)
ReactStructuredResponse Pydantic model:
class ReactStructuredResponse(BaseModel):
    thought: str
    action: str
    action_params: dict[str, Any] = Field(default_factory=dict)
The executor probes llm.with_structured_output(ReactStructuredResponse) on the first invocation. If the LLM does not support it (AttributeError or NotImplementedError), it falls back to XML parsing. The capability result is cached for all subsequent passes.
The SupervisorAgent simple mode also uses structured output for routing decisions via an internal _RouteDecision model.
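The probe-once-then-cache behavior described for ReactExecutor can be sketched as follows. The class `StructuredOutputProbe` is a hypothetical illustration, not part of graphcrew; it only assumes that the LLM object may or may not expose `with_structured_output()`.

```python
from typing import Any, Optional


class StructuredOutputProbe:
    """Probes with_structured_output() once and caches the outcome."""

    def __init__(self, llm: Any, schema: type) -> None:
        self._llm = llm
        self._schema = schema
        self._probed = False
        self._structured_llm: Optional[Any] = None

    def get(self) -> Optional[Any]:
        """Return the structured runnable, or None to signal XML fallback."""
        if not self._probed:
            try:
                self._structured_llm = self._llm.with_structured_output(self._schema)
            except (AttributeError, NotImplementedError):
                # Unsupported: remember that, so later passes skip the probe.
                self._structured_llm = None
            self._probed = True
        return self._structured_llm
```

Caching the negative result matters as much as caching the positive one, since otherwise every pass would pay for a failing probe.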
20. Observability (OpenTelemetry) #
Optional OpenTelemetry instrumentation via the [otel] extra.
When to use: When you use OpenTelemetry for distributed tracing (LangSmith, Langfuse, Jaeger, Datadog).
pip install graphcrew[otel]
from graphcrew.observability import (
    OTEL_AVAILABLE,
    OpenTelemetryMiddleware,
    get_tracer,
    otel_span,
    sync_otel_span,
)
# Middleware: creates spans around agent invocations
otel_mw = OpenTelemetryMiddleware()
agent = MyAgent(config=agent_config, middleware=[otel_mw])
# Manual spans
async with otel_span("my_operation", attributes={"key": "value"}) as span:
    # ... your code ...
    pass
with sync_otel_span("react_pass", agent_name="supervisor", pass_num=1):
    # ... your code ...
    pass
All operations are no-ops when OpenTelemetry is not installed (OTEL_AVAILABLE == False), ensuring zero overhead in environments without OTel. _NoOpSpan and _NoOpTracer handle the fallback.
RunnableConfig (carrying LangSmith/Langfuse/OTel callbacks) is threaded from BaseAgent.invoke() through process(), ReactExecutor.execute(), and down to llm.ainvoke(messages, config=config).
21. Structured Logging #
Request and per-invocation correlation via structlog contextvars.
When to use: Every production deployment. Enables log correlation across agent invocations.
from graphcrew import bind_logging_context, clear_logging_context
# bind_logging_context is called automatically by BaseAgent.invoke()
# It binds: request_id, session_id, user_id, langgraph_step, langgraph_node
# App developers must configure structlog with merge_contextvars:
import structlog
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.dev.ConsoleRenderer(),
    ],
)
# Clear at request boundaries:
clear_logging_context()
PII redaction: Set redact_pii=True on any agent to hash session_id and user_id with 12-char SHA-256 prefix in log output. request_id is left untouched.
agent = MyAgent(config=agent_config, redact_pii=True)
Log events emitted by the library:
- agent_invoke_start -- with agent_name, request_id
- agent_invoke_completed -- with agent_name, next_agent, slot_keys_updated, etc.
- agent_invoke_failed -- with agent_name, exc_type, exc_msg
- react_pass -- with agent_name, pass_num, thought, action
- react_terminal -- with agent_name, pass_num, next_agent
- routing_decision -- with next_agent, allowed_targets
All log events use agent_name= (not agent=) for field naming consistency. response_metadata on AIMessages carries request_id, session_id, user_id, langgraph_step, and langgraph_node for downstream correlation.
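The PII redaction rule described in this section (a 12-character SHA-256 prefix for session_id and user_id, with request_id untouched) can be sketched with the standard library. The helper names `hash_identifier` and `redact_log_fields` are hypothetical, and UTF-8 encoding of the identifier is an assumption.

```python
import hashlib

# Fields that the section above says are hashed when redact_pii=True.
REDACTED_FIELDS = ("session_id", "user_id")


def hash_identifier(value: str, prefix_len: int = 12) -> str:
    """Return the first `prefix_len` hex characters of the SHA-256 digest."""
    return hashlib.sha256(value.encode("utf-8")).hexdigest()[:prefix_len]


def redact_log_fields(fields: dict) -> dict:
    """Hash the PII fields and pass every other field through unchanged."""
    return {
        key: hash_identifier(str(value)) if key in REDACTED_FIELDS else value
        for key, value in fields.items()
    }
```

Because the hash is deterministic, log lines from the same session still correlate with each other even though the raw identifier never appears.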
22. Token Counting #
Pluggable TokenCounter ABC with a single count(text) -> int method.
When to use: Injected into SlotPool, SlotManager, ReactTemplateManager for token budget enforcement. Replace CharDivisionTokenCounter with a model-specific counter for production accuracy.
from graphcrew import TokenCounter, CharDivisionTokenCounter
# Default: len(text) // 4, minimum 1 for non-empty text, 0 for empty
counter = CharDivisionTokenCounter(chars_per_token=4)
counter.count("hello world") # 2
# Custom counter (e.g., tiktoken-based)
class TiktokenCounter(TokenCounter):
    def __init__(self, model: str = "gpt-4o"):
        import tiktoken
        self._enc = tiktoken.encoding_for_model(model)

    def count(self, text: str) -> int:
        if not text:
            return 0
        return len(self._enc.encode(text))
Inject into components:
slot_pool = SlotPool(token_counter=counter)
slot_manager = SlotManager(config=context_config, token_counter=counter)
template_manager = ReactTemplateManager(
    agent_name="supervisor", base_dir="llm_resources",
    loader=loader, token_counter=counter,
)
23. Error Handling #
All public-facing errors inherit from OrchestratorError. The hierarchy:
OrchestratorError
ConfigurationError
ConfigLoadError
ConfigValidationError
LLMNotConfiguredError
ReactExecutorNotConfiguredError
AgentExecutionError
AgentProcessError
LLMError
RateLimitExceededError
LLMInvocationError
LLMRetryExhaustedError
SessionError
SessionManagerError
SessionManagerNotConfiguredError
StateError
InvalidStateError
ContextError
SlotPersistenceError
KnowledgeError
KnowledgeSlotNotFoundError
KnowledgePathError
KnowledgeLoadError
KnowledgeConfigValidationError
ReactError
ReactCancellationError
ReactMaxPassesError
ReservedStateKeyError
ReactConsecutiveErrorsError
RoutingError
RoutingSelfLoopError
TenantIsolationError
CircuitBreakerOpenError
All exceptions carry context attributes (e.g., AgentProcessError.agent_name, LLMRetryExhaustedError.attempts). Catch OrchestratorError for broad error handling, or specific subclasses for targeted recovery.
24. Health Checks #
SessionManager.health_check() and SlotPersistence.health_check() return {"status": "ok"} by default.
When to use: Kubernetes liveness/readiness probes, monitoring dashboards.
# SessionManager
status = await session_mgr.health_check()
# InMemorySessionManager: {"status": "ok", "sessions": 3}
# When closed: {"status": "closed", "sessions": 0}
# SlotPersistence
status = await persistence.health_check()
# InMemorySlotPersistence: {"status": "ok", "session_slots": 5, "long_term_slots": 2}
Override in your custom implementation to add backend-specific checks (database connectivity, latency, etc.).
25. Transient Exception Registration #
Register provider-specific exception types as retryable.
When to use: When using LLM providers whose rate limit or transient errors are not OSError or TimeoutError.
from graphcrew import (
register_transient_exception,
deregister_transient_exception,
get_transient_exceptions,
)
# At app startup
from anthropic import RateLimitError
register_transient_exception(RateLimitError)
# Check current set
print(get_transient_exceptions()) # (OSError, TimeoutError, RateLimitError)
# Deregister (safe in test teardown, no-op if not registered)
deregister_transient_exception(RateLimitError)
build_async_retryer() reads the transient exception set at call time, so newly registered types are picked up immediately. Registration is thread-safe via threading.Lock.
Default transient exceptions: OSError (covers ConnectionError, network errors) and TimeoutError.
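A lock-guarded registry of this kind can be sketched in a few lines. This is an illustration of the pattern under stated assumptions, not graphcrew's source: the function names mirror the public API above, but the module-level tuple and the `is_transient` helper are hypothetical.

```python
import threading

_lock = threading.Lock()
# Defaults named in the text: OSError (covers ConnectionError) and TimeoutError.
_transient: tuple = (OSError, TimeoutError)


def register_transient_exception(exc_type: type) -> None:
    global _transient
    with _lock:
        if exc_type not in _transient:
            _transient = _transient + (exc_type,)


def deregister_transient_exception(exc_type: type) -> None:
    """Remove a type; a no-op when it was never registered."""
    global _transient
    with _lock:
        _transient = tuple(t for t in _transient if t is not exc_type)


def get_transient_exceptions() -> tuple:
    with _lock:
        return _transient


def is_transient(exc: BaseException) -> bool:
    # Reads the registry at call time, so new registrations apply immediately.
    return isinstance(exc, get_transient_exceptions())
```

Storing an immutable tuple and swapping it under the lock means readers never observe a half-updated collection.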
26. Semantic Search #
SemanticSearchMixin is an abstract mix-in for adding vector-based memory search.
When to use: When you want semantic similarity search on memories using pgvector, Chroma, Pinecone, or another vector database.
from graphcrew import SemanticSearchMixin, InMemorySessionManager, Memory
class MySemanticSessionManager(SemanticSearchMixin, InMemorySessionManager):
    async def embed_text(self, text: str) -> list[float]:
        # Your embedding provider
        return await my_embedder.embed(text)

    async def search_memories_semantic(
        self, session_id: str, query: str, limit: int = 5,
    ) -> list[Memory]:
        query_vec = await self.embed_text(query)
        # Your vector similarity search implementation
        return await self._vector_store.search(session_id, query_vec, limit)
SemanticSearchMixin must appear first in the MRO (class MyManager(SemanticSearchMixin, InMemorySessionManager)). It declares one abstract method embed_text(text: str) -> list[float] and a concrete search_memories_semantic() that raises NotImplementedError by default.
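The "vector similarity search implementation" left open in the example could, for small in-memory stores, be a brute-force cosine ranking. The sketch below is one possible approach and is independent of graphcrew; `cosine_similarity` and `top_k_by_similarity` are hypothetical helper names, and a production system would normally delegate this to the vector database.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k_by_similarity(
    query_vec: Sequence[float],
    candidates: list,  # list of (text, embedding) pairs
    limit: int = 5,
) -> list:
    """Return the texts of the `limit` candidates most similar to the query."""
    ranked = sorted(
        candidates,
        key=lambda item: cosine_similarity(query_vec, item[1]),
        reverse=True,
    )
    return [text for text, _ in ranked[:limit]]
```

Inside `search_memories_semantic()`, the candidates would be the stored memories for `session_id` paired with their precomputed embeddings.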
27. OrchestratorContext Lifecycle #
OrchestratorContext manages coordinated lifecycle for all orchestrator components.
When to use: Production deployments, especially with FastAPI or other async frameworks where deterministic resource cleanup is needed.
from graphcrew import OrchestratorContext
ctx = OrchestratorContext(
    slot_manager=slot_mgr,
    session_manager=session_mgr,
    template_managers=[tm],
    rate_limiter=limiter,
    extras=[custom_resource],
)
async with ctx:
    # Run your graph
    result = await graph.ainvoke(state)
# ctx.close() tears down in safe order:
# extras -> template managers -> rate limiter -> slot manager -> session manager
# Errors are logged but never mask other component cleanup.
# Check status
print(ctx.is_closed) # True after close()
28. Testing Utilities #
The graphcrew.testing subpackage provides utilities for unit and integration tests.
When to use: Testing agents, supervisors, and ReAct loops without real LLM API calls.
from graphcrew.testing import MockLLM, make_test_state, InMemorySessionManager
# MockLLM returns scripted responses
llm = MockLLM(responses=[
    '<thought>Routing to greeter</thought><action type="decide">{"next_agent": "greeter"}</action>',
    "Hello!",
])
# Raises IndexError when exhausted
# Optional: raise_on_call=ValueError to test error handling
# Optional: stream_chunks=[["chunk1", "chunk2"]] for astream testing
# Optional: structured_responses=[obj] for with_structured_output testing
# Optional: usage_tokens={"input_tokens": 10, "output_tokens": 5}
# make_test_state creates a pre-populated OrchestratorState
state = make_test_state("Hello")
# Defaults: session_id="test-session", user_id="test-user"
state = make_test_state("Hello", session_id="s1", user_id="alice")
# InMemorySessionManager re-exported for convenience
session_mgr = InMemorySessionManager()
29. Graph Builders #
Helper functions that eliminate LangGraph graph wiring boilerplate.
When to use: Every project. Use build_hub_spoke_graph for standard supervisor architectures, build_topology_graph for mesh/hybrid topologies.
from graphcrew.routing.graph_builder import (
    build_hub_spoke_graph,
    build_topology_graph,
    build_conditional_edge,
)
# Hub-spoke: all workers route back to supervisor
graph = build_hub_spoke_graph(supervisor, {"greeter": greeter}, topology)
# Topology-driven: edges match topology exactly (supports mesh)
graph = build_topology_graph(
    agents={"supervisor": supervisor, "billing": billing, "support": support},
    topology=topology,
    entry_point="supervisor",
)
# Manual edge wiring
routing_fn = build_conditional_edge(topology, "supervisor")
builder.add_conditional_edges("supervisor", routing_fn, edge_map)
build_conditional_edge() routing logic:
- Returns END when state["current_agent"] == from_agent_name (termination sentinel).
- Falls back to topology.default_return for empty/unknown targets.
- Routes normally when the target is in topology.get_allowed_targets().
Both build_hub_spoke_graph and build_topology_graph accept an optional state_schema parameter (defaults to OrchestratorState).
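The three routing rules listed for build_conditional_edge() can be sketched as a plain function factory. This is an illustration only: `make_routing_fn` is a hypothetical name, the allowed targets are passed in as a set instead of a topology object, and the sketch assumes the routing target is read from `state["current_agent"]`, which the text implies but does not state outright. `END` is defined locally as `"__end__"`, the reserved name that AgentConfig rejects.

```python
from typing import Any, Callable, Mapping, Set

END = "__end__"  # local stand-in for LangGraph's END sentinel


def make_routing_fn(
    from_agent_name: str, allowed_targets: Set[str], default_return: str
) -> Callable[[Mapping[str, Any]], str]:
    """Build a routing function for edges leaving `from_agent_name`."""

    def route(state: Mapping[str, Any]) -> str:
        target = state.get("current_agent")
        if target == from_agent_name:
            return END  # termination sentinel: the agent pointed at itself
        if not target or target not in allowed_targets:
            return default_return  # empty or illegal target
        return target  # legal transition

    return route
```

For example, a function built for `monitor` with allowed targets `{"diagnostician", "supervisor"}` sends an unknown target such as `billing` back to `supervisor`.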
1.5 Production-Ready Example: DevOps Incident Response System #
This section presents a complete, production-grade multi-agent application built with
graphcrew. The system automates DevOps incident response using five agents
in a mesh topology.
System Overview #
The DevOps Incident Response System receives alert payloads, classifies the incident,
performs root-cause analysis, executes remediation, and generates post-mortem reports.
Five agents collaborate via a mesh topology:
            +-----------+
      +---->| monitor   |----+
      |     +-----------+    |
      |                      v
+------------+           +--------------+
| supervisor |<--------->| diagnostician|
+------------+           +--------------+
      |                      |
      |     +-----------+    |
      +---->| remediator|<---+
      |     +-----------+
      |           |
      |     +-----------+
      +---->| reporter  |
            +-----------+
Agents:
| Agent | Role | Type |
|---|---|---|
| supervisor | Orchestrates incident workflow, routes between agents | Built-in supervisor |
| monitor | Detects and classifies incidents from raw alert data | Custom BaseAgent |
| diagnostician | Performs root-cause analysis using knowledge modules (ReAct) | Custom BaseAgent |
| remediator | Executes remediation playbooks using knowledge modules (ReAct) | Custom BaseAgent |
| reporter | Generates incident reports and post-mortem summaries | Custom BaseAgent |
Library features demonstrated:
- Mesh topology with build_topology_graph()
- Custom agents extending BaseAgent
- Knowledge modules with ReAct config, core prompts, and slot files
- InMemorySessionManager with memory storage and cross-session retrieval
- Full middleware stack: MetricsMiddleware, TenantIsolationMiddleware, CircuitBreakerMiddleware, MemoryInjectionMiddleware
- OrchestratorContext lifecycle management
- PerTenantRateLimiter for multi-tenant isolation
- Streaming via SSE with SupervisorAgent.astream_process()
- Structured logging with PII redaction
- Health checks via SessionManager.health_check()
- Testing with MockLLM and make_test_state()
- Full exception hierarchy handling
File: config.yaml #
The declarative YAML configuration defines all five agents, a mesh topology with
direct agent-to-agent edges, context slot budgets, session management, knowledge
module settings, retry/timeout policies, and eviction strategy.
# config.yaml -- DevOps Incident Response System
agents:
  - name: supervisor
    type: supervisor
    description: >-
      Orchestrates the incident response workflow. Routes incoming alerts to
      the monitor for classification, then to diagnostician for root-cause
      analysis, remediator for fixes, and reporter for post-mortems.
    knowledge_enabled: true
  - name: monitor
    type: custom
    description: >-
      Detects and classifies incidents from raw alert payloads. Determines
      severity (P1-P4) and affected service.
  - name: diagnostician
    type: custom
    description: >-
      Analyzes classified incidents to determine root cause. Uses runbook
      knowledge modules for system-specific diagnostic procedures.
    knowledge_enabled: true
  - name: remediator
    type: custom
    description: >-
      Executes remediation playbooks based on diagnosis. Uses playbook
      knowledge modules for step-by-step remediation procedures.
    knowledge_enabled: true
  - name: reporter
    type: custom
    description: >-
      Generates structured incident reports and post-mortem documents.
topology:
  edges:
    - from_agent: supervisor
      to_agent: monitor
    - from_agent: supervisor
      to_agent: diagnostician
    - from_agent: supervisor
      to_agent: remediator
    - from_agent: supervisor
      to_agent: reporter
    - from_agent: monitor
      to_agent: diagnostician
    - from_agent: monitor
      to_agent: supervisor
    - from_agent: diagnostician
      to_agent: remediator
    - from_agent: diagnostician
      to_agent: supervisor
    - from_agent: remediator
      to_agent: reporter
    - from_agent: remediator
      to_agent: supervisor
    - from_agent: reporter
      to_agent: supervisor
  default_return: supervisor
context:
  shared_slots:
    max_tokens: 8192
  tiered_slots:
    turn:
      max_tokens: 4096
    session:
      max_tokens: 8192
    long_term:
      max_tokens: 4096
  eviction_strategy: lru
session:
  enabled: true
knowledge:
  enabled: true
  base_dir: knowledge
  max_total_tokens: 16384
retry:
  initial_interval: 1.0
  backoff_factor: 2.0
  max_interval: 30.0
  max_attempts: 3
  jitter: true
timeout:
  llm_call_seconds: 30.0
  total_request_seconds: 120.0
File: agents.py #
Four custom agents, each extending BaseAgent with a full process() implementation.
"""Custom agents for the DevOps Incident Response System."""
from __future__ import annotations

import json
from typing import Any

from langchain_core.runnables import RunnableConfig

from graphcrew import AgentResponse, BaseAgent, OrchestratorState
from graphcrew.context.slots import SlotTier


class MonitorAgent(BaseAgent):
    """Classifies raw alerts into structured incidents."""

    async def process(
        self,
        state: OrchestratorState,
        config: RunnableConfig | None = None,
    ) -> AgentResponse:
        messages = state.get("messages", [])
        alert_text = str(messages[-1].content) if messages else "unknown alert"
        severity = "P2"
        service = "unknown"
        category = "infrastructure"
        if "cpu" in alert_text.lower() or "memory" in alert_text.lower():
            category = "resource_exhaustion"
            service = "compute"
        elif "latency" in alert_text.lower() or "timeout" in alert_text.lower():
            category = "performance"
            service = "api-gateway"
        elif "error rate" in alert_text.lower() or "5xx" in alert_text.lower():
            category = "availability"
            service = "web-service"
            severity = "P1"
        classification = {
            "severity": severity,
            "service": service,
            "category": category,
            "raw_alert": alert_text[:500],
        }
        return AgentResponse(
            agent_name=self.name,
            content=f"Incident classified: {severity} {category} on {service}",
            next_agent="diagnostician",
            slots_update={
                "incident_classification": json.dumps(classification),
                "incident_severity": severity,
                "incident_service": service,
            },
            slots_tier={
                "incident_severity": SlotTier.SESSION,
                "incident_service": SlotTier.SESSION,
            },
        )


class DiagnosticianAgent(BaseAgent):
    """Performs root-cause analysis using context from the monitor."""

    async def process(
        self,
        state: OrchestratorState,
        config: RunnableConfig | None = None,
    ) -> AgentResponse:
        slots = state.get("context_slots", {})
        classification_raw = slots.get("incident_classification", "{}")
        classification = json.loads(classification_raw) if isinstance(
            classification_raw, str
        ) else classification_raw
        category = classification.get("category", "unknown")
        service = classification.get("service", "unknown")
        root_cause = "unknown"
        recommended_action = "escalate"
        if category == "resource_exhaustion":
            root_cause = f"Resource limits exceeded on {service}"
            recommended_action = "scale_horizontally"
        elif category == "performance":
            root_cause = f"Latency degradation in {service}"
            recommended_action = "circuit_breaker_and_retry"
        elif category == "availability":
            root_cause = f"Service {service} returning errors due to deployment regression"
            recommended_action = "rollback_deployment"
        diagnosis = {
            "root_cause": root_cause,
            "recommended_action": recommended_action,
            "confidence": 0.85,
        }
        return AgentResponse(
            agent_name=self.name,
            content=f"Diagnosis: {root_cause}. Recommended: {recommended_action}",
            next_agent="remediator",
            slots_update={"incident_diagnosis": json.dumps(diagnosis)},
            slots_tier={"incident_diagnosis": SlotTier.SESSION},
        )


class RemediatorAgent(BaseAgent):
    """Executes remediation based on the diagnosis."""

    async def process(
        self,
        state: OrchestratorState,
        config: RunnableConfig | None = None,
    ) -> AgentResponse:
        slots = state.get("context_slots", {})
        diagnosis_raw = slots.get("incident_diagnosis", "{}")
        diagnosis = json.loads(diagnosis_raw) if isinstance(
            diagnosis_raw, str
        ) else diagnosis_raw
        recommended_action = diagnosis.get("recommended_action", "escalate")
        status = "completed"
        steps = ["Executed remediation steps"]
        remediation = {
            "action_taken": recommended_action,
            "steps": steps,
            "status": status,
        }
        return AgentResponse(
            agent_name=self.name,
            content=f"Remediation {status}: {recommended_action}",
            next_agent="reporter",
            slots_update={"incident_remediation": json.dumps(remediation)},
            slots_tier={"incident_remediation": SlotTier.SESSION},
        )


class ReporterAgent(BaseAgent):
    """Generates incident reports from the full incident context."""

    async def process(
        self,
        state: OrchestratorState,
        config: RunnableConfig | None = None,
    ) -> AgentResponse:
        slots = state.get("context_slots", {})
        classification = json.loads(slots.get("incident_classification", "{}"))
        diagnosis = json.loads(slots.get("incident_diagnosis", "{}"))
        remediation = json.loads(slots.get("incident_remediation", "{}"))
        report = f"""# Incident Report
**Severity:** {classification.get('severity', 'P3')}
**Service:** {classification.get('service', 'unknown')}
## Root Cause
{diagnosis.get('root_cause', 'unknown')}
## Remediation
**Action:** {remediation.get('action_taken', 'none')}
**Status:** {remediation.get('status', 'unknown')}"""
        return AgentResponse(
            agent_name=self.name,
            content=report,
            next_agent=None,  # Terminates the graph
            slots_update={"incident_report": report},
            slots_tier={"incident_report": SlotTier.LONG_TERM},
        )
File: app.py #
The FastAPI application ties everything together: OrchestratorContext lifecycle,
PerTenantRateLimiter, mesh topology graph construction, streaming SSE endpoint,
health checks, and structured logging with PII redaction.
"""FastAPI application for the DevOps Incident Response System."""
from __future__ import annotations

import uuid
from contextlib import asynccontextmanager
from pathlib import Path

import structlog
from fastapi import FastAPI, Header, HTTPException
from pydantic import BaseModel

from graphcrew import (
    AgentConfig,
    CircuitBreakerOpenError,
    InMemoryMetricsCollector,
    OrchestratorContext,
    OrchestratorError,
    PerTenantRateLimiter,
    RateLimitExceededError,
    SupervisorAgent,
    TopologyResolver,
    clear_logging_context,
    create_initial_state,
    load_config,
)
from graphcrew.routing.graph_builder import build_topology_graph

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.dev.ConsoleRenderer(),
    ],
)


class IncidentRequest(BaseModel):
    """JSON body for POST /incident (matches the curl example below)."""

    alert: str


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage OrchestratorContext lifecycle."""
    config = load_config(Path("config.yaml"))
    rate_limiter = PerTenantRateLimiter(max_concurrency=5, max_tenants=1000)
    ctx = OrchestratorContext(rate_limiter=rate_limiter)
    app.state.ctx = ctx
    # NOTE: graph construction is not shown in this excerpt. Build the graph
    # from `config` (for example with build_topology_graph()) and assign it to
    # app.state.graph here; create_incident() below depends on it.
    try:
        yield
    finally:
        # Close the context even if the application exits with an error.
        await ctx.close()


app = FastAPI(
    title="DevOps Incident Response System",
    lifespan=lifespan,
)


@app.post("/incident")
async def create_incident(
    payload: IncidentRequest,
    x_user_id: str = Header(..., alias="X-User-Id"),
):
    clear_logging_context()
    session_id = f"incident-{uuid.uuid4()}"
    state = create_initial_state(
        payload.alert, session_id=session_id, user_id=x_user_id
    )
    try:
        result = await app.state.graph.ainvoke(state)
        return {"incident_id": session_id, "report": result["messages"][-1].content}
    except RateLimitExceededError:
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    except CircuitBreakerOpenError:
        raise HTTPException(status_code=503, detail="Service temporarily unavailable")
    except OrchestratorError as exc:
        raise HTTPException(status_code=500, detail=str(exc))


@app.get("/health")
async def health():
    ctx = app.state.ctx
    return {"status": "ok" if not ctx.is_closed else "closed"}
Running the Application #
# Install dependencies
pip install graphcrew fastapi uvicorn langchain-openai
# Start the server
uvicorn devops_incident.app:app --host 0.0.0.0 --port 8000
# Submit an incident
curl -X POST http://localhost:8000/incident \
  -H "Content-Type: application/json" \
  -H "X-User-Id: ops-team" \
  -d '{"alert": "CRITICAL: 5xx error rate above 10% on web-service"}'
# Health check
curl http://localhost:8000/health
Feature Coverage Summary #
| Library Feature | Where Demonstrated |
|---|---|
| Declarative YAML config | config.yaml |
| Mesh topology | config.yaml topology edges + app.py build_topology_graph() |
| Custom agents (BaseAgent) | agents.py -- 4 agents with full process() |
| AgentResponse with slots | agents.py -- slots_update, slots_tier, next_agent |
| SlotTier (SESSION, LONG_TERM) | agents.py -- tiered slot persistence |
| OrchestratorContext lifecycle | app.py -- created in lifespan, close() teardown on shutdown |
| PerTenantRateLimiter | app.py -- constructed in lifespan and passed to OrchestratorContext |
| Health checks | app.py -- /health endpoint reporting is_closed |
| Exception hierarchy | app.py -- RateLimitExceededError, CircuitBreakerOpenError, OrchestratorError |
| create_initial_state() | app.py -- explicit session_id and user_id |
| clear_logging_context() | app.py -- request boundary cleanup |
1.6 Configuration Reference #
Complete YAML Schema #
All configuration models use ConfigDict(frozen=True, extra="forbid"). Unknown fields cause immediate ValidationError.
OrchestratorConfig (top-level) #
| Field | Type | Default | Description |
|---|---|---|---|
| agents | list[AgentConfig] | required (min 1) | Agent definitions |
| topology | TopologyConfig | {} | Communication topology |
| context | ContextConfig | {} | Context management settings |
| session | SessionConfig | {} | Session management settings |
| knowledge | KnowledgeConfig | {} | Knowledge module system settings |
| retry | RetryConfig | {} | LLM retry with exponential backoff |
| timeout | TimeoutConfig | {} | Async timeout settings |
Model validator: validate_topology_agents ensures all topology edges and default_return reference agents from the agents list.
AgentConfig #
| Field | Type | Default | Validation | Description |
|---|---|---|---|---|
| name | str | required | 1--64 chars, [a-zA-Z][a-zA-Z0-9_-]*, not __end__/__start__ | |
| type | Literal["supervisor", "session_manager", "custom"] | required | | |
| description | str | required | min_length=1 | |
| model | `str \| None` | None | | LLM model name override |
| knowledge_enabled | bool | False | | Enable knowledge module system |
TopologyConfig #
| Field | Type | Default |
|---|---|---|
| edges | list[TopologyEdgeConfig] | [] |
| default_return | str | "supervisor" |
TopologyEdgeConfig #
| Field | Type |
|---|---|
| from_agent | str |
| to_agent | str |
ContextConfig #
| Field | Type | Default |
|---|---|---|
| shared_slots | SlotConfig | {max_tokens: 4096} |
| tiered_slots | TieredSlotConfig | (see below) |
| eviction_strategy | Literal["lru", "fifo", "priority", "demotion"] | "lru" |
SlotConfig #
| Field | Type | Default | Validation |
|---|---|---|---|
| max_tokens | int | 4096 | gt=0 |
TieredSlotConfig #
| Field | Type | Default |
|---|---|---|
| turn | TierConfig | {max_tokens: 4096} |
| session | TierConfig | {max_tokens: 4096} |
| long_term | TierConfig | {max_tokens: 4096} |
TierConfig #
| Field | Type | Default | Validation |
|---|---|---|---|
| max_tokens | int | 4096 | gt=0 |
SessionConfig #
| Field | Type | Default |
|---|---|---|
| enabled | bool | True |
KnowledgeConfig #
| Field | Type | Default | Validation |
|---|---|---|---|
| enabled | bool | False | |
| base_dir | str | "llm_resources" | |
| max_total_tokens | int | 8192 | gt=0 |
RetryConfig #
| Field | Type | Default | Validation | Description |
|---|---|---|---|---|
| initial_interval | float | 0.5 | gt=0 | Seconds before first retry |
| backoff_factor | float | 2.0 | gt=0 | Exponential base multiplier |
| max_interval | float | 128.0 | gt=0 | Maximum seconds between retries |
| max_attempts | int | 3 | ge=1 | Total attempts including first (1 = no retry) |
| jitter | bool | True | | Random jitter (AWS Full Jitter algorithm) |
Uses tenacity's wait_random_exponential when jitter=True, wait_exponential_jitter when jitter=False.
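To make the interaction of these fields concrete, the sketch below computes a Full Jitter delay by hand: the exponential ceiling is capped at max_interval and the actual wait is drawn uniformly between zero and that ceiling. It is an illustration of the general algorithm using the RetryConfig defaults, not tenacity's exact arithmetic; `backoff_ceiling` and `full_jitter_delay` are hypothetical names, and `attempt` is assumed to start at 0 for the first retry.

```python
import random


def backoff_ceiling(
    attempt: int,
    initial_interval: float = 0.5,
    backoff_factor: float = 2.0,
    max_interval: float = 128.0,
) -> float:
    """Upper bound for the wait before retry number `attempt` (0-based)."""
    return min(max_interval, initial_interval * backoff_factor ** attempt)


def full_jitter_delay(attempt: int, **kwargs: float) -> float:
    """Full Jitter: pick a wait uniformly in [0, ceiling]."""
    return random.uniform(0.0, backoff_ceiling(attempt, **kwargs))
```

With the defaults, the ceilings grow as 0.5, 1.0, 2.0, 4.0 seconds and so on until they reach the 128-second cap.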
TimeoutConfig #
All values in seconds. None disables the timeout (default for all fields). Production guidance: set at 2--3x your p99 observed latency.
| Field | Type | Default | Validation |
|---|---|---|---|
| llm_call_seconds | `float \| None` | None | gt=0 |
| file_io_seconds | `float \| None` | None | gt=0 |
| persistence_seconds | `float \| None` | None | gt=0 |
| total_request_seconds | `float \| None` | None | gt=0 |
react_config.yaml Schema #
Per-agent config file at <base_dir>/<agent_name>/react_config.yaml:
ReactConfigSchema #
| Field | Type | Default | Validation |
|---|---|---|---|
| max_passes | int | 5 (from DEFAULT_MAX_PASSES) | gt=0 |
| actions | dict[str, ReactActionSchema] | {} | |
| slots | dict[str, ReactSlotSchema] | {} | |
Model validator: validate_slot_dependencies checks reference validity (rejects unknown/self dependencies) and detects cycles via DFS.
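The checks this validator performs can be sketched on a plain mapping of slot name to dependency list. The function below is a hypothetical standalone version, not the Pydantic validator itself: it raises `ValueError` for unknown or self dependencies, then runs a depth-first search with three node colors to find cycles.

```python
def check_slot_dependencies(slots: dict) -> None:
    """Validate a {slot_name: [dependency, ...]} mapping or raise ValueError."""
    for name, deps in slots.items():
        for dep in deps:
            if dep == name:
                raise ValueError(f"slot '{name}' depends on itself")
            if dep not in slots:
                raise ValueError(f"slot '{name}' depends on unknown slot '{dep}'")

    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current path, finished
    color = {name: WHITE for name in slots}

    def visit(node: str) -> None:
        color[node] = GRAY
        for dep in slots[node]:
            if color[dep] == GRAY:
                # Reaching a node already on the current path closes a cycle.
                raise ValueError(f"dependency cycle through '{dep}'")
            if color[dep] == WHITE:
                visit(dep)
        color[node] = BLACK

    for name in slots:
        if color[name] == WHITE:
            visit(name)
```

A shared dependency (two slots depending on the same third slot) is accepted, since the third slot is already marked finished when reached the second time.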
ReactActionSchema #
| Field | Type | Default |
|---|---|---|
| description | str | required |
| parameters | dict[str, str] | {} |
| required | list[str] | [] |
ReactSlotSchema #
| Field | Type | Default | Validation |
|---|---|---|---|
| description | str | required | |
| modules | list[str] | required | min_length=1 |
| dependencies | list[str] | [] | |
| triggers | str | "" | |
| max_tokens | `int \| None` | None | gt=0 |
1.7 Migration & Compatibility #
Python Version #
Python >= 3.11 is required. The library uses:
- StrEnum (Python 3.11+) for SlotTier
- asyncio.timeout() (Python 3.11+) for timeout management
- from __future__ import annotations in every module
- Type union syntax (X | Y) throughout
CI tests on Python 3.11, 3.12, and 3.13.
Dependency Constraints #
| Dependency | Range |
|---|---|
| langgraph | >= 1.0, < 2.0 |
| langchain-core | >= 0.3, < 1.0 |
| pydantic | >= 2.0, < 3.0 |
| pyyaml | >= 6.0, < 7.0 |
| structlog | >= 24.0, < 26.0 |
| tenacity | >= 8.0, < 10.0 |
| opentelemetry-api (optional) | >= 1.24, < 2.0 |
Dev dependencies have upper bounds to prevent surprise major-version breakage (e.g., pytest>=8.0,<9.0, hypothesis>=6.100,<7.0).
Frozen Config Models #
All Pydantic config models use ConfigDict(frozen=True, extra="forbid"). This means:
- Config objects are immutable after creation.
- Unknown fields (typos) raise ValidationError immediately at load time.
- Session data models (SessionMessage, Summary, Memory, PaginatedResult) and knowledge schema models (ReactActionSchema, ReactSlotSchema, ReactConfigSchema) also use this pattern.
Backward Compatibility #
- SlotTier is a StrEnum, so SlotTier.TURN == "turn" works directly.
- SlotManager.from_dict() accepts both the new format ({"value": ..., "tier": ..., "priority": ...}) and the legacy bare-value format (defaults to SlotTier.TURN).
- load_config() accepts both str and Path arguments.
- RateLimiter.release() has a concrete no-op default for backward compat.
- SessionMessage.role accepts "assistant" and "ai" interchangeably (both map to AIMessage).
- AgentResponse.slots_tier is typed as dict[str, SlotTier] but is backward-compatible via StrEnum equality.
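Two of the compatibility points above, StrEnum equality and the dual-format slot loading, can be illustrated with a short sketch. The enum member values other than "turn", the `normalize_slot` helper, the default priority of 0, and the rule that a dict containing a "value" key counts as the new format are all assumptions made for this example; they are not taken from SlotManager.from_dict().

```python
try:
    from enum import StrEnum
except ImportError:  # Python < 3.11: a str/Enum mix compares the same way
    from enum import Enum

    class StrEnum(str, Enum):
        pass


class SlotTier(StrEnum):
    TURN = "turn"
    SESSION = "session"      # assumed value
    LONG_TERM = "long_term"  # assumed value


def normalize_slot(raw):
    """Accept either the new dict format or a legacy bare value."""
    if isinstance(raw, dict) and "value" in raw:
        return {
            "value": raw["value"],
            "tier": SlotTier(raw.get("tier", SlotTier.TURN)),
            "priority": raw.get("priority", 0),
        }
    # Legacy format: the raw object is the value and the tier defaults to TURN.
    return {"value": raw, "tier": SlotTier.TURN, "priority": 0}
```

Because the tier compares equal to its string value, callers that still pass or compare plain strings such as "session" keep working.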
Part 2: Developer Guide
graphcrew -- Part 2: Architecture Deep Dive #
Library version: 0.1.0 | Python: >= 3.11 | Engine: LangGraph >= 1.0 | License: MIT
2.1 Development Environment Setup #
Prerequisites #
| Tool | Version | Purpose |
|---|---|---|
| Python | >= 3.11 | Runtime (StrEnum, ExceptionGroup, asyncio.TaskGroup) |
| uv | latest | Package manager (replaces pip + virtualenv) |
| git | >= 2.30 | Version control |
| make | any | Task runner (make all = lint + typecheck + test) |
Clone and Install #
git clone https://github.com/Atiqul-Islam/swagent.git
cd swagent
# Create virtual environment and install all deps (including dev)
uv sync --all-extras --dev
Verify Installation #
# Run the full quality gate (lint + typecheck + tests)
make all
# Or run individually:
uv run ruff check src/ tests/ # Linting
uv run ruff format --check src/ tests/ # Formatting
uv run mypy src/ tests/ # Type checking (strict)
uv run pytest # 1300+ tests, 98%+ coverage
IDE Setup #
VS Code (recommended):
- Install the Python, Pylance, and Ruff extensions.
- Set the interpreter to the .venv created by uv sync.
- Pylance will pick up py.typed for PEP 561 type information.
PyCharm:
- Mark src/ as a Sources Root.
- Configure the project interpreter to the uv-managed virtualenv.
Optional Extras #
# OpenTelemetry instrumentation support
uv sync --extra otel
2.2 Architecture Overview #
Layer Diagram (8 Layers) #
+---------------------------------------------------------------------------+
| graphcrew (library) |
| |
| +---------------------------------------------------------------------+ |
| | 1. Configuration | |
| | YAML -> Pydantic -> agents, topology, slots, knowledge | |
| +----------------------------+----------------------------------------+ |
| | |
| v |
| +---------------------------------------------------------------------+ |
| | 2. Core | |
| | OrchestratorState, types, exceptions | |
| +--------+------------------------------+-----------------------------+ |
| | | |
| v v |
| +----------------+ +-------------------------------------------+ |
| | 3. Built-in | | Routing | |
| | Agents | | TopologyResolver (validates) | |
| | | | | |
| | Supervisor | | | |
| | SessionMgr | +-------------------------------------------+ |
| +----------------+ |
| |
| +---------------------------------------------------------------------+ |
| | 4. Slot-Based Context Management | |
| | SlotManager -> per-agent pools + shared pool | |
| | SlotTier (TURN / SESSION / LONG_TERM) | |
| | TokenCounter (ABC) -> pluggable token estimation | |
| | EvictionStrategy (ABC) -> LRU / FIFO / Priority / Demotion | |
| | SlotPersistence (ABC) -> pluggable durable storage | |
| +---------------------------------------------------------------------+ |
| |
| +---------------------------------------------------------------------+ |
| | 5. Session Layer | |
| | SessionManager (ABC: 3 abstract methods) | |
| | Summaries + memories default to NotImplementedError | |
| | InMemorySessionManager ships for dev/testing (per-session locks) | |
| +---------------------------------------------------------------------+ |
| |
| +---------------------------------------------------------------------+ |
| | 6. Knowledge Module System | |
| | KnowledgeLoader (ABC) + FileSystemKnowledgeLoader | |
| | ReactTemplateManager (per-agent config + slot loading) | |
| | ReactConfigSchema (Pydantic validation for react_config.yaml) | |
| +---------------------------------------------------------------------+ |
| |
| +---------------------------------------------------------------------+ |
| | 7. ReAct Execution Engine | |
| | ReactExecutor (multi-pass loop + token-budgeted history window) | |
| | ReactParsedResponse, TerminalActionResult, ActionHandler | |
| +---------------------------------------------------------------------+ |
| |
| +---------------------------------------------------------------------+ |
| | 8. Extension Points (what developers implement) | |
| | BaseAgent (ABC) | SessionManager (ABC) | KnowledgeLoader (ABC) | |
| | EvictionStrategy (ABC) | SlotPersistence (ABC) | TokenCounter (ABC)| |
| | AgentMiddleware | RateLimiter (ABC) | YAML config | |
| +---------------------------------------------------------------------+ |
+---------------------------------------------------------------------------+
Request Flow #
User Input
    |
    v
[Supervisor Node] --- reads context slots ---> [SlotManager]
    |                                               |
    | (conditional edge: route_decision)            | (optional)
    v                                               v
[Agent-X Node] <--- historical context --- [SessionManagerAgent]
    |                                               |
    | (handoff / return)                            v
    v                                      [SessionManager impl]
[Supervisor Node]                          (developer-provided)
    |
    v
Final Response
- User input enters the Supervisor node.
- Supervisor reads context from SlotManager and optionally retrieves history via SessionManagerAgent.
- Supervisor selects the target agent, constrained by the topology configuration.
- The target agent executes. It may hand off to another agent or return to the supervisor.
- Supervisor aggregates results and produces the final response.
Layer Responsibilities #
| Layer | Responsibility |
|---|---|
| 1. Configuration | YAML files parsed into frozen Pydantic models. Validates agents, topology edges, slot budgets, knowledge paths, retry/timeout settings. |
| 2. Core | OrchestratorState TypedDict with dict-merge reducers for safe parallel fan-in. Shared types (AgentResponse), exception hierarchy, token counting ABC, LLM utilities (retry, timeout, streaming), rate limiting, structured logging context, OrchestratorContext lifecycle manager. |
| 3. Agent Runtime | BaseAgent ABC with middleware hooks, SupervisorAgent (simple + ReAct + streaming modes), SessionManagerAgent (ReAct agent using session methods as tools), routing via TopologyResolver. |
| 4. Context Management | Tiered slot pools (TURN/SESSION/LONG_TERM) with configurable eviction (LRU, FIFO, priority, demotion) and pluggable persistence backends. |
| 5. Session Layer | SessionManager ABC with 3 abstract methods (messages + lifecycle). Summaries and memories opt-in via override. InMemorySessionManager for dev/testing with per-session async locks. SemanticSearchMixin for vector search. |
| 6. Knowledge | Per-agent react_config.yaml defining knowledge slots and actions. ReactTemplateManager handles validation, dependency resolution, and prompt assembly with async-safe caching. |
| 7. ReAct Engine | Multi-pass thought/action/observation loop. Each pass: build prompt, call LLM, parse XML or structured response, execute action handler, collect observation. Loops until terminal action or max_passes. |
| 8. Extension Points | ABCs and protocols that developers implement: BaseAgent, SessionManager, KnowledgeLoader, SlotPersistence, EvictionStrategy, TokenCounter, AgentMiddleware, RateLimiter. |
2.3 Package-by-Package Documentation #
2.3.1 config/ -- Configuration #
Purpose: Load YAML configuration files, validate them with Pydantic, and provide typed config objects to the rest of the library.
Files:
- schema.py -- Pydantic models for all configuration sections
- loader.py -- YAML loading, validation, load_config() / load_config_from_dict() / load_config_from_string()
- defaults.py -- get_default_config() factory
Key Classes:
class OrchestratorConfig(BaseModel):
    """Top-level config. frozen=True, extra='forbid'."""
    agents: list[AgentConfig]
    topology: TopologyConfig
    context: ContextConfig
    session: SessionConfig
    knowledge: KnowledgeConfig
    retry: RetryConfig
    timeout: TimeoutConfig

class AgentConfig(BaseModel):
    """Per-agent config with name validation."""
    name: str  # [a-zA-Z][a-zA-Z0-9_-]*, <= 64 chars, no reserved names
    type: Literal["supervisor", "session_manager", "custom"]
    description: str  # min_length=1
    model: str | None
    knowledge_enabled: bool

class TopologyConfig(BaseModel):
    edges: list[TopologyEdgeConfig]
    default_return: str  # default "supervisor"

class TimeoutConfig(BaseModel):
    llm_call_seconds: float | None
    file_io_seconds: float | None
    persistence_seconds: float | None
    total_request_seconds: float | None

class RetryConfig(BaseModel):
    initial_interval: float  # 0.5
    backoff_factor: float  # 2.0
    max_interval: float  # 128.0
    max_attempts: int  # 3 (ge=1; 1 = no retry)
    jitter: bool  # True (AWS Full Jitter via tenacity)
Functions:
def load_config(path: str | Path) -> OrchestratorConfig: ...
def load_config_from_dict(data: dict) -> OrchestratorConfig: ...
def load_config_from_string(yaml_str: str) -> OrchestratorConfig: ...
def get_default_config() -> OrchestratorConfig: ...
Patterns:
- All models use ConfigDict(frozen=True, extra="forbid") -- prevents mutation and rejects unknown fields.
- Cross-validation via @model_validator: topology edges and default_return are checked against known agent names.
- Agent name validation via @field_validator: regex pattern, length limit, reserved name rejection.
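The agent name rule described above (regex pattern, length limit, reserved names) can be illustrated with a small standard-library sketch. This is not the library's validator: `validate_agent_name` is a hypothetical helper, and the reserved set is assumed to be just `__end__` and `__start__`, the two names this guide mentions.

```python
import re

# Assumption: only the two reserved names mentioned in this guide.
RESERVED_NAMES = frozenset({"__end__", "__start__"})
NAME_PATTERN = re.compile(r"[a-zA-Z][a-zA-Z0-9_-]*")
MAX_NAME_LENGTH = 64


def validate_agent_name(name: str) -> str:
    """Return the name unchanged if it is valid, otherwise raise ValueError."""
    if name in RESERVED_NAMES:
        raise ValueError(f"agent name {name!r} is reserved")
    if len(name) > MAX_NAME_LENGTH:
        raise ValueError(f"agent name exceeds {MAX_NAME_LENGTH} characters")
    # fullmatch rejects empty names, leading digits, spaces, and other symbols.
    if NAME_PATTERN.fullmatch(name) is None:
        raise ValueError(f"agent name {name!r} does not match {NAME_PATTERN.pattern}")
    return name
```

In a Pydantic model, a function like this would typically be called from a `@field_validator` on the `name` field so that a bad name fails at load time.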
Dependencies: pydantic, pyyaml
Extension Points: None. Config models are final. Developers write YAML files that conform to the schema.
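To make the RetryConfig fields concrete, here is a rough sketch of an exponential backoff schedule with full jitter, using the defaults listed above. How the library maps these fields onto tenacity is an assumption; `backoff_ceiling` and `full_jitter_wait` are hypothetical helper names.

```python
from __future__ import annotations

import random


def backoff_ceiling(
    attempt: int,
    *,
    initial_interval: float = 0.5,
    backoff_factor: float = 2.0,
    max_interval: float = 128.0,
) -> float:
    """Upper bound on the wait after failed attempt number `attempt` (1-based)."""
    return min(max_interval, initial_interval * backoff_factor ** (attempt - 1))


def full_jitter_wait(attempt: int, rng: random.Random | None = None) -> float:
    """Full jitter: draw uniformly between 0 and the exponential ceiling."""
    rng = rng or random.Random()
    return rng.uniform(0.0, backoff_ceiling(attempt))
```

With max_attempts=3 there are at most two waits, bounded by 0.5 and 1.0 seconds under these assumptions; max_interval only matters for much longer retry chains.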
2.3.2 core/ -- Core Types and Infrastructure #
Purpose: Shared state definition, exception hierarchy, token counting, LLM call utilities, rate limiting, logging context, and the OrchestratorContext lifecycle manager.
Files:
- state.py -- OrchestratorState TypedDict, create_initial_state()
- types.py -- AgentResponse frozen dataclass
- exceptions.py -- Full exception hierarchy (see Section 2.5)
- token_counting.py -- TokenCounter ABC, CharDivisionTokenCounter
- logging_context.py -- bind_logging_context(), clear_logging_context()
- llm_utils.py -- ainvoke_with_timeout(), astream_with_timeout(), build_async_retryer(), transient exception registry
- rate_limiting.py -- RateLimiter ABC, SemaphoreRateLimiter, PerTenantRateLimiter
- orchestrator_context.py -- OrchestratorContext (coordinated lifecycle)
Key Classes:
class OrchestratorState(TypedDict, total=False):
    """LangGraph state. Uses _merge_dicts reducer for context_slots and knowledge_context."""
    messages: Annotated[list[BaseMessage], add_messages]
    current_agent: str
    context_slots: Annotated[dict[str, Any], _merge_dicts]
    knowledge_context: Annotated[dict[str, Any], _merge_dicts]
    session_id: str
    user_id: str
    request_id: str

@dataclass(frozen=True, slots=True)
class AgentResponse:
    agent_name: str
    content: str
    next_agent: str | None = None
    slots_update: dict[str, Any] = field(default_factory=dict)
    slots_tier: dict[str, SlotTier] = field(default_factory=dict)
    token_usage: dict[str, int] | None = None

class TokenCounter(ABC):
    @abstractmethod
    def count(self, text: str) -> int: ...

class CharDivisionTokenCounter(TokenCounter):
    """max(1, len(text) // chars_per_token) for non-empty; 0 for empty. Default chars_per_token=4."""

class RateLimiter(ABC):
    async def acquire(self) -> None: ...
    async def release(self) -> None: ...  # concrete no-op default
    async def close(self) -> None: ...
    # Also supports async with protocol

class SemaphoreRateLimiter(RateLimiter):
    def __init__(self, *, max_concurrency: int = 1, delay_seconds: float = 0.0): ...

class PerTenantRateLimiter(RateLimiter):
    def __init__(self, *, max_concurrency: int = 1, delay_seconds: float = 0.0, max_tenants: int = 10_000): ...
    def set_tenant(self, tenant_id: str) -> None: ...

class OrchestratorContext:
    """Manages coordinated startup/shutdown of SlotManager, SessionManager,
    ReactTemplateManager, RateLimiter, and extras. Supports async with."""
    async def close(self) -> None: ...
    @property
    def is_closed(self) -> bool: ...
Functions:
def create_initial_state(
    user_input: str,
    *,
    current_agent: str = "supervisor",
    session_id: str = "default",
    user_id: str = "anonymous",
) -> OrchestratorState: ...
def bind_logging_context(state, config, *, redact_pii=False) -> None: ...
def clear_logging_context() -> None: ...
async def ainvoke_with_timeout(llm, messages, timeout_seconds, config) -> Any: ...
async def astream_with_timeout(llm, messages, timeout_seconds, config) -> AsyncIterator: ...
def build_async_retryer(retry_config: RetryConfig) -> AsyncRetrying: ...
def register_transient_exception(exc_type: type[Exception]) -> None: ...
def deregister_transient_exception(exc_type: type[Exception]) -> None: ...
def get_transient_exceptions() -> tuple[type[Exception], ...]: ...
Patterns:
- _merge_dicts reducer on context_slots and knowledge_context enables safe LangGraph parallel fan-in.
- Logging context uses structlog contextvars; bind_logging_context injects request_id, session_id, user_id, langgraph_step, langgraph_node.
- PII redaction: redact_pii=True hashes session_id/user_id with 12-char SHA-256 prefix.
- Transient exception registry is thread-safe (threading.Lock); default set is (OSError, TimeoutError).
- OrchestratorContext.close() tears down in safe order: extras, template managers, rate limiter, slot manager, session manager.
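The dict-merge reducer idea can be sketched in a few lines. The version below is an assumption about the general shape (a shallow merge where later updates win), not the library's actual `_merge_dicts`; `merge_dicts` and the sample branch updates are illustrative names only.

```python
from __future__ import annotations

from functools import reduce
from typing import Any


def merge_dicts(left: dict[str, Any] | None, right: dict[str, Any] | None) -> dict[str, Any]:
    """Combine two partial state updates; keys in `right` win on conflict."""
    merged = dict(left or {})  # copy so neither input is mutated
    merged.update(right or {})
    return merged


# Two parallel branches each return a partial update for the same state key.
branch_updates = [{"billing": {"plan": "pro"}}, {"shipping": {"eta_days": 3}}]
combined = reduce(merge_dicts, branch_updates, {"locale": "en"})
```

Without a reducer, two parallel branches writing the same state key would conflict; with one, each branch contributes its keys and the results are folded together.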
Dependencies: langchain-core, structlog, tenacity
Extension Points: TokenCounter ABC, RateLimiter ABC.
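As an illustration of the TokenCounter extension point, the sketch below re-declares a minimal ABC locally so it runs without the library, then shows a character-division counter following the rule quoted above and a hypothetical whitespace-based alternative. The class names `CharDivisionCounter` and `WhitespaceTokenCounter` are stand-ins.

```python
from abc import ABC, abstractmethod


class TokenCounter(ABC):
    """Local stand-in for the library's ABC: one abstract count() method."""

    @abstractmethod
    def count(self, text: str) -> int: ...


class CharDivisionCounter(TokenCounter):
    """max(1, len(text) // chars_per_token) for non-empty text, 0 for empty."""

    def __init__(self, chars_per_token: int = 4) -> None:
        if chars_per_token < 1:
            raise ValueError("chars_per_token must be >= 1")
        self.chars_per_token = chars_per_token

    def count(self, text: str) -> int:
        if not text:
            return 0
        return max(1, len(text) // self.chars_per_token)


class WhitespaceTokenCounter(TokenCounter):
    """Hypothetical custom counter: one token per whitespace-separated word."""

    def count(self, text: str) -> int:
        return len(text.split())
```

A production counter would more likely wrap the tokenizer of the model in use; the point here is only that any object with a `count(text) -> int` method can be injected.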
2.3.3 agents/ -- Agent Runtime #
Purpose: Define the agent lifecycle (BaseAgent ABC), two built-in agents (SupervisorAgent, SessionManagerAgent), and a middleware system for cross-cutting concerns.
Files:
- base.py -- BaseAgent ABC
- middleware.py -- AgentMiddleware base class
- supervisor.py -- SupervisorAgent (simple mode, ReAct mode, streaming)
- session_manager_agent.py -- SessionManagerAgent (ReAct agent wrapping SessionManager methods)
- metrics.py -- MetricsCollector protocol, MetricsMiddleware, InMemoryMetricsCollector
- tenant_isolation.py -- TenantIsolationMiddleware
- circuit_breaker.py -- CircuitBreakerMiddleware, CircuitBreakerState enum
- memory_injection.py -- MemoryInjectionMiddleware
Key Classes:
class BaseAgent(ABC):
    def __init__(
        self,
        config: AgentConfig,
        template_manager: ReactTemplateManager | None = None,
        react_executor: ReactExecutor | None = None,
        middleware: Sequence[AgentMiddleware] | None = None,
        *,
        redact_pii: bool = False,
    ) -> None: ...
    @abstractmethod
    async def process(self, state: OrchestratorState, config: RunnableConfig | None = None) -> AgentResponse: ...
    async def astream_process(self, state, config) -> AsyncGenerator[tuple[dict, str], None]: ...
    async def invoke(self, state: OrchestratorState, config: RunnableConfig | None = None) -> dict[str, Any]: ...

class AgentMiddleware:
    """Override only what you need. All hooks have concrete no-op defaults."""
    async def before_invoke(self, agent_name, state, config) -> OrchestratorState: ...
    async def after_invoke(self, agent_name, state, response, config) -> AgentResponse: ...
    async def on_error(self, agent_name, state, error, config) -> None: ...

class SupervisorAgent(BaseAgent):
    """Built-in supervisor. Simple mode (single LLM call) or ReAct mode."""
    def __init__(self, config, llm, topology, *, template_manager=None,
                 react_executor=None, middleware=None, rate_limiter=None,
                 use_structured_output=False, redact_pii=False): ...
    async def process(self, state, config) -> AgentResponse: ...
    async def astream_process(self, state, config) -> AsyncGenerator: ...

class SessionManagerAgent(BaseAgent):
    """ReAct agent using SessionManager methods as action handlers."""
    def __init__(self, config, llm, session_manager, *,
                 template_manager=None, middleware=None,
                 rate_limiter=None, redact_pii=False): ...

class MetricsMiddleware(AgentMiddleware):
    def __init__(self, collector: MetricsCollector): ...

class TenantIsolationMiddleware(AgentMiddleware):
    """Raises TenantIsolationError if process() mutates session_id/user_id."""

class CircuitBreakerMiddleware(AgentMiddleware):
    """CLOSED -> OPEN -> HALF_OPEN -> CLOSED state machine."""
    def __init__(self, *, failure_threshold: int = 5, recovery_timeout_seconds: float = 30.0): ...

class MemoryInjectionMiddleware(AgentMiddleware):
    """Injects relevant memories into state['knowledge_context']['_injected_memories']."""
    def __init__(self, session_manager: SessionManager, *, limit: int = 5): ...
Patterns:
- invoke() is the LangGraph node entry point. It calls process() (the abstract method subclasses implement), wraps the result in an AIMessage, and returns a state update dict.
- Middleware follows the onion model: before_invoke runs first-to-last, after_invoke runs last-to-first, on_error runs last-to-first fire-and-forget.
- invoke() always sets current_agent -- to response.next_agent when routing, or to self.name when terminating (the END sentinel).
- Identity field mutation detection: invoke() snapshots session_id/user_id before process() and logs a WARNING if they change.
- response_metadata on AIMessages carries request_id, session_id, user_id, langgraph_step, langgraph_node, and token_usage.
- Middleware state is stored in contextvars.ContextVar for concurrency safety.
- Constructor optional params are keyword-only (* separator).
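The onion ordering described above can be sketched as follows. This is a simplified illustration, not the library's invoke(): the hooks omit the `config` argument, `RecordingMiddleware` and `run_with_middleware` are hypothetical names, and reading "fire-and-forget" as "errors raised inside on_error are swallowed" is an assumption.

```python
import asyncio
from typing import Any


class RecordingMiddleware:
    """Hypothetical middleware that records the order in which its hooks fire."""

    def __init__(self, label: str, log: list) -> None:
        self.label = label
        self.log = log

    async def before_invoke(self, agent_name: str, state: dict) -> dict:
        self.log.append(f"before:{self.label}")
        return state

    async def after_invoke(self, agent_name: str, state: dict, response: Any) -> Any:
        self.log.append(f"after:{self.label}")
        return response

    async def on_error(self, agent_name: str, state: dict, error: Exception) -> None:
        self.log.append(f"error:{self.label}")


async def run_with_middleware(middleware, state, process, agent_name="demo"):
    for mw in middleware:  # before hooks: first-to-last
        state = await mw.before_invoke(agent_name, state)
    try:
        response = await process(state)
    except Exception as error:
        for mw in reversed(middleware):  # error hooks: last-to-first
            try:
                await mw.on_error(agent_name, state, error)
            except Exception:
                pass  # a failing error hook must not mask the original error
        raise
    for mw in reversed(middleware):  # after hooks: last-to-first
        response = await mw.after_invoke(agent_name, state, response)
    return response


async def _demo() -> list:
    log: list = []

    async def process(state: dict) -> str:
        log.append("process")
        return "ok"

    chain = [RecordingMiddleware("a", log), RecordingMiddleware("b", log)]
    await run_with_middleware(chain, {}, process)
    return log


hook_order = asyncio.run(_demo())
```

The outermost middleware (first in the list) sees the request first and the response last, which is why metrics or tracing middleware is usually placed at the front of the sequence.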
Dependencies: core/, config/, knowledge/, react/, routing/
Extension Points: BaseAgent ABC (implement process()), AgentMiddleware (override hooks).
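The CLOSED -> OPEN -> HALF_OPEN -> CLOSED state machine used by CircuitBreakerMiddleware can be sketched on its own. The class below is a generic circuit breaker under stated assumptions (a half-open probe failure reopens the circuit; one success closes it); the injectable `clock` parameter and the method names are illustrative, not the library's API.

```python
import time
from enum import Enum


class BreakerState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    def __init__(self, *, failure_threshold: int = 5,
                 recovery_timeout_seconds: float = 30.0, clock=time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout_seconds = recovery_timeout_seconds
        self.state = BreakerState.CLOSED
        self._failures = 0
        self._opened_at = 0.0
        self._clock = clock  # injectable so the behaviour can be tested without sleeping

    def allow_request(self) -> bool:
        """Return True if a call may proceed; moves OPEN -> HALF_OPEN after the timeout."""
        if self.state is BreakerState.OPEN:
            if self._clock() - self._opened_at >= self.recovery_timeout_seconds:
                self.state = BreakerState.HALF_OPEN
                return True
            return False
        return True

    def record_success(self) -> None:
        self._failures = 0
        self.state = BreakerState.CLOSED

    def record_failure(self) -> None:
        self._failures += 1
        # A failed probe in HALF_OPEN reopens immediately; otherwise wait for the threshold.
        if self.state is BreakerState.HALF_OPEN or self._failures >= self.failure_threshold:
            self.state = BreakerState.OPEN
            self._opened_at = self._clock()
```

In a middleware, `allow_request()` would map naturally onto before_invoke, `record_success()` onto after_invoke, and `record_failure()` onto on_error.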
2.3.4 context/ -- Slot-Based Context Management #
Purpose: Manage key-value context slots with token budgets, tiered persistence (TURN/SESSION/LONG_TERM), configurable eviction, and pluggable durable storage.
Files:
- slots.py -- Slot dataclass, SlotPool, SlotTier (StrEnum), TIER_ORDER tuple
- manager.py -- SlotManager (shared + per-agent pools, hydrate/flush lifecycle)
- eviction.py -- EvictionStrategy ABC, LRUEviction, FIFOEviction, PriorityEviction, DemotionEviction
- persistence.py -- SlotPersistence ABC, InMemorySlotPersistence
Key Classes:
class SlotTier(StrEnum):
    TURN = "turn"  # Discarded at end of turn
    SESSION = "session"  # Persisted for the session
    LONG_TERM = "long_term"  # Persisted across sessions

TIER_ORDER: tuple[SlotTier, ...] = (SlotTier.TURN, SlotTier.SESSION, SlotTier.LONG_TERM)

@dataclass(slots=True)
class Slot:
    key: str
    value: Any
    priority: int
    tier: SlotTier
    created_at: float
    accessed_at: float
    tokens: int

class SlotPool:
    """Token-budgeted collection. Supports len() and 'in' operators."""
    def add(self, key, value, priority=0, *, tier=SlotTier.TURN) -> None: ...
    def get(self, key) -> Any | None: ...  # Updates accessed_at
    def get_slot(self, key) -> Slot | None: ...  # Updates accessed_at
    def remove(self, key) -> bool: ...
    def slots(self) -> list[Slot]: ...
    @property
    def total_tokens(self) -> int: ...

class SlotManager:
    def __init__(self, config: ContextConfig, persistence=None, token_counter=None, timeout_config=None): ...
    def set_shared(self, key, value, priority=0) -> None: ...
    def get_shared(self, key) -> Any | None: ...
    def set_shared_tiered(self, key, value, tier: SlotTier, priority=0) -> None: ...
    def promote_slot(self, key, target_tier: SlotTier) -> bool: ...
    def set_agent(self, agent_name, key, value, priority=0) -> None: ...
    def get_agent(self, agent_name, key) -> Any | None: ...
    async def hydrate(self, session_id, user_id=None) -> None: ...
    async def flush(self, session_id, user_id=None) -> None: ...
    async def clear_turn_slots(self) -> None: ...
    def to_dict(self) -> dict: ...
    def from_dict(self, data: dict) -> None: ...
    async def close(self) -> None: ...
    # Supports async with

class EvictionStrategy(ABC):
    @abstractmethod
    def select(self, slots: list[Slot], count: int) -> list[Slot]: ...

class SlotPersistence(ABC):
    @abstractmethod
    async def load_session_slots(self, session_id: str) -> list[Slot]: ...
    @abstractmethod
    async def save_session_slots(self, session_id: str, slots: list[Slot]) -> None: ...
    @abstractmethod
    async def load_long_term_slots(self, user_id: str) -> list[Slot]: ...
    @abstractmethod
    async def save_long_term_slots(self, user_id: str, slots: list[Slot]) -> None: ...
    async def health_check(self) -> dict[str, Any]: ...
Patterns:
- Eviction runs synchronously after every set_shared() / set_shared_tiered() call. It is atomic in asyncio's single-threaded model.
- DemotionEviction demotes slots through tiers (LONG_TERM -> SESSION -> TURN) before removing.
- hydrate() and flush() use asyncio.Lock with snapshot-then-release pattern: I/O outside lock, writes inside lock.
- to_dict() uses the slots() bulk enumerator to avoid mutating accessed_at timestamps.
- from_dict() supports both new format ({"value": ..., "tier": ..., "priority": ...}) and legacy bare values.
- Closed guard: post-close operations raise RuntimeError.
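To illustrate eviction against a token budget, here is a minimal LRU sketch. The `Slot` class is a trimmed-down stand-in (only the fields the example needs), `select_lru` mirrors the `select(slots, count)` signature shown above, and `evict_to_budget` is a hypothetical driver loop; how the library actually invokes its strategy is an assumption.

```python
from dataclasses import dataclass


@dataclass
class Slot:
    """Trimmed stand-in for the library's Slot: only what this example needs."""
    key: str
    tokens: int
    accessed_at: float


def select_lru(slots: list, count: int) -> list:
    """Pick the `count` least recently accessed slots as eviction candidates."""
    return sorted(slots, key=lambda slot: slot.accessed_at)[:count]


def evict_to_budget(pool: dict, max_total_tokens: int) -> list:
    """Remove least recently used slots until the pool fits the budget.

    Returns the evicted keys in eviction order. Mutates `pool` in place.
    """
    evicted = []
    while pool and sum(slot.tokens for slot in pool.values()) > max_total_tokens:
        victim = select_lru(list(pool.values()), 1)[0]
        del pool[victim.key]
        evicted.append(victim.key)
    return evicted
```

Because the loop contains no await points, it runs to completion without interleaving in a single-threaded event loop, which is the sense in which synchronous eviction is atomic under asyncio.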
Dependencies: core/ (token counting, exceptions, config schema)
Extension Points: EvictionStrategy ABC, SlotPersistence ABC, TokenCounter ABC (injected).
2.3.5 session/ -- Session Layer #
Purpose: Abstract interface for message persistence, summaries, and memories. Ships InMemorySessionManager for development and SemanticSearchMixin for vector search integration.
Files:
- models.py -- SessionMessage, Summary, Memory, PaginatedResult (all frozen Pydantic)
- interface.py -- SessionManager ABC
- in_memory.py -- InMemorySessionManager
- semantic.py -- SemanticSearchMixin
Key Classes:
class SessionManager(ABC):
    """3 abstract methods. Everything else raises NotImplementedError by default."""
    # Abstract (must implement)
    @abstractmethod
    async def save_messages(self, session_id: str, messages: list[SessionMessage]) -> None: ...
    @abstractmethod
    async def get_messages(self, session_id: str, limit: int = 50, user_id: str | None = None) -> list[SessionMessage]: ...
    @abstractmethod
    async def clear_session(self, session_id: str) -> None: ...
    # Concrete defaults (override to enable)
    async def save_summary(self, session_id, summary) -> None: ...  # raises NotImplementedError
    async def get_summary(self, session_id) -> Summary | None: ...  # raises NotImplementedError
    async def store_memory(self, session_id, memory, user_id=None) -> None: ...
    async def recall_memory(self, session_id, key) -> Memory | None: ...
    async def search_memories(self, session_id, query, limit=5) -> list[Memory]: ...
    async def search_memories_by_user(self, user_id, query, limit=10) -> list[Memory]: ...
    async def get_messages_paginated(self, session_id, cursor=None, limit=50, user_id=None) -> PaginatedResult: ...
    async def update_memory(self, session_id, memory_id, updates, user_id=None) -> Memory | None: ...
    async def delete_memory(self, session_id, memory_id, user_id=None) -> bool: ...
    async def list_memories(self, session_id, cursor=None, limit=20) -> PaginatedResult[Memory]: ...
    # Utility
    async def health_check(self) -> dict[str, Any]: ...
    def get_capabilities(self) -> set[str]: ...
    async def close(self) -> None: ...
    # Supports async with

class InMemorySessionManager(SessionManager):
    """Full implementation for dev/testing. Per-session asyncio.Lock guards."""

class SemanticSearchMixin(ABC):
    """Mix-in for vector-based memory search. Must appear first in MRO."""
    @abstractmethod
    async def embed_text(self, text: str) -> list[float]: ...
    async def search_memories_semantic(self, session_id, query, limit=5) -> list[Memory]: ...
Data Models (all frozen=True, extra="forbid"):
class SessionMessage(BaseModel):
    id: str  # UUID
    session_id: str
    role: Literal["user", "assistant", "ai", "system"]
    content: str
    timestamp: datetime
    user_id: str | None = None
    metadata: dict[str, Any] = {}
    def to_langchain(self) -> BaseMessage: ...

class Memory(BaseModel):
    id: str
    session_id: str
    key: str
    value: str
    importance: float  # ge=0.0, le=1.0
    user_id: str | None = None
    expires_at: datetime | None = None
    # ... categories, metadata, source_agent, etc.

class Summary(BaseModel):
    id: str
    session_id: str
    content: str
    user_id: str | None = None

class PaginatedResult(BaseModel, Generic[T]):
    items: list[T]
    next_cursor: str | None
    has_more: bool
Patterns:
- SessionMessage.role validated at construction: "user", "assistant", "ai", "system".
- to_langchain() maps roles: "user" -> HumanMessage, "assistant"/"ai" -> AIMessage, "system" -> SystemMessage.
- InMemorySessionManager uses per-session asyncio.Lock for safe concurrent access.
- close() sets _closed flag and clears data; subsequent operations raise RuntimeError.
- recall_memory() filters expired memories at read time.
- update_memory() and delete_memory() enforce ownership via user_id check.
- get_messages() and get_messages_paginated() raise ValueError when limit < 1.
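Several of the patterns above (per-session locks, the closed guard, limit validation) fit into one small sketch. `TinySessionStore` is a hypothetical toy that stores plain strings rather than SessionMessage objects, and returning the most recent `limit` messages is an assumption about ordering.

```python
import asyncio
from collections import defaultdict


class TinySessionStore:
    """Toy in-memory store: one asyncio.Lock per session, closed guard, limit check."""

    def __init__(self) -> None:
        self._messages = defaultdict(list)
        self._locks = defaultdict(asyncio.Lock)  # lock created lazily per session_id
        self._closed = False

    def _ensure_open(self) -> None:
        if self._closed:
            raise RuntimeError("store is closed")

    async def save_messages(self, session_id: str, messages: list) -> None:
        self._ensure_open()
        async with self._locks[session_id]:  # writers to other sessions are not blocked
            self._messages[session_id].extend(messages)

    async def get_messages(self, session_id: str, limit: int = 50) -> list:
        self._ensure_open()
        if limit < 1:
            raise ValueError("limit must be >= 1")
        async with self._locks[session_id]:
            # Assumption: return the most recent `limit` messages, oldest first.
            return list(self._messages[session_id][-limit:])

    async def close(self) -> None:
        self._closed = True
        self._messages.clear()
        self._locks.clear()
```

Keying the lock by session_id means concurrent requests for different sessions never wait on each other, while two requests for the same session are serialized.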
Dependencies: pydantic, langchain-core
Extension Points: SessionManager ABC (implement 3 abstract methods, override optional methods as needed), SemanticSearchMixin (implement embed_text()).
2.3.6 routing/ -- Topology and Graph Building #
Purpose: Validate agent-to-agent routes against a declared topology and provide helper functions that eliminate LangGraph graph-wiring boilerplate.
Files:
- topology.py -- TopologyResolver
- graph_builder.py -- build_hub_spoke_graph(), build_topology_graph(), build_conditional_edge()
Key Classes:
class TopologyResolver:
    """Validates allowed routes from topology config."""
    def __init__(self, config: TopologyConfig): ...
    def get_allowed_targets(self, from_agent: str) -> list[str]: ...
    @property
    def default_return(self) -> str: ...
Functions:
def build_conditional_edge(
    topology: TopologyResolver, from_agent_name: str
) -> Callable[[OrchestratorState], str]:
    """Returns a routing function for StateGraph.add_conditional_edges().
    Returns END when current_agent == from_agent_name (termination sentinel).
    Falls back to default_return for unknown targets."""

def build_hub_spoke_graph(
    supervisor, agents: dict[str, Any], topology: TopologyResolver,
    *, state_schema=OrchestratorState
) -> CompiledStateGraph:
    """Hub-and-spoke: all workers route back to supervisor."""

def build_topology_graph(
    agents: dict[str, Any], topology: TopologyResolver,
    *, entry_point: str, state_schema=OrchestratorState
) -> CompiledStateGraph:
    """Flexible: edges match topology exactly. Supports mesh, hybrid."""
Patterns:
- Termination sentinel: when an agent sets current_agent to its own name (i.e., next_agent is None), build_conditional_edge returns END.
- get_allowed_targets() includes default_return in the returned list.
- RoutingSelfLoopError raised when supervisor tries to route to itself.
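The routing rules above (END on the termination sentinel, fallback to default_return for unknown targets) can be sketched without LangGraph installed. `make_router` is a hypothetical stand-in for build_conditional_edge, the topology is passed as a plain mapping rather than a TopologyResolver, and `END` is a local constant standing in for `langgraph.graph.END`.

```python
from typing import Callable, Mapping

END = "__end__"  # local stand-in for langgraph.graph.END so the sketch runs without LangGraph


def make_router(
    allowed_targets: Mapping[str, list],
    default_return: str,
    from_agent: str,
) -> Callable[[dict], str]:
    """Build a routing function for the edges leaving `from_agent`."""
    # default_return is always a legal target, as described for get_allowed_targets().
    legal = set(allowed_targets.get(from_agent, [])) | {default_return}

    def route(state: dict) -> str:
        target = state.get("current_agent", default_return)
        if target == from_agent:  # termination sentinel: agent named itself
            return END
        return target if target in legal else default_return

    return route


route_from_billing = make_router({"billing": ["shipping"]}, "supervisor", "billing")
```

A function of this shape is what `StateGraph.add_conditional_edges()` expects as its path argument: it receives the state and returns the name of the next node.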
Dependencies: core/ (state, exceptions), langgraph
Extension Points: None. Developers use the provided graph builders or wire LangGraph graphs manually.
2.3.7 knowledge/ -- Knowledge Module System #
Purpose: Per-agent knowledge configuration (react_config.yaml), filesystem-based module loading, prompt assembly, and dependency resolution between knowledge slots.
Files:
- schema.py -- ReactConfigSchema, ReactSlotSchema, ReactActionSchema (Pydantic validation)
- models.py -- KnowledgeSlotDef, KnowledgeLoadResult
- loader.py -- KnowledgeLoader ABC, FileSystemKnowledgeLoader
- template_manager.py -- ReactTemplateManager
Key Classes:
class ReactConfigSchema(BaseModel):
    """frozen=True, extra='forbid'. Validates react_config.yaml."""
    max_passes: int = DEFAULT_MAX_PASSES  # default 5, gt=0
    actions: dict[str, ReactActionSchema]
    slots: dict[str, ReactSlotSchema]
    # @model_validator: validates dependency references + cycle detection (DFS)

class ReactSlotSchema(BaseModel):
    description: str
    modules: list[str]  # min_length=1, relative .md file paths
    dependencies: list[str]  # slot names that must load first
    triggers: str  # hint for when to load
    max_tokens: int | None  # per-slot token budget cap

class ReactActionSchema(BaseModel):
    description: str
    parameters: dict[str, str]  # param name -> description
    required: list[str]

DEFAULT_MAX_PASSES: int = 5

class KnowledgeLoader(ABC):
    @abstractmethod
    async def load_module(self, module_ref: str) -> str: ...
    @abstractmethod
    def resolve_module_path(self, module_ref: str) -> Path: ...

class FileSystemKnowledgeLoader(KnowledgeLoader):
    """Enforces base_dir containment. Rejects absolute paths and traversal."""

class ReactTemplateManager:
    """Per-agent config + slot loading. Async-safe with asyncio.Lock."""
    @classmethod
    async def create(cls, ...) -> ReactTemplateManager: ...  # async factory
    async def load_slot(self, slot_name: str) -> KnowledgeLoadResult: ...
    async def load_core_prompt(self, agent_name, **kwargs) -> str: ...
    def format_available_slots(self) -> str: ...
    def format_available_actions(self) -> str: ...
    async def reset(self) -> None: ...
    async def close(self) -> None: ...
    @property
    def max_passes(self) -> int: ...
    @property
    def token_counter(self) -> TokenCounter: ...
    # Supports async with
Patterns:
- ReactConfigSchema has a merged @model_validator that does both reference validation and DFS cycle detection.
- FileSystemKnowledgeLoader.resolve_module_path() enforces base_dir containment; raises KnowledgePathError for path traversal.
- ReactTemplateManager.load_slot() uses double-check cache pattern under asyncio.Lock.
- Per-slot max_tokens enforcement: checked after loading, before global budget check.
- Global max_loaded_tokens budget: load_slot() raises KnowledgeLoadError if exceeded.
- create() async factory wraps config file I/O in asyncio.to_thread.
- format_available_actions() renders parameter descriptions and required lists.
- Closed guard: post-close operations raise RuntimeError.
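The combined reference check and DFS cycle detection on slot dependencies might look roughly like the sketch below. It operates on a plain mapping of slot name to dependency names; `find_dependency_cycle` is a hypothetical function, and the library's validator may report errors differently.

```python
from __future__ import annotations


def find_dependency_cycle(dependencies: dict[str, list[str]]) -> list[str] | None:
    """Return one cycle as a list of slot names, or None if the graph is acyclic.

    Raises ValueError if a slot depends on a name that is not defined.
    """
    unvisited, in_progress, done = 0, 1, 2
    color = {name: unvisited for name in dependencies}
    path: list[str] = []  # current DFS stack, used to reconstruct the cycle

    def visit(name: str) -> list[str] | None:
        color[name] = in_progress
        path.append(name)
        for dep in dependencies[name]:
            if dep not in dependencies:
                raise ValueError(f"slot {name!r} depends on unknown slot {dep!r}")
            if color[dep] == in_progress:  # back edge: dep is already on the stack
                return path[path.index(dep):] + [dep]
            if color[dep] == unvisited:
                cycle = visit(dep)
                if cycle is not None:
                    return cycle
        path.pop()
        color[name] = done
        return None

    for name in dependencies:
        if color[name] == unvisited:
            cycle = visit(name)
            if cycle is not None:
                return cycle
    return None
```

Returning the cycle itself (rather than a boolean) makes the resulting validation error actionable, since the config author can see exactly which slots form the loop.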
Dependencies: core/ (token counting, exceptions), pydantic, pyyaml
Extension Points: KnowledgeLoader ABC (implement for non-filesystem sources like S3, databases).
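When implementing a custom loader, the base-directory containment check is the part most worth getting right. The sketch below shows one common way to do it with pathlib (Python 3.9+ for `is_relative_to`); `KnowledgePathError` is redefined locally as a stand-in for the library's exception, and the function is a free function rather than a method for brevity.

```python
from pathlib import Path


class KnowledgePathError(ValueError):
    """Local stand-in for the library's exception of the same name."""


def resolve_module_path(base_dir: Path, module_ref: str) -> Path:
    """Resolve a relative module reference, refusing anything outside base_dir."""
    ref = Path(module_ref)
    if ref.is_absolute():
        raise KnowledgePathError(f"absolute paths are not allowed: {module_ref!r}")
    base = base_dir.resolve()
    # resolve() collapses ".." segments and follows symlinks before the check.
    candidate = (base / ref).resolve()
    if not candidate.is_relative_to(base):
        raise KnowledgePathError(f"path escapes the knowledge directory: {module_ref!r}")
    return candidate
```

Resolving before comparing matters: a string prefix check on the unresolved path would accept references such as `a/../../b.md`.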
2.3.8 react/ -- ReAct Execution Engine #
Purpose: Multi-pass thought/action/observation loop that drives ReAct-mode agents.
Files:
- executor.py -- ReactExecutor
- models.py -- ReactParsedResponse, ReactStructuredResponse, TerminalActionResult, ActionHandler
Key Classes:
class ReactExecutor:
    def __init__(
        self,
        agent_name: str,
        llm: BaseLanguageModel,
        template_manager: ReactTemplateManager,
        action_handlers: dict[str, ActionHandler],
        *,
        max_passes: int | None = None,
        retry_config: RetryConfig | None = None,
        timeout_config: TimeoutConfig | None = None,
        max_consecutive_errors: int | None = None,
        max_history_tokens: int | None = None,
        rate_limiter: RateLimiter | None = None,
        cancel_event: asyncio.Event | None = None,
        use_structured_output: bool = False,
        total_request_seconds: float | None = None,
    ) -> None: ...
    async def execute(
        self, user_input: str, context: dict[str, Any],
        config: RunnableConfig | None = None, *, request_id: str | None = None
    ) -> dict[str, Any]: ...

class ReactParsedResponse:
    thought: str
    action: str
    action_params: dict[str, Any]

class ReactStructuredResponse(BaseModel):
    """Pydantic model for llm.with_structured_output()."""
    thought: str
    action: str
    action_params: dict[str, Any]

class TerminalActionResult(Exception):
    """Raised by action handlers to signal loop termination."""
    def __init__(self, result: dict[str, Any]): ...

# ActionHandler is a Protocol with optional config param
ActionHandler = Callable[[dict, dict, RunnableConfig | None], Awaitable[tuple[str, dict]]]
Pass Loop (per pass):
- Check cancellation event.
- Build prompt: static core prompt + dynamic per-pass prompt (user input, loaded knowledge, history).
- Invoke LLM (with retry, rate limiting, timeout).
- Accumulate token usage.
- Parse response: XML (<thought>, <action type="...">) or structured (ReactStructuredResponse).
- Look up action handler. If unknown, increment consecutive errors and continue.
- Execute handler. If TerminalActionResult raised, return the result. Otherwise, collect observation.
- On max_passes exhaustion, raise ReactMaxPassesError.
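The control flow of the steps above can be condensed into a short sketch. It is deliberately incomplete: prompt building, the LLM call, and parsing are collapsed into a hypothetical `decide` callback, handlers take a single `params` argument instead of the full ActionHandler signature, and cancellation, retries, rate limiting, and token accounting are omitted. `TerminalActionResult` and `ReactMaxPassesError` are local stand-ins for the library's classes.

```python
import asyncio


class TerminalActionResult(Exception):
    """Local stand-in: raised by a handler to end the loop with a result."""

    def __init__(self, result: dict) -> None:
        super().__init__("terminal action")
        self.result = result


class ReactMaxPassesError(RuntimeError):
    """Local stand-in for the library's exception of the same name."""


async def run_passes(decide, handlers: dict, *, max_passes: int = 5) -> dict:
    observations: list = []
    for pass_number in range(1, max_passes + 1):
        # Stand-in for "build prompt, call LLM, parse response".
        action, params = await decide(pass_number, observations)
        handler = handlers.get(action)
        if handler is None:
            observations.append(f"unknown action: {action}")
            continue
        try:
            observation = await handler(params)
        except TerminalActionResult as done:
            return done.result
        except Exception as exc:
            # Non-terminal handler failures are fed back as observations.
            observation = f"error: {exc}"
        observations.append(observation)
    raise ReactMaxPassesError(f"no terminal action after {max_passes} passes")
```

Using an exception for termination lets any handler end the loop without the executor needing to know in advance which actions are terminal.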
Reserved State Keys: _user_input, _pass_number, _previous_thoughts, _observations, _loaded_slots, _loaded_slot_contents. Overwriting these from context or handler state_updates raises ReservedStateKeyError.
Patterns:
- XML parsing strips <thought> before searching for <action> to prevent injection.
- History truncation: walks from newest to oldest, keeping pairs within max_history_tokens budget.
- Structured output capability is cached after first probe -- avoids N redundant calls per N-pass loop.
- total_request_seconds wraps the entire passes loop in asyncio.timeout().
- Rate limiter uses acquired flag pattern to prevent release() when acquire() fails.
- Non-terminal handler exceptions become observations so the loop continues.
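The history truncation pattern (newest to oldest, whole pairs only) is easy to get subtly wrong, so a small sketch may help. `truncate_history` and `estimate_tokens` are hypothetical names, each pair is assumed to be a (thought, observation) tuple, and stopping at the first pair that does not fit (rather than skipping it and trying older ones) is an assumption.

```python
from typing import Callable


def estimate_tokens(text: str) -> int:
    """Character-division estimate: roughly four characters per token."""
    return max(1, len(text) // 4) if text else 0


def truncate_history(
    pairs: list,
    max_history_tokens: int,
    count_tokens: Callable[[str], int] = estimate_tokens,
) -> list:
    """Keep the most recent (thought, observation) pairs that fit the budget."""
    kept = []
    used = 0
    for thought, observation in reversed(pairs):  # newest first
        cost = count_tokens(thought) + count_tokens(observation)
        if used + cost > max_history_tokens:
            break  # keep the window contiguous: stop at the first pair that overflows
        kept.append((thought, observation))
        used += cost
    kept.reverse()  # restore chronological order for the prompt
    return kept
```

Dropping whole pairs keeps every remaining observation attached to the thought that produced it, which tends to be easier for the model to follow than a window cut mid-pair.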
Dependencies: core/ (LLM utils, exceptions, rate limiting), knowledge/ (template manager)
Extension Points: Action handlers (any async callable matching the ActionHandler protocol).
2.3.9 observability/ -- OpenTelemetry Instrumentation #
Purpose: Optional OpenTelemetry integration. All operations are zero-overhead no-ops when the opentelemetry-api package is not installed.
Files:
- otel.py -- OTEL_AVAILABLE, get_tracer(), otel_span(), sync_otel_span()
- middleware.py -- OpenTelemetryMiddleware
Key Exports:
OTEL_AVAILABLE: bool  # True if opentelemetry-api is importable
def get_tracer(name: str = "graphcrew") -> Tracer: ...
async def otel_span(name: str, **attributes) -> AsyncContextManager: ...
def sync_otel_span(name: str, **attributes) -> ContextManager: ...

class OpenTelemetryMiddleware(AgentMiddleware):
    """Creates spans for agent invocations. Per-invocation state in contextvars."""
Patterns:
- Install via pip install graphcrew[otel].
- get_tracer() returns a real tracer when OTel is available, a no-op stub otherwise.
- otel_span() / sync_otel_span() create spans with arbitrary attributes.
- Middleware state stored in contextvars.ContextVar for concurrency safety.
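The optional-dependency pattern behind these helpers is a guarded import plus a no-op fallback. The sketch below shows the general technique only; `sync_span` is a hypothetical helper, the tracer name is made up, and the library's own implementation may be structured differently.

```python
from contextlib import contextmanager

try:
    from opentelemetry import trace  # provided by the opentelemetry-api package
    OTEL_AVAILABLE = True
except ImportError:
    trace = None
    OTEL_AVAILABLE = False


@contextmanager
def sync_span(name: str, **attributes):
    """Open a span if OpenTelemetry is importable; otherwise do nothing."""
    if not OTEL_AVAILABLE:
        yield None  # callers get None and must not rely on span methods
        return
    tracer = trace.get_tracer("graphcrew-sketch")  # hypothetical tracer name
    with tracer.start_as_current_span(name) as span:
        for key, value in attributes.items():
            span.set_attribute(key, value)
        yield span
```

Call sites look the same in both cases, for example `with sync_span("supervisor.process", agent="supervisor"): ...`, so instrumented code needs no conditional imports of its own.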
Dependencies: opentelemetry-api (optional, >=1.24,<2.0)
Extension Points: None directly. Developers configure OTel exporters and processors at the application level.
2.3.10 testing/ -- Test Utilities #
Purpose: Provide mock objects and factory helpers for unit and integration tests. Not intended for production use.
Files:
- mock_llm.py -- MockLLM
- factories.py -- make_test_state()
- __init__.py -- Re-exports InMemorySessionManager
Key Exports:
class MockLLM:
    """Returns scripted responses without real API calls."""
    # Supports:
    #   raise_on_call: exception class to raise
    #   stream_chunks: per-call chunk lists for astream
    #   structured_responses: FIFO list for with_structured_output
    #   usage_tokens: dict attached to every response

def make_test_state(user_input: str, **kwargs) -> OrchestratorState:
    """Creates a minimal OrchestratorState for testing."""

# Re-exported for convenience:
InMemorySessionManager
Patterns:
- MockLLM supports multiple response modes for testing different code paths (normal, streaming, structured, error).
- make_test_state() wraps create_initial_state() with sensible test defaults.
Dependencies: core/, session/
Extension Points: None. These are test utilities.
2.4 Architectural Decisions Record #
All 68 architectural decisions from the project, grouped by category.
Foundation and Philosophy #
AD-1. Library, not application. Developers build their own LangGraph graphs using library components. The library provides building blocks and interfaces, not a turnkey solution.
AD-2. Declarative config. YAML defines agents, topology, and slots. Developers build their own LangGraph graphs using library components wired together via config.
AD-5. LangGraph as execution engine. Chosen over AutoGen/AG2 (fragmented) and Google ADK (too new). LangGraph has 35.9k+ dependents, production use at Klarna/Uber/LinkedIn, and is MIT-licensed.
AD-6. Only 2 built-in agents. Supervisor and SessionManagerAgent. Everything else is developer-provided by design.
AD-21. Code conventions. from __future__ import annotations in every file. Slot uses slots=True. Memory is frozen. All dataclasses use frozen=True or slots=True where applicable.
AD-24. __version__ public attribute. Reads from package metadata via importlib.metadata.version(). Included in __all__.
Routing and Topology #
AD-3. Constrained dynamic routing. Agents choose next_agent at runtime, validated against topology via TopologyResolver. Invalid routes fall back to default_return.
AD-22. Agent name and description validation. AgentConfig.name enforces [a-zA-Z][a-zA-Z0-9_-]*, <= 64 chars, rejects __end__/__start__. Description requires min_length=1.
AD-58. Graph builder helpers. build_hub_spoke_graph(), build_topology_graph(), and build_conditional_edge() eliminate LangGraph wiring boilerplate. build_conditional_edge returns END when current_agent == from_agent_name.
AD-61. RoutingSelfLoopError. Raised when supervisor tries to route to itself, replacing the previous warning-and-fallback behavior.
Context and Slots #
AD-4. Schemaless slots with tiered persistence. Arbitrary key-value pairs managed by SlotManager with token budgets and eviction. Three tiers: TURN (end of turn), SESSION (session lifetime), LONG_TERM (cross-session).
AD-8. Tiered slot persistence. DemotionEviction demotes slots through tiers before evicting. SlotPersistence ABC allows any durable backend.
AD-10. Pluggable token counting. TokenCounter ABC with count(text) -> int. Default CharDivisionTokenCounter estimates len(text) // 4. Injected into SlotPool, SlotManager, and ReactTemplateManager.
AD-35. InMemorySlotPersistence. Concrete implementation using dicts with copy.deepcopy for mutation isolation. Ships for dev/testing.
AD-36. SlotTier as StrEnum. Enables SlotTier.TURN == "turn" string comparison directly. Requires Python 3.11+.
AD-54. AgentResponse.slots_tier typed as dict[str, SlotTier]. Uses SlotTier enum instead of str. Backward-compatible via StrEnum equality.
AD-56. max_tokens validation. All max_tokens and max_total_tokens fields have gt=0 constraint.
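As a rough illustration of AD-10 and AD-56, the sketch below re-creates a `TokenCounter` ABC with a `count(text) -> int` method and the `len(text) // 4` estimate locally. `WhitespaceTokenCounter` and `fits_budget` are hypothetical names added to show how a custom counter could be swapped in; they are not part of the library.

```python
from abc import ABC, abstractmethod


class TokenCounter(ABC):
    """Local stand-in for the ABC described in AD-10."""

    @abstractmethod
    def count(self, text: str) -> int:
        """Return an estimated token count for text."""


class CharDivisionTokenCounter(TokenCounter):
    """Estimate tokens as len(text) // 4, as stated in AD-10."""

    def count(self, text: str) -> int:
        return len(text) // 4


class WhitespaceTokenCounter(TokenCounter):
    """Hypothetical alternative: one token per whitespace-separated word."""

    def count(self, text: str) -> int:
        return len(text.split())


def fits_budget(counter: TokenCounter, text: str, max_tokens: int) -> bool:
    """Check a value against a token budget; mirrors the gt=0 rule of AD-56."""
    if max_tokens <= 0:
        raise ValueError("max_tokens must be greater than 0")
    return counter.count(text) <= max_tokens
```

Because every consumer depends only on `count()`, a tokenizer-backed implementation can replace the character estimate without touching the slot logic.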
Session Layer #
AD-5b. Single SessionManager ABC. One interface with 3 abstract methods (messages + lifecycle). Summaries and memories default to NotImplementedError. Override only what you need.
AD-23. SessionMessage.to_langchain() role mapping. "user" -> HumanMessage, "assistant"/"ai" -> AIMessage, "system" -> SystemMessage. Raises ValueError on unknown roles.
AD-42. Health checks. SessionManager.health_check() and SlotPersistence.health_check() return {"status": "ok"} by default. Override for backend-specific connectivity checks.
AD-52. knowledge_context populated by BaseAgent.invoke(). After process(), _loaded_slot_contents is extracted into update["knowledge_context"] and removed from context_slots.
AD-62. InMemorySessionManager parameter validation. get_messages(), get_messages_paginated(), and list_memories() raise ValueError when limit < 1.
AD-66. SemanticSearchMixin. Abstract mix-in for vector-based memory search. Declares embed_text() and search_memories_semantic(). Combine via MRO with SemanticSearchMixin first.
ReAct Engine #
AD-7. ReAct execution engine. Multi-pass thought/action/observation loop. Agents load knowledge on-demand. Optional max_history_tokens sliding window for prompt overflow prevention.
AD-9. External knowledge modules. Per-agent react_config.yaml validated by ReactConfigSchema. Enforces dependency references and circular dependency detection via DFS.
AD-15. Reserved ReactExecutor state keys. 6 internal keys guarded by _RESERVED_STATE_KEYS frozenset. ReservedStateKeyError raised on overwrite attempts.
AD-19. ReAct cancellation. cancel_event: asyncio.Event checked at top of each pass. Raises ReactCancellationError when set.
AD-25. DEFAULT_MAX_PASSES constant. Exported from knowledge/schema.py as DEFAULT_MAX_PASSES = 5.
AD-31. Structured output. Opt-in use_structured_output=True. ReactStructuredResponse Pydantic model. Graceful fallback to XML when LLM does not support with_structured_output().
AD-42b. Knowledge slot loading budget. ReactTemplateManager.max_loaded_tokens enforced in load_slot(). Per-slot max_tokens checked before global budget.
AD-60. TimeoutConfig.total_request_seconds. Wraps the entire ReactExecutor pass loop with a single outer timeout, complementing the per-call llm_call_seconds.
AD-63. Per-slot max_tokens enforcement. ReactTemplateManager.load_slot() checks per-slot token limit after loading content. Was in schema but not enforced at load time previously.
AD-64. Structured output capability cached. ReactExecutor caches whether LLM supports with_structured_output() after first detection call.
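AD-9 mentions circular dependency detection via DFS. One common way to do this is a three-color depth-first search, sketched below. The function name `find_cycle` and the `dependencies` mapping (slot name to the names it depends on) are assumptions for illustration; the library's `ReactConfigSchema` may structure this differently.

```python
def find_cycle(dependencies):
    """Return one dependency cycle as a list of names, or [] if none exists."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current path, finished
    color = {name: WHITE for name in dependencies}
    path = []

    def visit(node):
        color[node] = GRAY
        path.append(node)
        for dep in dependencies.get(node, []):
            state = color.get(dep, WHITE)
            if state == GRAY:
                # dep is already on the current path, so we closed a loop.
                return path[path.index(dep):] + [dep]
            if state == WHITE:
                cycle = visit(dep)
                if cycle:
                    return cycle
        path.pop()
        color[node] = BLACK
        return []

    for name in dependencies:
        if color[name] == WHITE:
            cycle = visit(name)
            if cycle:
                return cycle
    return []


# Hypothetical knowledge slots: "pricing" needs "catalog", which needs "pricing".
cyclic = {"pricing": ["catalog"], "catalog": ["pricing"], "faq": []}
```

Returning the offending path, rather than a boolean, makes it possible to name the slots involved in a validation error message.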
Agent Middleware #
AD-16. Agent middleware. AgentMiddleware with before_invoke, after_invoke, on_error hooks. All have concrete defaults. Onion model execution order.
AD-38. MetricsMiddleware. MetricsCollector protocol + middleware recording duration, token usage, and success/error. InMemoryMetricsCollector for dev.
AD-39. TenantIsolationMiddleware. Captures session_id/user_id in before_invoke. Raises TenantIsolationError if process mutates identity fields.
AD-40. CircuitBreakerMiddleware. CLOSED -> OPEN -> HALF_OPEN -> CLOSED state machine. Configurable failure_threshold and recovery_timeout_seconds.
AD-46. Middleware concurrency safety. MetricsMiddleware, TenantIsolationMiddleware, and OpenTelemetryMiddleware use contextvars.ContextVar for per-invocation state.
AD-68. MemoryInjectionMiddleware. Searches session and cross-session memories, deduplicates by key (higher importance wins), injects into state["knowledge_context"]["_injected_memories"]. Never blocks agent execution on failure.
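The CLOSED -> OPEN -> HALF_OPEN -> CLOSED cycle from AD-40 can be sketched as a small state machine. This is not the library's `CircuitBreakerMiddleware`: the class name `CircuitBreaker`, its method names, and the injectable `clock` argument are assumptions made so the behavior can be exercised without waiting for real time to pass. Only `failure_threshold` and `recovery_timeout_seconds` come from the source.

```python
import time
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    """Illustrative state machine only."""

    def __init__(self, failure_threshold=3, recovery_timeout_seconds=30.0,
                 clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout_seconds = recovery_timeout_seconds
        self._clock = clock  # injectable so tests can control time
        self._failures = 0
        self._opened_at = 0.0
        self.state = State.CLOSED

    def allow_call(self) -> bool:
        """Return True if a call may proceed."""
        if self.state is State.OPEN:
            elapsed = self._clock() - self._opened_at
            if elapsed >= self.recovery_timeout_seconds:
                self.state = State.HALF_OPEN  # let one trial call through
                return True
            return False
        return True

    def record_success(self) -> None:
        self._failures = 0
        self.state = State.CLOSED

    def record_failure(self) -> None:
        self._failures += 1
        # A failed trial call in HALF_OPEN reopens the circuit immediately.
        if self.state is State.HALF_OPEN or self._failures >= self.failure_threshold:
            self.state = State.OPEN
            self._opened_at = self._clock()
```

In a middleware, `allow_call()` would plausibly run in `before_invoke` (raising `CircuitBreakerOpenError` when it returns False), with `record_success()` and `record_failure()` called from `after_invoke` and `on_error`.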
Rate Limiting #
AD-17. Rate limiter ABC. acquire(), release(), close(). Called before every LLM invocation. release() has concrete no-op default.
AD-34. SemaphoreRateLimiter. Concrete implementation using asyncio.Semaphore. Holds semaphore from acquire() until release(). Optional delay_seconds.
AD-48. RateLimiter async context manager. Supports async with rate_limiter: usage.
AD-55. Rate limiter acquire/release safety. acquired flag prevents release() when acquire() fails.
AD-67. PerTenantRateLimiter. Per-tenant semaphore isolation via OrderedDict. LRU eviction when max_tenants is reached. set_tenant() stores tenant ID in contextvars.ContextVar.
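A minimal sketch of the pattern behind AD-34 and AD-48: hold an `asyncio.Semaphore` from `acquire()` until `release()`, and expose both through `async with`. The constructor parameter `max_concurrent`, the choice to make `release()` a coroutine, and the `run_demo` helper are assumptions for illustration and may not match the library's signatures.

```python
import asyncio


class SemaphoreRateLimiter:
    """Illustrative limiter: at most max_concurrent holders at a time."""

    def __init__(self, max_concurrent: int, delay_seconds: float = 0.0) -> None:
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._delay_seconds = delay_seconds

    async def acquire(self) -> None:
        await self._semaphore.acquire()
        if self._delay_seconds > 0:
            await asyncio.sleep(self._delay_seconds)

    async def release(self) -> None:
        self._semaphore.release()

    async def __aenter__(self) -> "SemaphoreRateLimiter":
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Only reached when __aenter__ succeeded, so release() is always paired.
        await self.release()


async def run_demo(workers: int = 5, max_concurrent: int = 2) -> int:
    """Run several fake LLM calls and return the peak observed concurrency."""
    limiter = SemaphoreRateLimiter(max_concurrent)
    active = 0
    peak = 0

    async def fake_llm_call() -> None:
        nonlocal active, peak
        async with limiter:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stands in for llm.ainvoke()
            active -= 1

    await asyncio.gather(*(fake_llm_call() for _ in range(workers)))
    return peak
```

Calling `asyncio.run(run_demo())` reports how many fake calls overlapped, which should never exceed `max_concurrent`.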
Observability and Logging #
AD-12. Request and per-invocation correlation. request_id (UUID4) + bind_logging_context() binds request_id, session_id, user_id, langgraph_step, langgraph_node into structlog contextvars. All agent lifecycle logs include explicit request_id.
AD-32. OpenTelemetry instrumentation. Optional [otel] extra. Zero-overhead no-ops when not installed. OpenTelemetryMiddleware creates spans for agent invocations.
AD-33. Token usage tracking. AgentResponse.token_usage accumulated across ReAct passes. Propagated to AIMessage.response_metadata.
AD-41. PII redaction. bind_logging_context(redact_pii=True) hashes session_id/user_id with 12-char SHA-256 prefix. Deterministic for log correlation.
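The 12-character SHA-256 prefix from AD-41 can be approximated as follows. The helper names `redact_identifier` and `build_log_context`, the UTF-8 encoding, and the use of the hex digest are assumptions; the source only states that the hash is a deterministic 12-character SHA-256 prefix.

```python
import hashlib


def redact_identifier(value: str, length: int = 12) -> str:
    """Return the first `length` hex characters of the SHA-256 digest."""
    return hashlib.sha256(value.encode("utf-8")).hexdigest()[:length]


def build_log_context(session_id: str, user_id: str, redact_pii: bool = False) -> dict:
    """Build the fields to bind into the logging context."""
    if redact_pii:
        session_id = redact_identifier(session_id)
        user_id = redact_identifier(user_id)
    return {"session_id": session_id, "user_id": user_id}


# The same input always yields the same token, so log lines still correlate.
context = build_log_context("sess-123", "alice@example.com", redact_pii=True)
```

A truncated hash keeps log lines joinable per user or session without writing the raw identifier, at the cost of a small collision probability that is usually acceptable for correlation.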
Async Safety and Concurrency #
AD-11. Async concurrency safety. Targeted asyncio.Lock guards: ReactTemplateManager (load_slot, reset), SlotManager (hydrate, flush), InMemorySessionManager (per-session locks). SlotPool.total_tokens snapshots dict values. OrchestratorState uses _merge_dicts reducer.
AD-18. Async context manager protocol. close() + async with on SlotManager, ReactTemplateManager, SessionManager, and RateLimiter. Follows aiohttp.ClientSession pattern.
AD-49. Closed guards. SlotManager and ReactTemplateManager have _closed flag. Post-close operations raise RuntimeError. TOCTOU fix in ReactTemplateManager.load_slot() moves budget check inside lock.
AD-37. OrchestratorContext coordinated lifecycle. Manages startup/shutdown of all components. close() tears down in safe order (extras, template managers, rate limiter, slot manager, session manager). Errors logged but never masking.
AD-50. OrchestratorContext.__repr__ and is_closed. __repr__ shows managed component names and open/closed status. is_closed property for health checks.
Configuration and Validation #
AD-13. Immutable config models. All Pydantic config models use ConfigDict(frozen=True, extra="forbid"). Prevents mutation and rejects unknown fields (typos raise ValidationError immediately).
AD-44. load_config() accepts str | Path. Coerces strings to Path internally so callers no longer need explicit Path() wrapping.
AD-53. ConfigLoadError for missing files. Raises ConfigLoadError (not FileNotFoundError) for consistent error hierarchy.
AD-47. Agent constructors forward middleware and rate limiter. SupervisorAgent and SessionManagerAgent accept middleware param (forwarded to BaseAgent). SessionManagerAgent also accepts rate_limiter.
AD-48b. Keyword-only constructor args. Optional params on ReactExecutor, SupervisorAgent, and SessionManagerAgent use * separator to prevent positional argument mistakes.
AD-27. _build_action_handlers startup validation. SessionManagerAgent validates all method names in _CAPABILITY_HANDLER_MAP exist on the class at construction time. Raises AttributeError for missing handlers.
LLM Integration #
AD-14. RunnableConfig threading. RunnableConfig carrying LangSmith/Langfuse/OTel callbacks threaded from invoke() through process(), execute(), to llm.ainvoke(). LangGraph auto-passes config to node functions.
AD-22b. Shared LLM utilities. ainvoke_with_timeout, build_async_retryer, TRANSIENT_EXCEPTIONS in core/llm_utils.py. Retries target OSError/TimeoutError only, not logic errors.
AD-30. Streaming support. Opt-in astream_with_timeout() + BaseAgent.astream_process() (default raises NotImplementedError). SupervisorAgent supports simple mode streaming. ReAct stays non-streaming (XML parsing needs full response).
AD-57. Transient exception registry. register_transient_exception(), deregister_transient_exception(), get_transient_exceptions(). Thread-safe via threading.Lock. Register provider-specific exceptions (e.g., anthropic.RateLimitError) at app startup.
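To make the registry idea in AD-22b and AD-57 concrete, here is a standalone sketch. The three function names mirror those listed in AD-57, but the bodies, the module-level set, the `is_transient` helper, and `FakeProviderRateLimitError` are local assumptions rather than the library's implementation.

```python
import threading

_lock = threading.Lock()
# AD-22b: retries target OSError/TimeoutError by default.
_transient = {OSError, TimeoutError}


def register_transient_exception(exc_type) -> None:
    with _lock:
        _transient.add(exc_type)


def deregister_transient_exception(exc_type) -> None:
    with _lock:
        _transient.discard(exc_type)


def get_transient_exceptions() -> tuple:
    """Return a snapshot tuple, suitable for isinstance() or except clauses."""
    with _lock:
        return tuple(_transient)


def is_transient(exc: BaseException) -> bool:
    """Hypothetical helper: decide whether a failure is worth retrying."""
    return isinstance(exc, get_transient_exceptions())


class FakeProviderRateLimitError(Exception):
    """Stands in for a provider-specific error such as a rate-limit exception."""


# Register once at application startup, before any LLM calls are made.
register_transient_exception(FakeProviderRateLimitError)
```

Returning a snapshot from `get_transient_exceptions()` means a retry loop sees a consistent set even if another thread registers a type mid-flight.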
Testing and CI #
AD-28. Dev dependency upper bounds. pytest>=8.0,<9.0, hypothesis>=6.100,<7.0, etc. Prevents surprise major-version breakage.
AD-29. Parametrized and property-based testing. @pytest.mark.parametrize for validation cases. hypothesis for agent name regex exhaustive testing.
AD-43. pip-audit in CI. Vulnerability scanning in lint job after ruff, before mypy.
AD-59. graphcrew.testing subpackage. MockLLM (scripted responses, streaming chunks, structured output, error injection), make_test_state(), InMemorySessionManager re-exported for convenience.
AD-65. mypy applied to tests/. Strict mode with ignore_errors = true override so test files can intentionally pass wrong types to verify error handling.
Debuggability #
AD-26. __repr__ on key classes. BaseAgent, SlotPool, SlotManager, ReactTemplateManager, ReactExecutor. BaseAgent.__repr__ uses type(self).__name__ for subclass accuracy.
Multi-Tenant Support #
AD-39b. BaseAgent.invoke() identity mutation logging. Independently of TenantIsolationMiddleware, invoke() logs a WARNING if process() mutates session_id or user_id.
AD-52b. create_initial_state() defaults. session_id defaults to "default", user_id defaults to "anonymous". These are for single-user development only -- production code MUST pass explicit values.
2.5 Exception Hierarchy #
All exceptions inherit from OrchestratorError. The tree below shows the full hierarchy with brief descriptions of when each is raised.
Exception
|
+-- OrchestratorError # Base for all library exceptions
|
+-- ConfigurationError # Base for config/setup errors
| +-- LLMNotConfiguredError # Agent requires LLM but none provided
| +-- ReactExecutorNotConfiguredError # ReAct mode needs template_manager
| +-- ConfigLoadError* # Config file not found or unreadable
| +-- ConfigValidationError* # Config YAML fails Pydantic validation
|
+-- AgentExecutionError # Base for agent runtime failures
| +-- AgentProcessError # Agent's process() method raised
|
+-- LLMError # Base for LLM call failures
| +-- RateLimitExceededError # Rate limiter rejected the call
| +-- LLMInvocationError # Single LLM ainvoke() call failed
| +-- LLMRetryExhaustedError # All retry attempts exhausted
|
+-- SessionError # Base for session layer errors
| +-- SessionManagerError # SessionManager operation failed
| +-- SessionManagerNotConfiguredError # No session manager provided
|
+-- StateError # Base for state validation errors
| +-- InvalidStateError # Required state key missing
|
+-- ContextError # Base for slot/context errors
| +-- SlotPersistenceError # Persistence load/save failed
|
+-- KnowledgeError # Base for knowledge module errors
| +-- KnowledgeSlotNotFoundError # Requested slot does not exist
| +-- KnowledgePathError # Module path escapes base directory
| +-- KnowledgeLoadError # Module file failed to load
| +-- KnowledgeConfigValidationError # react_config.yaml validation failed
|
+-- ReactError # Base for ReAct execution errors
| +-- ReactCancellationError # ReAct loop cancelled via asyncio.Event
| +-- ReactMaxPassesError # max_passes exceeded without terminal action
| +-- ReservedStateKeyError # Attempted overwrite of reserved state keys
| +-- ReactConsecutiveErrorsError # Max consecutive invalid actions reached
|
+-- RoutingError # Base for routing errors
| +-- RoutingSelfLoopError # Agent tried to route to itself
|
+-- TenantIsolationError # Agent mutated session_id/user_id
|
+-- CircuitBreakerOpenError # Circuit breaker rejecting calls
* ConfigLoadError and ConfigValidationError are defined in config/loader.py and inherit from ConfigurationError.
When Each Exception Is Raised #
| Exception | Trigger |
|---|---|
| LLMNotConfiguredError | Agent constructor called without an LLM when one is required |
| ReactExecutorNotConfiguredError | SupervisorAgent._process_react() called without a ReactTemplateManager |
| ConfigLoadError | load_config() given a nonexistent file path |
| ConfigValidationError | YAML content fails Pydantic validation |
| AgentProcessError | BaseAgent.invoke() catches a non-OrchestratorError from process() |
| RateLimitExceededError | rate_limiter.acquire() raises an exception |
| LLMInvocationError | Single LLM call fails (no retry configured) |
| LLMRetryExhaustedError | All retry attempts fail for an LLM call |
| SessionManagerError | Any SessionManager operation fails (wraps original exception) |
| SessionManagerNotConfiguredError | Session operation requested but no SessionManager provided |
| InvalidStateError | Required key missing from OrchestratorState |
| SlotPersistenceError | hydrate() or flush() persistence backend raises |
| KnowledgeSlotNotFoundError | ReactTemplateManager.load_slot() given an unknown slot name |
| KnowledgePathError | Module path resolves outside the knowledge base directory |
| KnowledgeLoadError | Module file cannot be read, or token budget exceeded |
| KnowledgeConfigValidationError | react_config.yaml fails ReactConfigSchema validation |
| ReactCancellationError | cancel_event.is_set() detected at start of a ReAct pass |
| ReactMaxPassesError | ReAct loop completes max_passes without a terminal action |
| ReservedStateKeyError | context dict or handler state_updates contain reserved keys |
| ReactConsecutiveErrorsError | max_consecutive_errors consecutive unknown/failed actions |
| RoutingSelfLoopError | Supervisor attempts to route to itself |
| TenantIsolationError | TenantIsolationMiddleware detects identity field mutation |
| CircuitBreakerOpenError | CircuitBreakerMiddleware is in OPEN state, rejecting calls |
Error Handling Best Practices #
- Catch OrchestratorError at the application boundary to handle all library errors uniformly.
- Catch specific subclasses when you need targeted recovery (e.g., RateLimitExceededError for backoff, ReactCancellationError for graceful shutdown).
- Never catch ReservedStateKeyError -- it indicates a programming error (fix the offending handler).
- LLMRetryExhaustedError vs LLMInvocationError -- the former means all retries failed; the latter means a single call failed with no retry configured. Both carry the original exception as .cause.
- ConfigLoadError and ConfigValidationError should be caught at startup and surfaced clearly to the developer.
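These practices might look like the following at an application boundary. The exception classes are re-declared locally, with the same names and parentage as in the hierarchy above, so the sketch runs without the library installed. `handle_turn` and its string return values are hypothetical.

```python
class OrchestratorError(Exception): ...
class LLMError(OrchestratorError): ...
class RateLimitExceededError(LLMError): ...
class ReactError(OrchestratorError): ...
class ReactCancellationError(ReactError): ...
class ReservedStateKeyError(ReactError): ...


def handle_turn(run):
    """Run one turn (a zero-argument callable) and map library errors to outcomes."""
    try:
        return run()
    except ReservedStateKeyError:
        # Programming error: let it surface instead of swallowing it.
        raise
    except RateLimitExceededError:
        return "retry-later"  # targeted recovery: back off and retry
    except ReactCancellationError:
        return "cancelled"  # graceful shutdown path
    except OrchestratorError as exc:
        # Uniform fallback for every other library error.
        return f"failed: {type(exc).__name__}"
```

Ordering matters here: the specific handlers must come before the `OrchestratorError` catch-all, and exceptions from outside the library are deliberately left to propagate.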
Part 2: Developer Practices (2.6--2.9) #
2.6 Testing Guide #
Test Directory Structure #
tests/
    conftest.py # Global fixtures (autouse structlog cleanup)
    __init__.py
    helpers/
        __init__.py
        knowledge_helpers.py # setup_supervisor_agent_dir(), setup_minimal_agent_dir()
        session_helpers.py # FullSessionManager, make_session_agent_config()
    unit/
        __init__.py
        config/
            test_schema.py # Config model validation
            test_loader.py # YAML loading, ConfigLoadError
            test_defaults.py # Default value verification
            test_agent_name_property.py # Hypothesis property-based name validation
            test_retry_config.py # Retry/backoff config
            test_timeout_config.py # Timeout config validation
        core/
            test_state.py # OrchestratorState, create_initial_state()
            test_types.py # Type aliases and enums
            test_exceptions.py # Exception hierarchy
            test_logging_context.py # bind_logging_context, PII redaction
            test_token_counting.py # CharDivisionTokenCounter + hypothesis
            test_llm_utils.py # Retry, transient exceptions, timeouts
            test_rate_limiting.py # SemaphoreRateLimiter, PerTenantRateLimiter
            test_orchestrator_context.py # OrchestratorContext lifecycle
        agents/
            test_base.py # BaseAgent ABC, middleware execution
            test_supervisor.py # SupervisorAgent routing
            test_session_manager_agent.py # SessionManagerAgent capabilities
            test_middleware.py # AgentMiddleware onion model
            test_metrics.py # MetricsMiddleware, InMemoryMetricsCollector
            test_tenant_isolation.py # TenantIsolationMiddleware
            test_circuit_breaker.py # CircuitBreakerMiddleware state machine
            test_memory_injection.py # MemoryInjectionMiddleware
            test_streaming.py # astream_process()
        context/
            test_slots.py # SlotPool, SlotTier + hypothesis
            test_manager.py # SlotManager lifecycle
            test_eviction.py # FIFO, LRU, Priority eviction
            test_persistence.py # InMemorySlotPersistence
        session/
            test_interface.py # SessionManager ABC
            test_in_memory.py # InMemorySessionManager
            test_models.py # SessionMessage, Memory, Summary + hypothesis
            test_semantic.py # SemanticSearchMixin
        routing/
            test_topology.py # TopologyResolver
            test_graph_builder.py # build_hub_spoke_graph(), build_conditional_edge()
        knowledge/
            test_loader.py # FileSystemKnowledgeLoader
            test_template_manager.py # ReactTemplateManager lifecycle
            test_models.py # ReactActionSchema, ReactSlotSchema
            test_react_config_schema.py # react_config.yaml validation
        react/
            test_executor.py # ReactExecutor multi-pass loop
            test_structured_output.py # ReactStructuredResponse, fallback
            test_models.py # ReAct response models
        observability/
            test_otel.py # OpenTelemetry middleware (optional)
        test_concurrency.py # asyncio.Lock guards across components
        test_testing_subpackage.py # MockLLM, make_test_state validation
        test_public_api.py # __all__ exports, __version__
        test_subpackage_exports.py # Subpackage re-exports
        test_chat.py # run_chat_turn()
    integration/
        test_agent_e2e.py # End-to-end agent invocation
        test_react_integration.py # Full ReAct loop with MockLLM
        test_session_integration.py # Session persistence round-trips
        test_example_chatbot.py # Smoke test for simple_chatbot example
        test_example_react_knowledge.py # Smoke test for react_knowledge_agent example
        test_graph_execution.py # Compiled LangGraph execution
        test_multi_tenant.py # Multi-tenant isolation, cross-session memory
    performance/
        __init__.py
        test_benchmarks.py # Opt-in benchmarks
The global conftest.py provides a single autouse fixture that clears structlog contextvars before and after every test.
Test Utilities: graphcrew.testing #
The library ships a testing subpackage for writing tests without real API calls:
from graphcrew.testing import MockLLM, make_test_state, InMemorySessionManager
MockLLM -- A BaseChatModel subclass that returns scripted responses in FIFO order.
make_test_state() -- Creates a pre-populated OrchestratorState with sensible defaults.
InMemorySessionManager -- Re-exported from graphcrew.session for convenience in tests.
Writing a Unit Test #
"""Tests for the FooBar component."""
from __future__ import annotations

import pytest
from pydantic import ValidationError

from graphcrew.config.schema import AgentConfig


class TestAgentConfig:
    def test_valid_config(self) -> None:
        cfg = AgentConfig(name="supervisor", type="supervisor", description="Main")
        assert cfg.name == "supervisor"

    def test_missing_required_field_raises(self) -> None:
        with pytest.raises(ValidationError):
            AgentConfig(type="custom", description="No name")  # type: ignore[call-arg]

    @pytest.mark.parametrize(
        ("name", "match"),
        [
            ("", None),
            ("__end__", "reserved"),
            ("1agent", "invalid"),
        ],
        ids=["empty", "reserved-end", "digit-start"],
    )
    def test_invalid_names(self, name: str, match: str | None) -> None:
        with pytest.raises(ValidationError):
            AgentConfig(name=name, type="custom", description="desc")
Property-Based Testing with Hypothesis #
The project uses Hypothesis for property-based testing where the input space is large.
Running Tests #
# Full test suite with coverage
uv run pytest --cov=graphcrew --cov-report=term-missing --cov-fail-under=90
# Via Make
make test
# Specific test file
uv run pytest tests/unit/agents/test_supervisor.py -v
Coverage #
The suite maintains 98%+ coverage across 1300+ tests. Branch coverage is enabled, and show_missing = true reports uncovered lines. The fail_under = 90 threshold is enforced in CI and make test.
CI Pipeline #
The GitHub Actions CI pipeline runs on every push to main and every pull request.
Lint job (Python 3.13 only):
- ruff check -- linting
- ruff format --check -- format verification
- pip-audit -- vulnerability scanning
- mypy src/ tests/ -- strict type checking
- uv build -- verify the package builds
Test job (Python 3.11, 3.12, 3.13 matrix, depends on lint):
- pytest --cov=graphcrew --cov-report=term-missing --cov-fail-under=90
- Upload coverage to Codecov (3.13 only)
2.7 Code Conventions #
Future Annotations #
Every .py file starts with:
from __future__ import annotations
Frozen and Strict Pydantic Models #
All Pydantic config models use:
model_config = ConfigDict(frozen=True, extra="forbid")
Keyword-Only Constructor Parameters #
Optional parameters on public constructors use the * separator.
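A small sketch of what the `*` separator buys. `ExampleExecutor` and its parameters are hypothetical and only echo the kind of optional arguments mentioned elsewhere in this guide; the default of 5 follows the `DEFAULT_MAX_PASSES` value given in AD-25.

```python
DEFAULT_MAX_PASSES = 5  # value stated in AD-25


class ExampleExecutor:
    """Hypothetical class; only the '*' separator pattern is the point."""

    def __init__(self, llm, *, max_passes=DEFAULT_MAX_PASSES,
                 use_structured_output=False):
        self.llm = llm
        self.max_passes = max_passes
        self.use_structured_output = use_structured_output


# Optional parameters must be named, so call sites stay self-describing.
executor = ExampleExecutor("fake-llm", max_passes=3)

try:
    ExampleExecutor("fake-llm", 3)  # positional use of an optional parameter
except TypeError as exc:
    positional_error = str(exc)  # Python rejects the extra positional argument
```

Keyword-only parameters also let new options be added or reordered later without silently changing the meaning of existing calls.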
structlog Field Naming #
All log events use agent_name= (not agent=) for consistency.
Ruff Configuration #
| Code | Rule Set | Purpose |
|---|---|---|
| E | pycodestyle errors | Style errors |
| F | Pyflakes | Logical errors |
| I | isort | Import ordering |
| N | pep8-naming | Naming conventions |
| W | pycodestyle warnings | Style warnings |
| UP | pyupgrade | Python version upgrade suggestions |
| B | flake8-bugbear | Common bug patterns |
| SIM | flake8-simplify | Code simplification |
| RUF | Ruff-specific | Ruff's own rules |
| T20 | flake8-print | Disallow print statements |
| S | flake8-bandit | Security checks |
| C4 | flake8-comprehensions | Comprehension improvements |
| PTH | flake8-use-pathlib | Prefer pathlib over os.path |
mypy Strict Mode #
[tool.mypy]
python_version = "3.11"
strict = true
warn_return_any = true
warn_unused_configs = true
2.8 Contributing Guide #
Prerequisites #
- Python >= 3.11
- uv package manager
Setup #
git clone https://github.com/Atiqul-Islam/swagent.git
cd swagent
uv sync --group dev
pre-commit install
Development Commands #
make test # Run tests with coverage (fail-under=90)
make lint # Run ruff linter
make typecheck # Run mypy strict type checking
make format # Auto-format code with ruff
make audit # Run pip-audit vulnerability scan
make all # Run lint + typecheck + audit + test
make clean # Remove build artifacts, caches
Required Checks #
| Check | Tool | Command |
|---|---|---|
| Lint | ruff | uv run ruff check src/ tests/ examples/ |
| Format | ruff | uv run ruff format --check src/ tests/ examples/ |
| Vulnerabilities | pip-audit | uv run pip-audit |
| Type check | mypy (strict) | uv run mypy src/ tests/ |
| Build | hatchling | uv build |
| Tests | pytest | uv run pytest --cov=graphcrew --cov-fail-under=90 |
How to Add a New Agent #
- Create your agent class extending BaseAgent
- Register it in your YAML config under agents: with type: custom
- Write unit tests in tests/unit/agents/ and integration tests in tests/integration/
Code of Conduct #
The project follows the Contributor Covenant Code of Conduct v2.1.
2.9 Release and Deployment #
Build System #
[build-system]
requires = ["hatchling>=1.25"]
build-backend = "hatchling.build"
Versioning #
The version is declared in pyproject.toml. At runtime, graphcrew.__version__ reads from installed package metadata.
Publish Pipeline #
The publish.yml workflow triggers when a GitHub release is published.
Step 1: Test gate (Python 3.11, 3.12, 3.13 matrix)
Step 2: Build and publish
Step 3: Smoke test
Release Process #
- Update the version in pyproject.toml
- Move changelog entries from [Unreleased] to the new version section
- Commit and push to main
- Create a GitHub release with a tag matching the version
- Pipeline runs full test matrix, publishes, and smoke tests
CI vs Publish Pipeline Comparison #
| Check | CI | Publish |
|---|---|---|
| Trigger | Push/PR to main | GitHub release published |
| Lint | Python 3.13 only | All matrix versions |
| Tests | 3.11, 3.12, 3.13 | 3.11, 3.12, 3.13 |
| Package build | Verify only | Build + publish to PyPI |
| Smoke test | No | Yes |
Optional Dependencies #
[project.optional-dependencies]
otel = ["opentelemetry-api>=1.24,<2.0"]
Install with: pip install graphcrew[otel]