Part 1: User Guide

graphcrew -- User Guide & Feature Reference

1.1 Introduction & Overview

What is graphcrew?

graphcrew is a Python library, built on LangGraph, for creating production-grade, supervisor-led multi-agent systems. It provides the interfaces, orchestration scaffolding, and configuration system that developers need to compose LLM-powered agents into coordinated workflows.

What it is NOT

graphcrew is not an application. It does not ship with a web server, CLI tool, or runnable program. It is a library that provides interfaces, orchestration scaffolding, and a configuration system.

You build your own agents, wire them into a graph, and run the graph using LangGraph's execution engine.

Who is it for?

Developers building multi-agent LLM applications who need the capabilities listed under Core Value Propositions.

Core Value Propositions

| Capability | Description |
| --- | --- |
| Declarative YAML config | Define agents, topology, context budgets, retry/timeout policies, and knowledge settings in a single YAML file. Pydantic validates everything at load time. |
| Constrained dynamic routing | Agents choose next_agent at runtime. The topology restricts which transitions are legal. Invalid routes fall back to default_return. |
| Tiered slot persistence | Context slots have TURN, SESSION, and LONG_TERM tiers. Eviction strategies (LRU, FIFO, priority, demotion) keep slots within token budgets. Pluggable SlotPersistence backends persist SESSION and LONG_TERM slots. |
| ReAct knowledge engine | Per-agent react_config.yaml declares actions and knowledge slots. Markdown files are loaded on-demand during the thought/action/observation loop. Token budgets cap loaded knowledge. |

1.2 Installation & Quick Start

Installation

bash
# With pip
pip install graphcrew

# With uv
uv add graphcrew

Dependencies

| Package | Version |
| --- | --- |
| langgraph | >= 1.0, < 2.0 |
| langchain-core | >= 0.3, < 1.0 |
| pydantic | >= 2.0, < 3.0 |
| pyyaml | >= 6.0, < 7.0 |
| structlog | >= 24.0, < 26.0 |
| tenacity | >= 8.0, < 10.0 |

Optional extras:

bash
# OpenTelemetry instrumentation
pip install graphcrew[otel]

Python Version

Python >= 3.11 is required.

Minimal Example

1. Define your config (config.yaml):

yaml
agents:
  - name: supervisor
    type: supervisor
    description: Routes user requests to the appropriate agent
  - name: greeter
    type: custom
    description: Greets users and tracks greeting count

topology:
  edges:
    - from_agent: supervisor
      to_agent: greeter
    - from_agent: greeter
      to_agent: supervisor
  default_return: supervisor

context:
  shared_slots:
    max_tokens: 4096
  eviction_strategy: lru

session:
  enabled: false

2. Create a custom agent (agent.py):

python
from graphcrew import AgentResponse, BaseAgent, OrchestratorState
from langchain_core.runnables import RunnableConfig


class GreeterAgent(BaseAgent):
    async def process(
        self,
        state: OrchestratorState,
        config: RunnableConfig | None = None,
    ) -> AgentResponse:
        count = state.get("context_slots", {}).get("greeting_count", 0) + 1
        last_msg = state["messages"][-1].content if state["messages"] else ""

        content = f"Hello! This is greeting #{count}. You said: {last_msg}"

        return AgentResponse(
            agent_name=self.name,
            content=content,
            next_agent="supervisor",
            slots_update={"greeting_count": count},
        )

3. Build and run the graph (main.py):

python
import asyncio
from langchain_openai import ChatOpenAI

from graphcrew import (
    SupervisorAgent,
    TopologyResolver,
    create_initial_state,
    load_config,
)
from graphcrew.routing.graph_builder import build_hub_spoke_graph
from agent import GreeterAgent


async def main():
    config = load_config("config.yaml")
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

    topology = TopologyResolver(config=config.topology)
    agent_configs = {a.name: a for a in config.agents}

    supervisor = SupervisorAgent(
        config=agent_configs["supervisor"],
        llm=llm,
        topology=topology,
    )
    greeter = GreeterAgent(config=agent_configs["greeter"])

    graph = build_hub_spoke_graph(supervisor, {"greeter": greeter}, topology)

    state = create_initial_state("Hello!", session_id="s1", user_id="alice")
    result = await graph.ainvoke(state)
    print(result["messages"][-1].content)


asyncio.run(main())

For a complete runnable example, see examples/simple_chatbot/.


1.3 Core Concepts

Agents

Every agent extends BaseAgent (defined in agents/base.py). The only method you must implement is process():

python
class BaseAgent(ABC):
    def __init__(
        self,
        config: AgentConfig,
        template_manager: ReactTemplateManager | None = None,
        react_executor: ReactExecutor | None = None,
        middleware: Sequence[AgentMiddleware] | None = None,
        *,
        redact_pii: bool = False,
    ) -> None: ...

    @abstractmethod
    async def process(
        self,
        state: OrchestratorState,
        config: RunnableConfig | None = None,
    ) -> AgentResponse: ...

    async def invoke(
        self,
        state: OrchestratorState,
        config: RunnableConfig | None = None,
    ) -> dict[str, Any]: ...

AgentResponse is a frozen dataclass:

python
@dataclass(frozen=True, slots=True)
class AgentResponse:
    agent_name: str
    content: str
    next_agent: str | None = None           # None = terminate (no handoff)
    slots_update: dict[str, Any] = field(default_factory=dict)
    slots_tier: dict[str, SlotTier] = field(default_factory=dict)
    token_usage: dict[str, int] | None = None

The library ships two built-in agents:

OrchestratorState

OrchestratorState is a TypedDict shared across all graph nodes:

python
class OrchestratorState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    context_slots: Annotated[dict[str, Any], _merge_dicts]
    current_agent: str
    knowledge_context: Annotated[dict[str, Any], _merge_dicts]
    session_id: NotRequired[str]
    user_id: NotRequired[str]
    request_id: NotRequired[str]

Create initial state with:

python
state = create_initial_state(
    "Hello!",
    session_id="s1",         # default: "default"
    user_id="alice",          # default: "anonymous"
    current_agent="supervisor",  # default: "supervisor"
)

Topology

TopologyResolver (defined in routing/topology.py) validates routing decisions against a directed edge graph:

python
topology = TopologyResolver(config=config.topology)

topology.is_allowed("supervisor", "greeter")    # True if edge exists
topology.get_allowed_targets("supervisor")       # Sorted list, always includes default_return
topology.validate(["supervisor", "greeter"])     # Returns warnings for unreachable agents

The topology is defined in YAML:

yaml
topology:
  edges:
    - from_agent: supervisor
      to_agent: worker_a
    - from_agent: supervisor
      to_agent: worker_b
    - from_agent: worker_a
      to_agent: supervisor
    - from_agent: worker_b
      to_agent: supervisor
  default_return: supervisor
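
The routing rule described above (honor a requested hop only when its edge is declared, otherwise fall back to default_return) can be sketched in plain Python. This is an illustration under stated assumptions, not TopologyResolver itself: SketchTopology and its resolve() method are hypothetical names introduced here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchTopology:
    """Hypothetical stand-in for a topology: declared edges plus a fallback."""

    edges: frozenset[tuple[str, str]]
    default_return: str

    def is_allowed(self, source: str, target: str) -> bool:
        # A transition is legal only if the (source, target) edge was declared.
        return (source, target) in self.edges

    def resolve(self, source: str, requested: str) -> str:
        # Honor the requested hop if legal, otherwise use the fallback agent.
        return requested if self.is_allowed(source, requested) else self.default_return


sketch = SketchTopology(
    edges=frozenset({
        ("supervisor", "worker_a"),
        ("supervisor", "worker_b"),
        ("worker_a", "supervisor"),
        ("worker_b", "supervisor"),
    }),
    default_return="supervisor",
)
```

With the edges above, a worker_a request to hand off to worker_b is not declared, so the sketch resolves it to supervisor.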

Context Slots

Context slots are schemaless key-value pairs with token budgets and tiered persistence. They are managed by SlotManager (defined in context/manager.py).

Agents write slots through AgentResponse.slots_update and slots_tier. BaseAgent.invoke() merges these into state["context_slots"].
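
As a rough picture of that merge step, a dictionary reducer can combine the existing slots with an agent's update. The function below is a sketch only; the library's internal _merge_dicts reducer may behave differently (for example with nested values), and merge_dicts is a name chosen for this illustration.

```python
from typing import Any


def merge_dicts(current: dict[str, Any], update: dict[str, Any]) -> dict[str, Any]:
    """Return a new dict in which keys from `update` override `current`."""
    return {**current, **update}


context_slots = {"greeting_count": 1, "user_name": "Alice"}
slots_update = {"greeting_count": 2}

# The original mapping is left untouched; the merged copy carries the update.
merged = merge_dicts(context_slots, slots_update)
```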

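Knowledge Modules

The knowledge system (in knowledge/) provides per-agent markdown files loaded on-demand during ReAct execution. Each agent has its own folder under the knowledge base directory, containing react_config.yaml, react_core.md, and a slots/ directory of markdown modules.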

ReactTemplateManager loads and validates the config, resolves slot dependencies, and assembles prompts. FileSystemKnowledgeLoader reads files with path containment enforcement.

ReAct Execution

ReactExecutor (in react/executor.py) implements a multi-pass thought/action/observation loop:

  1. Build prompt from core template + dynamic context (previous thoughts, observations, loaded knowledge).
  2. Call LLM.
  3. Parse response -- either XML (<thought>...</thought><action type="...">...</action>) or structured output (ReactStructuredResponse).
  4. Execute the action handler.
  5. Collect the observation and loop until a handler raises TerminalActionResult or max_passes is reached.
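
Step 3 can be illustrated with a small parser for the XML form shown above. This is a sketch that assumes one thought element followed by one action element; parse_react_xml is a hypothetical helper, and the library's own parser may be stricter or more tolerant.

```python
import re

_THOUGHT_RE = re.compile(r"<thought>(.*?)</thought>", re.DOTALL)
_ACTION_RE = re.compile(r'<action\s+type="([^"]+)"\s*>(.*?)</action>', re.DOTALL)


def parse_react_xml(text: str) -> tuple[str, str, str]:
    """Return (thought, action_type, action_body) or raise ValueError."""
    thought = _THOUGHT_RE.search(text)
    action = _ACTION_RE.search(text)
    if thought is None or action is None:
        raise ValueError("response is missing a <thought> or <action> element")
    return thought.group(1).strip(), action.group(1), action.group(2).strip()


reply = (
    "<thought>Need the rules first.</thought>"
    '<action type="request_slot">routing_rules</action>'
)
parsed = parse_react_xml(reply)
```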

Sessions

SessionManager (in session/interface.py) is an ABC with three abstract methods.

Optional methods raise NotImplementedError by default: save_summary, get_summary, store_memory, recall_memory, search_memories, search_memories_by_user, get_messages_paginated, update_memory, delete_memory, list_memories.

InMemorySessionManager ships for dev/testing with all 13 methods implemented.


1.4 Feature Reference

1. Declarative YAML Configuration

Define your entire orchestrator in a single YAML file. All fields are validated by Pydantic with frozen=True and extra="forbid" (typos raise ValidationError immediately).

When to use: Every project. YAML config is the entry point for defining agents, topology, context budgets, and operational parameters.

Loading config:

python
from graphcrew import load_config, load_config_from_string, load_config_from_dict

# From a file (accepts str or Path)
config = load_config("config.yaml")

# From a YAML string
config = load_config_from_string(yaml_str)

# From a dict
config = load_config_from_dict({"agents": [...], "topology": {...}})

Complete YAML example with all sections:

yaml
agents:
  - name: supervisor
    type: supervisor
    description: Routes requests to the right agent
    model: gpt-4o           # optional LLM model override
    knowledge_enabled: true  # enable knowledge module system
  - name: researcher
    type: custom
    description: Performs research tasks

topology:
  edges:
    - from_agent: supervisor
      to_agent: researcher
    - from_agent: researcher
      to_agent: supervisor
  default_return: supervisor

context:
  shared_slots:
    max_tokens: 4096        # token budget for the slot pool (must be > 0)
  tiered_slots:
    turn:
      max_tokens: 4096
    session:
      max_tokens: 4096
    long_term:
      max_tokens: 4096
  eviction_strategy: lru    # lru | fifo | priority | demotion

session:
  enabled: true

knowledge:
  enabled: true
  base_dir: llm_resources   # root directory for agent knowledge folders
  max_total_tokens: 8192    # token budget for loaded knowledge (must be > 0)

retry:
  initial_interval: 0.5     # seconds before first retry (must be > 0)
  backoff_factor: 2.0       # exponential base multiplier
  max_interval: 128.0       # maximum seconds between retries
  max_attempts: 3           # total attempts including first (1 = no retry)
  jitter: true              # random jitter to prevent thundering herd

timeout:
  llm_call_seconds: 30.0    # per-call LLM timeout
  file_io_seconds: 5.0      # per-file knowledge load timeout
  persistence_seconds: 10.0 # per-call slot persistence timeout
  total_request_seconds: 120.0 # total timeout across all ReAct passes

Cross-validation: The OrchestratorConfig model validator ensures all topology edges and default_return reference agent names defined in the agents list.
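
The kind of check this cross-validation performs can be sketched without Pydantic. The function below is illustrative only: check_topology_references is a hypothetical name, and the real validator raises a ValidationError rather than returning a list.

```python
def check_topology_references(
    agent_names: list[str],
    edges: list[tuple[str, str]],
    default_return: str,
) -> list[str]:
    """Return the sorted names used by the topology but not declared as agents."""
    known = set(agent_names)
    referenced = {default_return}
    for source, target in edges:
        referenced.update((source, target))
    return sorted(referenced - known)


missing = check_topology_references(
    agent_names=["supervisor", "researcher"],
    edges=[
        ("supervisor", "researcher"),
        ("researcher", "supervisor"),
        ("supervisor", "writer"),  # "writer" is never declared as an agent
    ],
    default_return="supervisor",
)
```

An empty result means every edge endpoint and the default_return target refer to a declared agent.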


2. Custom Agents

Extend BaseAgent and implement process() to create any agent.

When to use: Every project. All worker agents are custom agents.

python
from graphcrew import AgentConfig, AgentResponse, BaseAgent, OrchestratorState, SlotTier
from langchain_core.runnables import RunnableConfig


class SummarizerAgent(BaseAgent):
    async def process(
        self,
        state: OrchestratorState,
        config: RunnableConfig | None = None,
    ) -> AgentResponse:
        messages = state["messages"]
        # Your summarization logic here
        summary = f"Summarized {len(messages)} messages."

        return AgentResponse(
            agent_name=self.name,
            content=summary,
            next_agent=None,  # None = terminate, graph ends
            slots_update={"last_summary": summary},
            slots_tier={"last_summary": SlotTier.SESSION},
        )


# Instantiate with a config
agent = SummarizerAgent(
    config=AgentConfig(name="summarizer", type="custom", description="Summarizes conversations")
)

Agent name validation: Names must be 1 to 64 characters long, match [a-zA-Z][a-zA-Z0-9_-]* (so they start with a letter), and cannot be __end__ or __start__ (reserved by LangGraph).
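
The documented naming rules translate directly into a small check. This sketch mirrors the rules as stated here; is_valid_agent_name is a hypothetical helper, not part of the graphcrew API.

```python
import re

_NAME_RE = re.compile(r"[a-zA-Z][a-zA-Z0-9_-]*")
_RESERVED = {"__end__", "__start__"}  # names reserved by LangGraph


def is_valid_agent_name(name: str) -> bool:
    """Apply the documented rules: 1 to 64 characters, letter first, not reserved."""
    if not 1 <= len(name) <= 64:
        return False
    if name in _RESERVED:
        return False
    return _NAME_RE.fullmatch(name) is not None
```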


3. Supervisor Routing

SupervisorAgent routes to worker agents. It supports two modes: simple mode (without ReAct) and ReAct mode (used when a ReactExecutor is configured).

When to use: Every project needs a supervisor (or a custom routing agent).

python
from graphcrew import (
    SupervisorAgent,
    AgentConfig,
    TopologyResolver,
    RetryConfig,
    TimeoutConfig,
    SemaphoreRateLimiter,
    MetricsMiddleware,
    InMemoryMetricsCollector,
)

collector = InMemoryMetricsCollector()

supervisor = SupervisorAgent(
    config=AgentConfig(name="supervisor", type="supervisor", description="Routes requests"),
    llm=llm,
    topology=topology,
    retry_config=RetryConfig(max_attempts=3),
    timeout_config=TimeoutConfig(llm_call_seconds=30.0),
    rate_limiter=SemaphoreRateLimiter(max_concurrency=5),
    use_structured_output=True,
    middleware=[MetricsMiddleware(collector)],
    redact_pii=True,
)

Self-loop detection: if the supervisor routes to itself, RoutingSelfLoopError is raised.


4. Hub-Spoke Topology

All worker agents route back to the supervisor unconditionally. The supervisor routes to workers via conditional edges.

When to use: Standard supervisor-led architectures where all coordination goes through a central supervisor.

python
from graphcrew.routing.graph_builder import build_hub_spoke_graph

graph = build_hub_spoke_graph(
    supervisor=supervisor,
    agents={"greeter": greeter, "researcher": researcher},
    topology=topology,
)
result = await graph.ainvoke(create_initial_state("Hello"))

yaml
topology:
  edges:
    - from_agent: supervisor
      to_agent: greeter
    - from_agent: supervisor
      to_agent: researcher
    - from_agent: greeter
      to_agent: supervisor
    - from_agent: researcher
      to_agent: supervisor
  default_return: supervisor

5. Mesh Topology

Workers can route directly to other workers without going through the supervisor.

When to use: When agents need peer-to-peer communication (e.g., a billing agent hands off directly to a support agent).

python
from graphcrew.routing.graph_builder import build_topology_graph

graph = build_topology_graph(
    agents={
        "supervisor": supervisor,
        "billing": billing,
        "support": support,
    },
    topology=topology,
    entry_point="supervisor",
)

yaml
topology:
  edges:
    - from_agent: supervisor
      to_agent: billing
    - from_agent: supervisor
      to_agent: support
    - from_agent: billing
      to_agent: support     # direct worker-to-worker
    - from_agent: support
      to_agent: supervisor
    - from_agent: billing
      to_agent: supervisor
  default_return: supervisor

build_conditional_edge() is also available for manual graph wiring:

python
from graphcrew.routing.graph_builder import build_conditional_edge

routing_fn = build_conditional_edge(topology, "supervisor")
# Returns END when state["current_agent"] == "supervisor" (termination sentinel)
# Falls back to topology.default_return for unknown targets

6. Context Slots

Schemaless key-value context with token budgets and tiered persistence.

When to use: When agents need to share structured data (user preferences, intermediate results, counters) across turns or sessions.

python
from graphcrew import SlotManager, SlotTier, ContextConfig

manager = SlotManager(config=ContextConfig())

# Shared slots (default tier: TURN)
manager.set_shared("user_name", "Alice")
manager.set_shared_tiered("preference", "dark_mode", tier=SlotTier.SESSION)

# Per-agent slots
manager.set_agent("researcher", "search_query", "quantum computing")

# Promote a slot to a higher tier
manager.promote_slot("user_name", SlotTier.LONG_TERM)

# Serialization
data = manager.to_dict()     # {"shared": {...}, "agents": {...}}
manager.from_dict(data)       # Restore from dict

# Lifecycle
async with SlotManager(config=ContextConfig()) as mgr:
    mgr.set_shared("key", "value")

Agents write slots via AgentResponse:

python
return AgentResponse(
    agent_name=self.name,
    content="Done",
    slots_update={"result": computed_value},
    slots_tier={"result": SlotTier.SESSION},
)

7. Eviction Strategies

When a slot pool exceeds its max_tokens budget, slots are evicted automatically.

When to use: When you need control over which slots are removed once a pool exceeds its token budget.

| Strategy | Behavior |
| --- | --- |
| LRUEviction | Evicts least recently accessed slots (default) |
| FIFOEviction | Evicts oldest created slots |
| PriorityEviction | Evicts lowest priority slots |
| DemotionEviction | Demotes TURN -> SESSION -> LONG_TERM before removing |

yaml
context:
  eviction_strategy: demotion  # or lru, fifo, priority

Custom eviction strategies extend EvictionStrategy:

python
from graphcrew import EvictionStrategy, Slot

class CustomEviction(EvictionStrategy):
    def select(self, slots: list[Slot], count: int) -> list[Slot]:
        # Return up to `count` slots to evict
        return sorted(slots, key=lambda s: s.priority)[:count]
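
To show what budget-driven eviction involves, here is a standalone LRU sketch. It is not the library's LRUEviction: evict_lru is a hypothetical function, and the token cost of len(value) // 4 is an assumption made for this example.

```python
from collections import OrderedDict


def evict_lru(slots: "OrderedDict[str, str]", max_tokens: int) -> list[str]:
    """Drop least recently used entries until the pool fits the budget.

    `slots` is ordered from least to most recently used. Token cost is
    approximated as len(value) // 4, an assumption of this sketch.
    """

    def cost(value: str) -> int:
        return max(1, len(value) // 4)

    evicted: list[str] = []
    total = sum(cost(v) for v in slots.values())
    while slots and total > max_tokens:
        key, value = slots.popitem(last=False)  # oldest entry first
        total -= cost(value)
        evicted.append(key)
    return evicted


pool = OrderedDict([("a", "x" * 40), ("b", "y" * 40), ("c", "z" * 40)])
pool.move_to_end("a")  # reading "a" makes it the most recently used
removed = evict_lru(pool, max_tokens=20)
```

Each value costs 10 tokens in this sketch, so a single eviction of the least recently used key brings the pool from 30 tokens back to the 20-token budget.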

8. Slot Persistence

SlotPersistence is an ABC for persisting SESSION and LONG_TERM slots to durable storage.

When to use: When context must survive process restarts (production deployments).

python
from graphcrew import SlotPersistence, Slot

class PostgresSlotPersistence(SlotPersistence):
    async def load_session_slots(self, session_id: str) -> list[Slot]: ...
    async def save_session_slots(self, session_id: str, slots: list[Slot]) -> None: ...
    async def delete_session_slots(self, session_id: str) -> None: ...
    async def load_long_term_slots(self, user_id: str) -> list[Slot]: ...
    async def save_long_term_slots(self, user_id: str, slots: list[Slot]) -> None: ...
    async def delete_long_term_slots(self, user_id: str) -> None: ...

The library ships InMemorySlotPersistence for dev/testing:

python
from graphcrew import InMemorySlotPersistence, SlotManager, ContextConfig

persistence = InMemorySlotPersistence()
manager = SlotManager(config=ContextConfig(), persistence=persistence)

await manager.hydrate(session_id="s1", user_id="alice")
# ... run your graph ...
await manager.flush(session_id="s1", user_id="alice")
await manager.clear_turn_slots()

SlotManager.hydrate() loads persisted slots at graph start. SlotManager.flush() persists SESSION and LONG_TERM slots at graph end.


9. Knowledge Modules

Per-agent markdown knowledge files loaded on-demand during ReAct execution.

When to use: When agents need access to structured reference information (API docs, routing rules, policies) that should be loaded selectively to stay within token budgets.

Directory structure:

text
llm_resources/
  supervisor/
    react_config.yaml
    react_core.md
    slots/
      routing_rules.md
      escalation_policy.md

react_config.yaml:

yaml
max_passes: 5

actions:
  request_slot:
    description: Load a knowledge slot for reference
    parameters:
      slot: Name of the slot to load
    required:
      - slot
  decide:
    description: Make a final routing decision
    parameters:
      next_agent: The agent to route to
      content: Summary of reasoning
    required:
      - next_agent

slots:
  routing_rules:
    description: Rules for routing requests to agents
    modules:
      - slots/routing_rules.md
    triggers: When deciding which agent should handle the request
    max_tokens: 2000
  escalation_policy:
    description: Escalation rules
    modules:
      - slots/escalation_policy.md
    dependencies:
      - routing_rules

ReactTemplateManager validates the config at construction, including slot dependency cycle detection. ReactConfigSchema uses a merged validate_slot_dependencies validator that checks both reference validity and circular dependencies via DFS.
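
A depth-first check of this kind can be sketched as follows. The function is illustrative and assumes dependencies are given as a plain mapping from slot name to the slots it depends on; find_dependency_problems is a hypothetical name, not the library's validator.

```python
def find_dependency_problems(deps: dict[str, list[str]]) -> list[str]:
    """Return human-readable problems: unknown references and cycles."""
    problems: list[str] = []
    for slot, targets in deps.items():
        for target in targets:
            if target not in deps:
                problems.append(f"{slot} depends on unknown slot {target}")

    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current path, finished
    color = {slot: WHITE for slot in deps}

    def visit(slot: str) -> bool:
        """Return True if a cycle is reachable from `slot`."""
        color[slot] = GRAY
        for target in deps[slot]:
            if target not in deps:
                continue  # already reported as an unknown reference
            if color[target] == GRAY:
                return True  # back edge to the current path: cycle
            if color[target] == WHITE and visit(target):
                return True
        color[slot] = BLACK
        return False

    for slot in deps:
        if color[slot] == WHITE and visit(slot):
            problems.append(f"circular dependency involving {slot}")
            break
    return problems
```

For the react_config.yaml above, the mapping would be {"routing_rules": [], "escalation_policy": ["routing_rules"]}, which yields no problems.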


10. ReAct Execution Engine

Multi-pass thought/action/observation loop implemented by ReactExecutor.

When to use: When agents need multi-step reasoning with access to knowledge modules and custom actions.

python
from graphcrew import (
    ReactExecutor,
    ReactTemplateManager,
    FileSystemKnowledgeLoader,
    RetryConfig,
    TimeoutConfig,
)

loader = FileSystemKnowledgeLoader(base_dir="llm_resources")
template_manager = ReactTemplateManager(
    agent_name="supervisor",
    base_dir="llm_resources",
    loader=loader,
    max_loaded_tokens=8192,
)

executor = ReactExecutor(
    agent_name="supervisor",
    llm=llm,
    template_manager=template_manager,
    action_handlers={"request_slot": slot_handler, "decide": decide_handler},
    max_passes=5,
    retry_config=RetryConfig(max_attempts=3),
    timeout_config=TimeoutConfig(llm_call_seconds=30.0, total_request_seconds=120.0),
    max_history_tokens=2000,
    rate_limiter=limiter,
    use_structured_output=True,
)

result = await executor.execute(user_input="Hello", context={"key": "value"}, config=config)

Key features:

Action handlers implement the ActionHandler protocol:

python
class ActionHandler(Protocol):
    async def __call__(
        self,
        params: dict[str, Any],
        state: dict[str, Any],
        config: RunnableConfig | None = None,
    ) -> tuple[str, dict[str, Any]]: ...

Return (observation, state_updates). Raise TerminalActionResult(result_dict) to end the loop.
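
The handler contract can be illustrated with a toy loop that replays scripted decisions instead of calling an LLM. Everything here is a sketch: SketchTerminal stands in for TerminalActionResult, run_loop is a hypothetical driver, and the handlers omit the optional config parameter for brevity.

```python
import asyncio
from typing import Any, Awaitable, Callable


class SketchTerminal(Exception):
    """Hypothetical stand-in for TerminalActionResult."""

    def __init__(self, result: dict[str, Any]) -> None:
        super().__init__("terminal action")
        self.result = result


Handler = Callable[[dict[str, Any], dict[str, Any]], Awaitable[tuple[str, dict[str, Any]]]]


async def request_slot(params: dict[str, Any], state: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    # Non-terminal handler: returns an observation plus state updates.
    loaded = state.get("loaded", []) + [params["slot"]]
    return f"loaded {params['slot']}", {"loaded": loaded}


async def decide(params: dict[str, Any], state: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    # Terminal handler: ends the loop by raising instead of returning.
    raise SketchTerminal({"next_agent": params["next_agent"]})


async def run_loop(
    decisions: list[tuple[str, dict[str, Any]]],
    handlers: dict[str, Handler],
    max_passes: int = 5,
) -> dict[str, Any]:
    """Replay scripted (action, params) pairs in place of real LLM output."""
    state: dict[str, Any] = {}
    observations: list[str] = []
    for action, params in decisions[:max_passes]:
        try:
            observation, updates = await handlers[action](params, state)
        except SketchTerminal as done:
            return {**done.result, "observations": observations}
        observations.append(observation)
        state.update(updates)
    return {"next_agent": None, "observations": observations}


outcome = asyncio.run(run_loop(
    [("request_slot", {"slot": "routing_rules"}), ("decide", {"next_agent": "greeter"})],
    {"request_slot": request_slot, "decide": decide},
))
```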


11. Session Management

SessionManager ABC with three abstract methods plus optional methods for summaries, memories, and pagination.

When to use: When you need conversation history, summaries, or long-term memory.

python
from graphcrew import InMemorySessionManager, SessionMessage, Memory, Summary

session_mgr = InMemorySessionManager()

# Save messages
await session_mgr.save_messages("s1", [
    SessionMessage(role="user", content="Hello", session_id="s1", user_id="alice"),
])

# Get messages (limit must be >= 1)
messages = await session_mgr.get_messages("s1", limit=50, user_id="alice")

# Store and recall memories
await session_mgr.store_memory("s1", Memory(
    session_id="s1", key="preference", value="dark mode",
    importance=0.8, user_id="alice",
), user_id="alice")

memory = await session_mgr.recall_memory("s1", "preference")

# Search memories (sorted by importance descending, expired memories excluded)
results = await session_mgr.search_memories("s1", "preference", limit=5)

# Cross-session search by user
results = await session_mgr.search_memories_by_user("alice", "preference", limit=10)

# Pagination
page = await session_mgr.get_messages_paginated("s1", cursor=None, limit=20)
# page.items, page.next_cursor, page.has_more

# Update memory (only content fields: value, importance, source_agent, categories, metadata, expires_at)
updated = await session_mgr.update_memory("s1", memory.id, {"importance": 0.9}, user_id="alice")

# Delete memory (with ownership check)
deleted = await session_mgr.delete_memory("s1", memory.id, user_id="alice")

# Lifecycle
await session_mgr.clear_session("s1")  # Clears data but retains session lock
await session_mgr.close()              # Sets _closed flag, clears all data

Capability discovery:

python
caps = session_mgr.get_capabilities()
# InMemorySessionManager returns all 13 methods

Async context manager:

python
async with InMemorySessionManager() as mgr:
    await mgr.save_messages("s1", messages)

SessionMessage.to_langchain() converts to LangChain message types: "user" -> HumanMessage, "assistant"/"ai" -> AIMessage, "system" -> SystemMessage.


12. Multi-Tenant Isolation

TenantIsolationMiddleware prevents agents from modifying identity fields (session_id, user_id).

When to use: Multi-tenant deployments where agents must not leak data across tenants.

python
from graphcrew import TenantIsolationMiddleware

middleware = TenantIsolationMiddleware()

agent = MyAgent(
    config=agent_config,
    middleware=[middleware],
)

The middleware captures session_id and user_id in before_invoke, then checks both response.slots_update and direct state mutations in after_invoke. Raises TenantIsolationError on identity field changes.

BaseAgent.invoke() also independently logs a WARNING (identity_field_mutated) if process() mutates identity fields in state.


13. Agent Middleware

AgentMiddleware provides lifecycle hooks around agent execution.

When to use: Cross-cutting concerns like logging, authentication, metrics, circuit breaking, tenant isolation.

python
from graphcrew import AgentMiddleware, OrchestratorState, AgentResponse
from langchain_core.runnables import RunnableConfig


class LoggingMiddleware(AgentMiddleware):
    async def before_invoke(
        self,
        agent_name: str,
        state: OrchestratorState,
        config: RunnableConfig | None = None,
    ) -> OrchestratorState:
        print(f"Before: {agent_name}")
        return state

    async def after_invoke(
        self,
        agent_name: str,
        state: OrchestratorState,
        response: AgentResponse,
        config: RunnableConfig | None = None,
    ) -> AgentResponse:
        print(f"After: {agent_name} -> {response.next_agent}")
        return response

    async def on_error(
        self,
        agent_name: str,
        state: OrchestratorState,
        error: Exception,
        config: RunnableConfig | None = None,
    ) -> None:
        print(f"Error: {agent_name}: {error}")

Execution order (onion model):

Middleware can modify state (in before_invoke) and response (in after_invoke).

Built-in middleware:

| Middleware | Purpose |
| --- | --- |
| MetricsMiddleware | Records invocation duration, token usage, errors |
| TenantIsolationMiddleware | Prevents identity field mutation |
| CircuitBreakerMiddleware | CLOSED -> OPEN -> HALF_OPEN state machine |
| MemoryInjectionMiddleware | Injects relevant memories into knowledge_context |
| OpenTelemetryMiddleware | Creates OTel spans around invocations |

All built-in middleware use contextvars.ContextVar for per-invocation state, making them safe for concurrent use across agents.


14. Metrics Collection

MetricsMiddleware records invocation duration, token usage, and errors via a MetricsCollector protocol.

When to use: When you need visibility into agent performance and error rates.

python
from graphcrew import MetricsMiddleware, InMemoryMetricsCollector

collector = InMemoryMetricsCollector()
metrics_mw = MetricsMiddleware(collector)

agent = MyAgent(config=agent_config, middleware=[metrics_mw])

MetricsCollector protocol:

python
class MetricsCollector(Protocol):
    def record_invocation(self, record: InvocationRecord) -> None: ...
    def record_error(self, record: ErrorRecord) -> None: ...

InMemoryMetricsCollector ships for dev/testing. For production, implement MetricsCollector with Prometheus, Datadog, or your preferred metrics backend.

Duration is measured with time.monotonic(), so it is unaffected by system clock adjustments.
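
A minimal way to take such a measurement is a context manager around the timed block. This is a generic sketch, not MetricsMiddleware itself; timed is a hypothetical helper.

```python
import time
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def timed(durations: list[float]) -> Iterator[None]:
    """Append the elapsed seconds of the wrapped block to `durations`."""
    start = time.monotonic()
    try:
        yield
    finally:
        # Recorded even if the block raises, so failed calls are timed too.
        durations.append(time.monotonic() - start)


durations: list[float] = []
with timed(durations):
    sum(range(1000))
```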


15. Circuit Breaker

CircuitBreakerMiddleware implements a CLOSED -> OPEN -> HALF_OPEN -> CLOSED state machine.

When to use: When you need to fail fast during sustained failures to avoid wasting LLM spend.

python
from graphcrew import CircuitBreakerMiddleware

cb = CircuitBreakerMiddleware(
    failure_threshold=5,          # failures before opening
    recovery_timeout_seconds=60,  # seconds before half-open probe
)

agent = MyAgent(config=agent_config, middleware=[cb])

When the circuit is open, CircuitBreakerOpenError is raised immediately without calling the agent.
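
The state machine can be sketched independently of the middleware. The class below is an illustration under assumptions: SketchBreaker and its method names are hypothetical, and the real middleware may count failures or handle half-open probes differently.

```python
import time


class SketchBreaker:
    """Minimal CLOSED -> OPEN -> HALF_OPEN -> CLOSED state machine."""

    def __init__(self, failure_threshold: int, recovery_timeout_seconds: float, clock=time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout_seconds = recovery_timeout_seconds
        self._clock = clock  # injectable so tests can control time
        self._failures = 0
        self._opened_at = 0.0
        self.state = "CLOSED"

    def allow(self) -> bool:
        """Return True if a call may proceed; move OPEN to HALF_OPEN after the timeout."""
        if self.state == "OPEN":
            if self._clock() - self._opened_at >= self.recovery_timeout_seconds:
                self.state = "HALF_OPEN"  # probe calls are allowed again
                return True
            return False
        return True

    def record_success(self) -> None:
        # Any success closes the circuit and resets the failure count.
        self._failures = 0
        self.state = "CLOSED"

    def record_failure(self) -> None:
        # A failed probe, or reaching the threshold, opens the circuit.
        self._failures += 1
        if self.state == "HALF_OPEN" or self._failures >= self.failure_threshold:
            self.state = "OPEN"
            self._opened_at = self._clock()
```

A caller would check allow() before invoking the agent, fail fast when it returns False, and report the outcome through record_success() or record_failure().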


16. Memory Injection

MemoryInjectionMiddleware automatically searches session and cross-session memories and injects them into state["knowledge_context"]["_injected_memories"].

When to use: When agents should have access to relevant long-term context from previous interactions.

python
from graphcrew import MemoryInjectionMiddleware, InMemorySessionManager

session_mgr = InMemorySessionManager()
memory_mw = MemoryInjectionMiddleware(session_mgr, limit=5)

supervisor = SupervisorAgent(
    config=agent_config,
    llm=llm,
    topology=topology,
    middleware=[memory_mw],
)

Search strategy:

  1. Session-scoped: search_memories(session_id, query) using the last message as query.
  2. Cross-session: search_memories_by_user(user_id, query) only when user_id is set and not "anonymous".
  3. Deduplicate by Memory.key (higher importance wins), sort by importance descending, cap at limit.

NotImplementedError from either search is silently ignored. Any other exception is logged as a warning and injection is skipped. The middleware never blocks agent execution.
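
Step 3 of the search strategy (deduplicate by key, rank by importance, cap at limit) can be sketched as below. SketchMemory is a hypothetical stand-in carrying only the fields this example needs, and merge_memories is not a graphcrew function.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchMemory:
    """Hypothetical stand-in with only the fields this sketch needs."""

    key: str
    value: str
    importance: float


def merge_memories(
    session_hits: list[SketchMemory],
    user_hits: list[SketchMemory],
    limit: int,
) -> list[SketchMemory]:
    """Deduplicate by key (higher importance wins), sort descending, cap at limit."""
    best: dict[str, SketchMemory] = {}
    for memory in [*session_hits, *user_hits]:
        current = best.get(memory.key)
        if current is None or memory.importance > current.importance:
            best[memory.key] = memory
    ranked = sorted(best.values(), key=lambda m: m.importance, reverse=True)
    return ranked[:limit]


top = merge_memories(
    [SketchMemory("theme", "dark", 0.8), SketchMemory("lang", "en", 0.3)],
    [SketchMemory("theme", "light", 0.5), SketchMemory("tz", "UTC", 0.6)],
    limit=2,
)
```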


17. Rate Limiting

RateLimiter ABC with acquire(), release(), and close() methods. Called before every LLM invocation.

When to use: When you need to control LLM API call concurrency or add delays between requests.

SemaphoreRateLimiter -- Single-tenant semaphore-based limiter:

python
from graphcrew import SemaphoreRateLimiter

limiter = SemaphoreRateLimiter(max_concurrency=5, delay_seconds=0.1)

supervisor = SupervisorAgent(
    config=agent_config, llm=llm, topology=topology,
    rate_limiter=limiter,
)

PerTenantRateLimiter -- Per-tenant semaphore isolation with LRU eviction:

python
from graphcrew import PerTenantRateLimiter

limiter = PerTenantRateLimiter(
    max_concurrency=5,
    delay_seconds=0.0,
    max_tenants=10_000,
)

# In your request handler:
limiter.set_tenant("tenant-abc")
await limiter.acquire()
try:
    ...  # LLM call goes here
finally:
    await limiter.release()

set_tenant() must be called before acquire() or RuntimeError is raised. When max_tenants is reached, the least-recently-used tenant's limiter is evicted.
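
The per-tenant bookkeeping with LRU eviction can be pictured with an OrderedDict of semaphores. This is a sketch, not PerTenantRateLimiter: SketchTenantSemaphores is a hypothetical class, and it ignores the question of what happens to a semaphore that is evicted while still held.

```python
import asyncio
from collections import OrderedDict


class SketchTenantSemaphores:
    """Keeps one semaphore per tenant and evicts the least recently used."""

    def __init__(self, max_concurrency: int, max_tenants: int) -> None:
        self._max_concurrency = max_concurrency
        self._max_tenants = max_tenants
        self._by_tenant: OrderedDict[str, asyncio.Semaphore] = OrderedDict()

    def get(self, tenant_id: str) -> asyncio.Semaphore:
        if tenant_id in self._by_tenant:
            self._by_tenant.move_to_end(tenant_id)  # mark as most recently used
            return self._by_tenant[tenant_id]
        if len(self._by_tenant) >= self._max_tenants:
            self._by_tenant.popitem(last=False)  # drop the least recently used tenant
        semaphore = asyncio.Semaphore(self._max_concurrency)
        self._by_tenant[tenant_id] = semaphore
        return semaphore

    def tenants(self) -> list[str]:
        """Tenant ids ordered from least to most recently used."""
        return list(self._by_tenant)
```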

All rate limiters support async with for resource cleanup:

python
async with SemaphoreRateLimiter(max_concurrency=5) as limiter:
    ...  # LLM calls go here

18. Streaming

SupervisorAgent.astream_process() supports streaming in simple mode (without ReAct).

When to use: Real-time response streaming (e.g., server-sent events in a web app).

python
async for chunk_meta, accumulated in supervisor.astream_process(state, config):
    if chunk_meta["chunk_type"] == "token":
        print(chunk_meta["delta"], end="", flush=True)
    elif chunk_meta["chunk_type"] == "final":
        print(f"\nRouting to: {chunk_meta['next_agent']}")

astream_with_timeout() in core/llm_utils.py provides the underlying streaming primitive:

python
from graphcrew import astream_with_timeout

async for chunk in astream_with_timeout(llm, messages, timeout_seconds=30.0, config=config):
    print(chunk.content, end="")

Streaming is not supported in ReAct mode (XML parsing needs the full response). astream_process() raises NotImplementedError when a ReactExecutor is configured.


19. Structured Output

Opt-in structured output for ReAct responses using Pydantic models.

When to use: When your LLM supports with_structured_output() and you want to avoid XML parsing.

python
executor = ReactExecutor(
    agent_name="supervisor",
    llm=llm,
    template_manager=template_manager,
    action_handlers=handlers,
    use_structured_output=True,
)

ReactStructuredResponse Pydantic model:

python
class ReactStructuredResponse(BaseModel):
    thought: str
    action: str
    action_params: dict[str, Any] = Field(default_factory=dict)

The executor probes llm.with_structured_output(ReactStructuredResponse) on the first invocation. If the LLM does not support it (AttributeError or NotImplementedError), it falls back to XML parsing. The capability result is cached for all subsequent passes.
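
The probe-once-then-cache behavior can be sketched with fake models. The classes below are hypothetical and exist only for this illustration; the executor's actual probing logic is not shown in this guide.

```python
class XmlOnlyModel:
    """Fake model that does not support structured output."""

    def with_structured_output(self, schema):
        raise NotImplementedError


class StructuredModel:
    """Fake model whose with_structured_output() returns a bound runner."""

    def with_structured_output(self, schema):
        return ("structured", schema)


class SketchProbe:
    """Probes once, then reuses the cached answer on later calls."""

    def __init__(self, llm, schema) -> None:
        self._llm = llm
        self._schema = schema
        self._supported = None  # None means "not probed yet"
        self.runner = None

    def supports_structured(self) -> bool:
        if self._supported is None:
            try:
                self.runner = self._llm.with_structured_output(self._schema)
                self._supported = True
            except (AttributeError, NotImplementedError):
                self._supported = False  # caller falls back to XML parsing
        return self._supported
```

An object with no with_structured_output attribute at all takes the AttributeError branch, which is why both exception types are caught.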

The SupervisorAgent simple mode also uses structured output for routing decisions via an internal _RouteDecision model.


20. Observability (OpenTelemetry)

Optional OpenTelemetry instrumentation via the [otel] extra.

When to use: When you use OpenTelemetry for distributed tracing (LangSmith, Langfuse, Jaeger, Datadog).

bash
pip install graphcrew[otel]

python
from graphcrew.observability import (
    OTEL_AVAILABLE,
    OpenTelemetryMiddleware,
    get_tracer,
    otel_span,
    sync_otel_span,
)

# Middleware: creates spans around agent invocations
otel_mw = OpenTelemetryMiddleware()
agent = MyAgent(config=agent_config, middleware=[otel_mw])

# Manual spans
async with otel_span("my_operation", attributes={"key": "value"}) as span:
    # ... your code ...
    pass

with sync_otel_span("react_pass", agent_name="supervisor", pass_num=1):
    # ... your code ...
    pass

All operations are no-ops when OpenTelemetry is not installed (OTEL_AVAILABLE is False), so instrumented code paths add negligible overhead in environments without OTel. _NoOpSpan and _NoOpTracer handle the fallback.
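
A common way to get this kind of optional-dependency fallback is an import guard plus a do-nothing span object. The sketch below shows the general pattern only; `NoOpSpan` and `sketch_span` are illustrative names and are not the library's internal `_NoOpSpan` or `otel_span`.

```python
from contextlib import contextmanager
from typing import Any, Iterator

try:  # Optional dependency: only importable when OpenTelemetry is installed.
    from opentelemetry import trace

    OTEL_AVAILABLE = True
except ImportError:
    trace = None
    OTEL_AVAILABLE = False


class NoOpSpan:
    """Accepts the calls instrumented code makes and discards them."""

    def set_attribute(self, key: str, value: Any) -> None:
        pass


@contextmanager
def sketch_span(name: str, **attributes: Any) -> Iterator[Any]:
    """Yield a real span when OTel is importable, otherwise a NoOpSpan."""
    if not OTEL_AVAILABLE:
        yield NoOpSpan()
        return
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span(name) as span:
        for key, value in attributes.items():
            span.set_attribute(key, value)
        yield span
```

Calling code looks the same in both environments, for example `with sketch_span("react_pass", pass_num=1) as span: ...`.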

RunnableConfig (carrying LangSmith/Langfuse/OTel callbacks) is threaded from BaseAgent.invoke() through process(), ReactExecutor.execute(), and down to llm.ainvoke(messages, config=config).


21. Structured Logging #

Request and per-invocation correlation via structlog contextvars.

When to use: Every production deployment. Enables log correlation across agent invocations.

python
from graphcrew import bind_logging_context, clear_logging_context

# bind_logging_context is called automatically by BaseAgent.invoke()
# It binds: request_id, session_id, user_id, langgraph_step, langgraph_node

# App developers must configure structlog with merge_contextvars:
import structlog
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.dev.ConsoleRenderer(),
    ],
)

# Clear at request boundaries:
clear_logging_context()

PII redaction: Set redact_pii=True on any agent to replace session_id and user_id in log output with a 12-character prefix of their SHA-256 hash. request_id is left untouched.

python
agent = MyAgent(config=agent_config, redact_pii=True)
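
To make the redaction rule concrete, here is a minimal standalone sketch of hashing identifiers to a 12-character prefix. It assumes the prefix is taken from the hexadecimal digest of the UTF-8 encoded value; the source does not state the encoding, and `redact_value` and `redact_event` are hypothetical helper names.

```python
import hashlib

# Fields that are hashed. request_id is deliberately not in this list.
REDACTED_FIELDS = ("session_id", "user_id")


def redact_value(value: str, prefix_len: int = 12) -> str:
    """Return the first prefix_len hex characters of the SHA-256 digest."""
    return hashlib.sha256(value.encode("utf-8")).hexdigest()[:prefix_len]


def redact_event(event: dict[str, str]) -> dict[str, str]:
    """Copy a log event, hashing only the fields in REDACTED_FIELDS."""
    return {
        key: redact_value(val) if key in REDACTED_FIELDS else val
        for key, val in event.items()
    }


event = {"request_id": "req-1", "session_id": "s1", "user_id": "alice"}
redacted = redact_event(event)
```

Because the hash is deterministic, the same user still correlates across log lines without the raw identifier appearing in output.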

Log events emitted by the library:

All log events use agent_name= (not agent=) for field naming consistency. response_metadata on AIMessages carries request_id, session_id, user_id, langgraph_step, and langgraph_node for downstream correlation.


22. Token Counting #

Pluggable TokenCounter ABC with a single count(text) -> int method.

When to use: Whenever token budgets are enforced. A TokenCounter is injected into SlotPool, SlotManager, and ReactTemplateManager. Replace CharDivisionTokenCounter with a model-specific counter for production accuracy.

python
from graphcrew import TokenCounter, CharDivisionTokenCounter

# Default: len(text) // 4, minimum 1 for non-empty text, 0 for empty
counter = CharDivisionTokenCounter(chars_per_token=4)
counter.count("hello world")  # 2

# Custom counter (e.g., tiktoken-based)
class TiktokenCounter(TokenCounter):
    def __init__(self, model: str = "gpt-4o"):
        import tiktoken
        self._enc = tiktoken.encoding_for_model(model)

    def count(self, text: str) -> int:
        if not text:
            return 0
        return len(self._enc.encode(text))

Inject into components:

python
slot_pool = SlotPool(token_counter=counter)
slot_manager = SlotManager(config=context_config, token_counter=counter)
template_manager = ReactTemplateManager(
    agent_name="supervisor", base_dir="llm_resources",
    loader=loader, token_counter=counter,
)
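
The default estimation rule (integer division by characters per token, at least 1 for non-empty text, 0 for empty text) is simple enough to sketch without the library. `SketchTokenCounter`, `SketchCharDivisionCounter`, and `fits_budget` are stand-in names for illustration, not graphcrew classes.

```python
from abc import ABC, abstractmethod


class SketchTokenCounter(ABC):
    """Stand-in for a counter interface with a single count() method."""

    @abstractmethod
    def count(self, text: str) -> int: ...


class SketchCharDivisionCounter(SketchTokenCounter):
    """Estimates tokens as len(text) // chars_per_token."""

    def __init__(self, chars_per_token: int = 4) -> None:
        if chars_per_token <= 0:
            raise ValueError("chars_per_token must be positive")
        self._chars_per_token = chars_per_token

    def count(self, text: str) -> int:
        if not text:
            return 0
        # Non-empty text always costs at least one token.
        return max(1, len(text) // self._chars_per_token)


def fits_budget(counter: SketchTokenCounter, texts: list[str], max_tokens: int) -> bool:
    """Check whether the combined estimate stays within a token budget."""
    return sum(counter.count(t) for t in texts) <= max_tokens
```

A character-based estimate is cheap but approximate, which is why a model-specific tokenizer is the better choice when budgets are tight.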

23. Error Handling #

All public-facing errors inherit from OrchestratorError. The hierarchy:

text
OrchestratorError
  ConfigurationError
    ConfigLoadError
    ConfigValidationError
    LLMNotConfiguredError
    ReactExecutorNotConfiguredError
  AgentExecutionError
    AgentProcessError
  LLMError
    RateLimitExceededError
    LLMInvocationError
    LLMRetryExhaustedError
  SessionError
    SessionManagerError
    SessionManagerNotConfiguredError
  StateError
    InvalidStateError
  ContextError
    SlotPersistenceError
  KnowledgeError
    KnowledgeSlotNotFoundError
    KnowledgePathError
    KnowledgeLoadError
    KnowledgeConfigValidationError
  ReactError
    ReactCancellationError
    ReactMaxPassesError
    ReservedStateKeyError
    ReactConsecutiveErrorsError
  RoutingError
    RoutingSelfLoopError
  TenantIsolationError
  CircuitBreakerOpenError

All exceptions carry context attributes (e.g., AgentProcessError.agent_name, LLMRetryExhaustedError.attempts). Catch OrchestratorError for broad error handling, or specific subclasses for targeted recovery.
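
The broad-versus-targeted catch pattern can be sketched with stand-in classes. The three exception classes below only mimic the names and the `attempts` attribute mentioned above; their constructor signatures are assumptions made for this example.

```python
from typing import Callable


class OrchestratorError(Exception):
    """Stand-in for the library's root exception."""


class LLMError(OrchestratorError):
    """Stand-in for the LLM error branch."""


class LLMRetryExhaustedError(LLMError):
    """Stand-in carrying the attempts context attribute."""

    def __init__(self, message: str, attempts: int) -> None:
        super().__init__(message)
        self.attempts = attempts


def run_with_recovery(call: Callable[[], str]) -> str:
    """Handle the specific subclass first, then fall back to the root type."""
    try:
        return call()
    except LLMRetryExhaustedError as exc:
        # Targeted recovery: the context attribute tells us what happened.
        return f"degraded after {exc.attempts} attempts"
    except OrchestratorError as exc:
        # Broad handling for every other library error.
        return f"failed: {exc}"


def exhausted() -> str:
    raise LLMRetryExhaustedError("provider unavailable", attempts=3)


def generic() -> str:
    raise OrchestratorError("bad state")
```

Ordering matters: the more specific `except` clause must come before the root class, otherwise it is never reached.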


24. Health Checks #

SessionManager.health_check() and SlotPersistence.health_check() return {"status": "ok"} by default.

When to use: Kubernetes liveness/readiness probes, monitoring dashboards.

python
# SessionManager
status = await session_mgr.health_check()
# InMemorySessionManager: {"status": "ok", "sessions": 3}
# When closed: {"status": "closed", "sessions": 0}

# SlotPersistence
status = await persistence.health_check()
# InMemorySlotPersistence: {"status": "ok", "session_slots": 5, "long_term_slots": 2}

Override in your custom implementation to add backend-specific checks (database connectivity, latency, etc.).
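
As a rough sketch of such an override, the class below measures the latency of a backend ping and reports a failure instead of raising. `SketchPersistence`, the `ping` callable, the `latency_ms` key, and the `"error"` status value are all assumptions for illustration; only the `{"status": "ok"}` shape comes from the text above.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable


class SketchPersistence:
    """Hypothetical backend wrapper with a latency-aware health check."""

    def __init__(self, ping: Callable[[], Awaitable[None]]) -> None:
        self._ping = ping

    async def health_check(self) -> dict[str, Any]:
        started = time.perf_counter()
        try:
            await self._ping()
        except Exception as exc:
            # Report the failure to the probe rather than crashing it.
            return {"status": "error", "detail": str(exc)}
        latency_ms = (time.perf_counter() - started) * 1000
        return {"status": "ok", "latency_ms": round(latency_ms, 2)}


async def healthy_ping() -> None:
    await asyncio.sleep(0)


async def failing_ping() -> None:
    raise ConnectionError("db unreachable")
```

A readiness probe could then treat any status other than `"ok"` as not ready.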


25. Transient Exception Registration #

Register provider-specific exception types as retryable.

When to use: When your LLM provider raises rate-limit or other transient errors that are not subclasses of OSError or TimeoutError.

python
from graphcrew import (
    register_transient_exception,
    deregister_transient_exception,
    get_transient_exceptions,
)

# At app startup
from anthropic import RateLimitError
register_transient_exception(RateLimitError)

# Check current set
print(get_transient_exceptions())  # (OSError, TimeoutError, RateLimitError)

# Deregister (safe in test teardown, no-op if not registered)
deregister_transient_exception(RateLimitError)

build_async_retryer() reads the transient exception set at call time, so newly registered types are picked up immediately. Registration is thread-safe via threading.Lock.

Default transient exceptions: OSError (covers ConnectionError, network errors) and TimeoutError.
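
A lock-protected registry that is read at call time can be sketched in a few lines. This is a standalone illustration of the idea, not the library's implementation; `register`, `deregister`, `current`, and `is_retryable` are hypothetical names.

```python
import threading

_lock = threading.Lock()
_transient: list[type[BaseException]] = [OSError, TimeoutError]


def register(exc_type: type[BaseException]) -> None:
    """Add an exception type; registering twice has no extra effect."""
    with _lock:
        if exc_type not in _transient:
            _transient.append(exc_type)


def deregister(exc_type: type[BaseException]) -> None:
    """Remove an exception type; a no-op if it was never registered."""
    with _lock:
        if exc_type in _transient:
            _transient.remove(exc_type)


def current() -> tuple[type[BaseException], ...]:
    """Return an immutable snapshot of the registered types."""
    with _lock:
        return tuple(_transient)


def is_retryable(exc: BaseException) -> bool:
    # The snapshot is taken on every call, so new registrations apply at once.
    return isinstance(exc, current())


class FakeRateLimitError(Exception):
    """Hypothetical provider error used only in this example."""
```

Returning a tuple snapshot keeps callers from mutating the shared list and makes the result directly usable with `isinstance`.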


26. Semantic Search #

SemanticSearchMixin is an abstract mix-in for adding vector-based memory search.

When to use: When you want semantic similarity search on memories using pgvector, Chroma, Pinecone, or another vector database.

python
from graphcrew import SemanticSearchMixin, InMemorySessionManager, Memory

class MySemanticSessionManager(SemanticSearchMixin, InMemorySessionManager):
    async def embed_text(self, text: str) -> list[float]:
        # Your embedding provider
        return await my_embedder.embed(text)

    async def search_memories_semantic(
        self, session_id: str, query: str, limit: int = 5,
    ) -> list[Memory]:
        query_vec = await self.embed_text(query)
        # Your vector similarity search implementation
        return await self._vector_store.search(session_id, query_vec, limit)

SemanticSearchMixin must be listed first among the base classes (class MyManager(SemanticSearchMixin, InMemorySessionManager)) so that it precedes InMemorySessionManager in the MRO. It declares one abstract method, embed_text(text: str) -> list[float], and a concrete search_memories_semantic() that raises NotImplementedError by default.


27. OrchestratorContext Lifecycle #

OrchestratorContext coordinates the lifecycle of all orchestrator components.

When to use: Production deployments, especially with FastAPI or other async frameworks where deterministic resource cleanup is needed.

python
from graphcrew import OrchestratorContext

ctx = OrchestratorContext(
    slot_manager=slot_mgr,
    session_manager=session_mgr,
    template_managers=[tm],
    rate_limiter=limiter,
    extras=[custom_resource],
)

async with ctx:
    # Run your graph
    result = await graph.ainvoke(state)

# ctx.close() tears down in safe order:
# extras -> template managers -> rate limiter -> slot manager -> session manager
# Errors are logged but never mask other component cleanup.

# Check status
print(ctx.is_closed)  # True after close()
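
The teardown guarantees (fixed order, and one failing component never blocks the rest) can be illustrated with a small standalone sketch. `SketchResource` and `SketchContext` are hypothetical stand-ins and say nothing about how OrchestratorContext is actually implemented.

```python
import asyncio
import logging

logger = logging.getLogger("lifecycle_sketch")


class SketchResource:
    """Hypothetical closable component that records when it was closed."""

    def __init__(self, name: str, closed_log: list[str], fail: bool = False) -> None:
        self.name = name
        self._closed_log = closed_log
        self._fail = fail

    async def close(self) -> None:
        if self._fail:
            raise RuntimeError(f"{self.name} failed to close")
        self._closed_log.append(self.name)


class SketchContext:
    """Closes resources in the given order; one failure never stops the rest."""

    def __init__(self, resources: list[SketchResource]) -> None:
        self._resources = resources
        self.is_closed = False

    async def __aenter__(self) -> "SketchContext":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.close()

    async def close(self) -> None:
        if self.is_closed:  # Calling close() twice is harmless.
            return
        for resource in self._resources:
            try:
                await resource.close()
            except Exception:
                # Log and continue so later resources are still released.
                logger.exception("close failed for %s", resource.name)
        self.is_closed = True


async def demo() -> list[str]:
    closed: list[str] = []
    resources = [
        SketchResource("extras", closed),
        SketchResource("rate_limiter", closed, fail=True),
        SketchResource("session_manager", closed),
    ]
    async with SketchContext(resources):
        pass
    return closed
```

In `demo()`, the failing rate limiter is logged and skipped while the session manager is still closed afterwards.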

28. Testing Utilities #

The graphcrew.testing subpackage provides utilities for unit and integration tests.

When to use: Testing agents, supervisors, and ReAct loops without real LLM API calls.

python
from graphcrew.testing import MockLLM, make_test_state, InMemorySessionManager

# MockLLM returns scripted responses
llm = MockLLM(responses=[
    '<thought>Routing to greeter</thought><action type="decide">{"next_agent": "greeter"}</action>',
    "Hello!",
])

# Raises IndexError when exhausted
# Optional: raise_on_call=ValueError to test error handling
# Optional: stream_chunks=[["chunk1", "chunk2"]] for astream testing
# Optional: structured_responses=[obj] for with_structured_output testing
# Optional: usage_tokens={"input_tokens": 10, "output_tokens": 5}

# make_test_state creates a pre-populated OrchestratorState
state = make_test_state("Hello")
# Defaults: session_id="test-session", user_id="test-user"
state = make_test_state("Hello", session_id="s1", user_id="alice")

# InMemorySessionManager re-exported for convenience
session_mgr = InMemorySessionManager()

29. Graph Builders #

Helper functions that eliminate LangGraph graph wiring boilerplate.

When to use: Every project. Use build_hub_spoke_graph for standard supervisor architectures, build_topology_graph for mesh/hybrid topologies.

python
from graphcrew.routing.graph_builder import (
    build_hub_spoke_graph,
    build_topology_graph,
    build_conditional_edge,
)

# Hub-spoke: all workers route back to supervisor
graph = build_hub_spoke_graph(supervisor, {"greeter": greeter}, topology)

# Topology-driven: edges match topology exactly (supports mesh)
graph = build_topology_graph(
    agents={"supervisor": supervisor, "billing": billing, "support": support},
    topology=topology,
    entry_point="supervisor",
)

# Manual edge wiring
routing_fn = build_conditional_edge(topology, "supervisor")
builder.add_conditional_edges("supervisor", routing_fn, edge_map)

build_conditional_edge() routing logic:

Both build_hub_spoke_graph and build_topology_graph accept an optional state_schema parameter (defaults to OrchestratorState).
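
As a rough illustration of constrained routing (a requested transition is honoured only if the topology allows it, illegal requests fall back to default_return, and next_agent=None ends the run), consider the sketch below. It is an assumption about the general shape of the logic, not the code behind build_conditional_edge(); `resolve_route` and the `END` constant are hypothetical, and self-loop handling is left out.

```python
from __future__ import annotations

END = "__end__"  # Stand-in for the graph's terminal node name.


def resolve_route(
    current: str,
    requested: str | None,
    edges: set[tuple[str, str]],
    default_return: str,
) -> str:
    """Pick the next node for `current` given the agent's requested target."""
    if requested is None:
        return END  # The agent asked to terminate.
    if (current, requested) in edges:
        return requested  # Legal transition in the topology.
    return default_return  # Illegal transition: fall back.


EDGES = {
    ("supervisor", "monitor"),
    ("monitor", "diagnostician"),
    ("monitor", "supervisor"),
    ("diagnostician", "remediator"),
}
```

With these edges, a monitor that asks for `"reporter"` is sent back to the supervisor, because no monitor-to-reporter edge exists.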


1.5 Production-Ready Example: DevOps Incident Response System #

This section presents a complete, production-grade multi-agent application built with graphcrew. The system automates DevOps incident response using five agents in a mesh topology.

System Overview #

The DevOps Incident Response System receives alert payloads, classifies the incident, performs root-cause analysis, executes remediation, and generates post-mortem reports.

Five agents collaborate via a mesh topology:

text
                  +-----------+
            +---->| monitor   |----+
            |     +-----------+    |
            |                      v
    +------------+           +--------------+
    | supervisor |<--------->| diagnostician|
    +------------+           +--------------+
            |                      |
            |     +-----------+    |
            +---->| remediator|<---+
            |     +-----------+
            |          |
            |     +-----------+
            +---->| reporter  |
                  +-----------+

Agents:

| Agent | Role | Type |
| --- | --- | --- |
| supervisor | Orchestrates incident workflow, routes between agents | Built-in supervisor |
| monitor | Detects and classifies incidents from raw alert data | Custom BaseAgent |
| diagnostician | Performs root-cause analysis using knowledge modules (ReAct) | Custom BaseAgent |
| remediator | Executes remediation playbooks using knowledge modules (ReAct) | Custom BaseAgent |
| reporter | Generates incident reports and post-mortem summaries | Custom BaseAgent |

Library features demonstrated:


File: config.yaml #

The declarative YAML configuration defines all five agents, a mesh topology with direct agent-to-agent edges, context slot budgets, session management, knowledge module settings, retry/timeout policies, and eviction strategy.

yaml
# config.yaml -- DevOps Incident Response System

agents:
  - name: supervisor
    type: supervisor
    description: >-
      Orchestrates the incident response workflow. Routes incoming alerts to
      the monitor for classification, then to diagnostician for root-cause
      analysis, remediator for fixes, and reporter for post-mortems.
    knowledge_enabled: true

  - name: monitor
    type: custom
    description: >-
      Detects and classifies incidents from raw alert payloads. Determines
      severity (P1-P4) and affected service.

  - name: diagnostician
    type: custom
    description: >-
      Analyzes classified incidents to determine root cause. Uses runbook
      knowledge modules for system-specific diagnostic procedures.
    knowledge_enabled: true

  - name: remediator
    type: custom
    description: >-
      Executes remediation playbooks based on diagnosis. Uses playbook
      knowledge modules for step-by-step remediation procedures.
    knowledge_enabled: true

  - name: reporter
    type: custom
    description: >-
      Generates structured incident reports and post-mortem documents.

topology:
  edges:
    - from_agent: supervisor
      to_agent: monitor
    - from_agent: supervisor
      to_agent: diagnostician
    - from_agent: supervisor
      to_agent: remediator
    - from_agent: supervisor
      to_agent: reporter
    - from_agent: monitor
      to_agent: diagnostician
    - from_agent: monitor
      to_agent: supervisor
    - from_agent: diagnostician
      to_agent: remediator
    - from_agent: diagnostician
      to_agent: supervisor
    - from_agent: remediator
      to_agent: reporter
    - from_agent: remediator
      to_agent: supervisor
    - from_agent: reporter
      to_agent: supervisor
  default_return: supervisor

context:
  shared_slots:
    max_tokens: 8192
  tiered_slots:
    turn:
      max_tokens: 4096
    session:
      max_tokens: 8192
    long_term:
      max_tokens: 4096
  eviction_strategy: lru

session:
  enabled: true

knowledge:
  enabled: true
  base_dir: knowledge
  max_total_tokens: 16384

retry:
  initial_interval: 1.0
  backoff_factor: 2.0
  max_interval: 30.0
  max_attempts: 3
  jitter: true

timeout:
  llm_call_seconds: 30.0
  total_request_seconds: 120.0

File: agents.py #

Four custom agents, each extending BaseAgent with a full process() implementation.

python
"""Custom agents for the DevOps Incident Response System."""
from __future__ import annotations

import json
from typing import Any

from langchain_core.runnables import RunnableConfig

from graphcrew import AgentResponse, BaseAgent, OrchestratorState
from graphcrew.context.slots import SlotTier


class MonitorAgent(BaseAgent):
    """Classifies raw alerts into structured incidents."""

    async def process(
        self,
        state: OrchestratorState,
        config: RunnableConfig | None = None,
    ) -> AgentResponse:
        messages = state.get("messages", [])
        alert_text = str(messages[-1].content) if messages else "unknown alert"

        severity = "P2"
        service = "unknown"
        category = "infrastructure"

        if "cpu" in alert_text.lower() or "memory" in alert_text.lower():
            category = "resource_exhaustion"
            service = "compute"
        elif "latency" in alert_text.lower() or "timeout" in alert_text.lower():
            category = "performance"
            service = "api-gateway"
        elif "error rate" in alert_text.lower() or "5xx" in alert_text.lower():
            category = "availability"
            service = "web-service"
            severity = "P1"

        classification = {
            "severity": severity,
            "service": service,
            "category": category,
            "raw_alert": alert_text[:500],
        }

        return AgentResponse(
            agent_name=self.name,
            content=f"Incident classified: {severity} {category} on {service}",
            next_agent="diagnostician",
            slots_update={
                "incident_classification": json.dumps(classification),
                "incident_severity": severity,
                "incident_service": service,
            },
            slots_tier={
                "incident_severity": SlotTier.SESSION,
                "incident_service": SlotTier.SESSION,
            },
        )


class DiagnosticianAgent(BaseAgent):
    """Performs root-cause analysis using context from the monitor."""

    async def process(
        self,
        state: OrchestratorState,
        config: RunnableConfig | None = None,
    ) -> AgentResponse:
        slots = state.get("context_slots", {})
        classification_raw = slots.get("incident_classification", "{}")
        classification = json.loads(classification_raw) if isinstance(
            classification_raw, str
        ) else classification_raw

        category = classification.get("category", "unknown")
        service = classification.get("service", "unknown")

        root_cause = "unknown"
        recommended_action = "escalate"

        if category == "resource_exhaustion":
            root_cause = f"Resource limits exceeded on {service}"
            recommended_action = "scale_horizontally"
        elif category == "performance":
            root_cause = f"Latency degradation in {service}"
            recommended_action = "circuit_breaker_and_retry"
        elif category == "availability":
            root_cause = f"Service {service} returning errors due to deployment regression"
            recommended_action = "rollback_deployment"

        diagnosis = {
            "root_cause": root_cause,
            "recommended_action": recommended_action,
            "confidence": 0.85,
        }

        return AgentResponse(
            agent_name=self.name,
            content=f"Diagnosis: {root_cause}. Recommended: {recommended_action}",
            next_agent="remediator",
            slots_update={"incident_diagnosis": json.dumps(diagnosis)},
            slots_tier={"incident_diagnosis": SlotTier.SESSION},
        )


class RemediatorAgent(BaseAgent):
    """Executes remediation based on the diagnosis."""

    async def process(
        self,
        state: OrchestratorState,
        config: RunnableConfig | None = None,
    ) -> AgentResponse:
        slots = state.get("context_slots", {})
        diagnosis_raw = slots.get("incident_diagnosis", "{}")
        diagnosis = json.loads(diagnosis_raw) if isinstance(
            diagnosis_raw, str
        ) else diagnosis_raw

        recommended_action = diagnosis.get("recommended_action", "escalate")
        status = "completed"
        steps = ["Executed remediation steps"]

        remediation = {
            "action_taken": recommended_action,
            "steps": steps,
            "status": status,
        }

        return AgentResponse(
            agent_name=self.name,
            content=f"Remediation {status}: {recommended_action}",
            next_agent="reporter",
            slots_update={"incident_remediation": json.dumps(remediation)},
            slots_tier={"incident_remediation": SlotTier.SESSION},
        )


class ReporterAgent(BaseAgent):
    """Generates incident reports from the full incident context."""

    async def process(
        self,
        state: OrchestratorState,
        config: RunnableConfig | None = None,
    ) -> AgentResponse:
        slots = state.get("context_slots", {})
        classification = json.loads(slots.get("incident_classification", "{}"))
        diagnosis = json.loads(slots.get("incident_diagnosis", "{}"))
        remediation = json.loads(slots.get("incident_remediation", "{}"))

        report = f"""# Incident Report
**Severity:** {classification.get('severity', 'P3')}
**Service:** {classification.get('service', 'unknown')}
## Root Cause
{diagnosis.get('root_cause', 'unknown')}
## Remediation
**Action:** {remediation.get('action_taken', 'none')}
**Status:** {remediation.get('status', 'unknown')}"""

        return AgentResponse(
            agent_name=self.name,
            content=report,
            next_agent=None,  # Terminates the graph
            slots_update={"incident_report": report},
            slots_tier={"incident_report": SlotTier.LONG_TERM},
        )

File: app.py #

The FastAPI application ties everything together: OrchestratorContext lifecycle, PerTenantRateLimiter, mesh topology graph construction, streaming SSE endpoint, health checks, and structured logging with PII redaction.

python
"""FastAPI application for the DevOps Incident Response System."""
from __future__ import annotations

import uuid
from contextlib import asynccontextmanager
from pathlib import Path

import structlog
from fastapi import FastAPI, Header, HTTPException
from pydantic import BaseModel

from graphcrew import (
    AgentConfig,
    CircuitBreakerOpenError,
    InMemoryMetricsCollector,
    OrchestratorContext,
    OrchestratorError,
    PerTenantRateLimiter,
    RateLimitExceededError,
    SupervisorAgent,
    TopologyResolver,
    clear_logging_context,
    create_initial_state,
    load_config,
)
from graphcrew.routing.graph_builder import build_topology_graph

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.dev.ConsoleRenderer(),
    ],
)


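@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage OrchestratorContext lifecycle."""
    config = load_config(Path("config.yaml"))
    rate_limiter = PerTenantRateLimiter(max_concurrency=5, max_tenants=1000)

    ctx = OrchestratorContext(rate_limiter=rate_limiter)
    app.state.ctx = ctx
    # NOTE: this listing does not show agent construction or the
    # build_topology_graph() call. The compiled graph must be assigned to
    # app.state.graph here, before `yield`, because /incident reads it.
    try:
        yield
    finally:
        # Always release resources, even if serving requests raised.
        await ctx.close()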


app = FastAPI(
    title="DevOps Incident Response System",
    lifespan=lifespan,
)


class IncidentRequest(BaseModel):
    """JSON body for POST /incident."""

    alert: str


@app.post("/incident")
async def create_incident(
    payload: IncidentRequest,
    x_user_id: str = Header(..., alias="X-User-Id"),
):
    # Reset logging context so fields bound during a previous request
    # do not leak into this one.
    clear_logging_context()
    session_id = f"incident-{uuid.uuid4()}"
    state = create_initial_state(
        payload.alert, session_id=session_id, user_id=x_user_id
    )

    try:
        result = await app.state.graph.ainvoke(state)
        return {"incident_id": session_id, "report": result["messages"][-1].content}
    except RateLimitExceededError as exc:
        raise HTTPException(status_code=429, detail="Rate limit exceeded") from exc
    except CircuitBreakerOpenError as exc:
        raise HTTPException(
            status_code=503, detail="Service temporarily unavailable"
        ) from exc
    except OrchestratorError as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc


@app.get("/health")
async def health():
    ctx = app.state.ctx
    return {"status": "ok" if not ctx.is_closed else "closed"}

Running the Application #

bash
# Install dependencies
pip install graphcrew fastapi uvicorn langchain-openai

# Start the server
uvicorn devops_incident.app:app --host 0.0.0.0 --port 8000

# Submit an incident
curl -X POST http://localhost:8000/incident \
  -H "Content-Type: application/json" \
  -H "X-User-Id: ops-team" \
  -d '{"alert": "CRITICAL: 5xx error rate above 10% on web-service"}'

# Health check
curl http://localhost:8000/health

Feature Coverage Summary #

| Library Feature | Where Demonstrated |
| --- | --- |
| Declarative YAML config | config.yaml |
| Mesh topology | config.yaml topology edges + build_topology_graph() (imported in app.py) |
| Custom agents (BaseAgent) | agents.py -- 4 agents with full process() |
| AgentResponse with slots | agents.py -- slots_update, slots_tier, next_agent |
| SlotTier (SESSION, LONG_TERM) | agents.py -- tiered slot persistence |
| OrchestratorContext lifecycle | app.py -- created in lifespan(), close() on shutdown |
| PerTenantRateLimiter | app.py -- constructed in lifespan() and passed to OrchestratorContext |
| Health checks | app.py -- /health endpoint reporting is_closed |
| Exception hierarchy | app.py -- RateLimitExceededError, CircuitBreakerOpenError, OrchestratorError |
| create_initial_state() | app.py -- explicit session_id and user_id |
| clear_logging_context() | app.py -- request boundary cleanup |

1.6 Configuration Reference #

Complete YAML Schema #

All configuration models use ConfigDict(frozen=True, extra="forbid"). Unknown fields cause immediate ValidationError.

OrchestratorConfig (top-level) #

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| agents | list[AgentConfig] | required (min 1) | Agent definitions |
| topology | TopologyConfig | {} | Communication topology |
| context | ContextConfig | {} | Context management settings |
| session | SessionConfig | {} | Session management settings |
| knowledge | KnowledgeConfig | {} | Knowledge module system settings |
| retry | RetryConfig | {} | LLM retry with exponential backoff |
| timeout | TimeoutConfig | {} | Async timeout settings |

Model validator: validate_topology_agents ensures all topology edges and default_return reference agents from the agents list.
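
The cross-reference check can be pictured as a plain function over agent names and edges. The sketch below is a simplified stand-in that raises ValueError; the real validator runs inside the Pydantic model, and `check_topology_agents` is a hypothetical name.

```python
def check_topology_agents(
    agent_names: list[str],
    edges: list[tuple[str, str]],
    default_return: str,
) -> None:
    """Raise ValueError if the topology mentions an undeclared agent."""
    known = set(agent_names)
    # Collect every endpoint of every edge that is not a declared agent.
    unknown = sorted({name for edge in edges for name in edge if name not in known})
    if unknown:
        raise ValueError(f"topology edges reference unknown agents: {unknown}")
    if default_return not in known:
        raise ValueError(f"default_return references unknown agent: {default_return!r}")


AGENTS = ["supervisor", "monitor", "diagnostician"]
GOOD_EDGES = [("supervisor", "monitor"), ("monitor", "diagnostician")]
BAD_EDGES = [("supervisor", "monitor"), ("monitor", "reporter")]
```

Failing at load time means a typo in an edge surfaces before any graph is built.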

AgentConfig #

| Field | Type | Default | Validation / Description |
| --- | --- | --- | --- |
| name | str | required | 1-64 chars, [a-zA-Z][a-zA-Z0-9_-]*, not __end__/__start__ |
| type | Literal["supervisor", "session_manager", "custom"] | required | |
| description | str | required | min_length=1 |
| model | `str \| None` | None | LLM model name override |
| knowledge_enabled | bool | False | Enable knowledge module system |
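
The name rule can be checked with a regular expression. This is a standalone sketch of the stated constraints, not the library's validator, and `is_valid_agent_name` is a hypothetical helper.

```python
import re

# Must start with a letter, then letters, digits, underscore, or hyphen.
_NAME_RE = re.compile(r"[a-zA-Z][a-zA-Z0-9_-]*")
RESERVED_NAMES = {"__end__", "__start__"}


def is_valid_agent_name(name: str) -> bool:
    """Apply the length, pattern, and reserved-name rules to an agent name."""
    return (
        1 <= len(name) <= 64
        and name not in RESERVED_NAMES
        and _NAME_RE.fullmatch(name) is not None
    )
```

The pattern already rejects names that start with an underscore, so the reserved-name check is redundant here and kept only to mirror the documented rule.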

TopologyConfig #

Field Type Default
edges list[TopologyEdgeConfig] []
default_return str "supervisor"

TopologyEdgeConfig #

Field Type
from_agent str
to_agent str

ContextConfig #

Field Type Default
shared_slots SlotConfig {max_tokens: 4096}
tiered_slots TieredSlotConfig (see below)
eviction_strategy Literal["lru", "fifo", "priority", "demotion"] "lru"
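
To give a feel for what the "lru" strategy means under a token budget, here is a minimal standalone sketch. `LruSlotPool` is a hypothetical class, the token estimate uses a simple characters-divided-by-four rule, and the behaviour for a single oversized slot (it is kept) is an assumption of this example.

```python
from collections import OrderedDict


class LruSlotPool:
    """Keeps slots within max_tokens by evicting the least recently used."""

    def __init__(self, max_tokens: int) -> None:
        self._max_tokens = max_tokens
        self._slots: OrderedDict[str, tuple[str, int]] = OrderedDict()

    @staticmethod
    def _count(text: str) -> int:
        return max(1, len(text) // 4) if text else 0

    @property
    def used_tokens(self) -> int:
        return sum(tokens for _, tokens in self._slots.values())

    def get(self, name: str) -> str:
        value, _ = self._slots[name]
        self._slots.move_to_end(name)  # Reading marks the slot as recent.
        return value

    def put(self, name: str, value: str) -> list[str]:
        """Store a slot and return the names evicted to stay within budget."""
        self._slots.pop(name, None)
        self._slots[name] = (value, self._count(value))
        evicted: list[str] = []
        # Evict from the oldest end, but never the slot just written.
        while self.used_tokens > self._max_tokens and len(self._slots) > 1:
            old_name, _ = self._slots.popitem(last=False)
            evicted.append(old_name)
        return evicted
```

Reading a slot with `get()` protects it from the next eviction round, which is the defining property of LRU compared with FIFO.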

SlotConfig #

Field Type Default Validation
max_tokens int 4096 gt=0

TieredSlotConfig #

Field Type Default
turn TierConfig {max_tokens: 4096}
session TierConfig {max_tokens: 4096}
long_term TierConfig {max_tokens: 4096}

TierConfig #

Field Type Default Validation
max_tokens int 4096 gt=0

SessionConfig #

Field Type Default
enabled bool True

KnowledgeConfig #

| Field | Type | Default | Validation |
| --- | --- | --- | --- |
| enabled | bool | False | |
| base_dir | str | "llm_resources" | |
| max_total_tokens | int | 8192 | gt=0 |

RetryConfig #

| Field | Type | Default | Validation | Description |
| --- | --- | --- | --- | --- |
| initial_interval | float | 0.5 | gt=0 | Seconds before first retry |
| backoff_factor | float | 2.0 | gt=0 | Exponential base multiplier |
| max_interval | float | 128.0 | gt=0 | Maximum seconds between retries |
| max_attempts | int | 3 | ge=1 | Total attempts including first (1 = no retry) |
| jitter | bool | True | | Random jitter (AWS Full Jitter algorithm) |

Uses tenacity's wait_random_exponential when jitter=True and wait_exponential when jitter=False.
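
The arithmetic behind these fields can be sketched without tenacity. The function below is an illustrative approximation of capped exponential backoff with optional full jitter, under the assumption that the first retry waits up to initial_interval; `backoff_delays` is a hypothetical helper, not a library function.

```python
from __future__ import annotations

import random


def backoff_delays(
    initial_interval: float,
    backoff_factor: float,
    max_interval: float,
    max_attempts: int,
    jitter: bool,
    rng: random.Random | None = None,
) -> list[float]:
    """Return the wait before each retry (max_attempts - 1 waits in total)."""
    rng = rng or random.Random()
    delays: list[float] = []
    for retry_index in range(max_attempts - 1):
        # Exponential growth, capped at max_interval.
        ceiling = min(max_interval, initial_interval * backoff_factor**retry_index)
        # Full jitter draws uniformly between 0 and the ceiling.
        delays.append(rng.uniform(0.0, ceiling) if jitter else ceiling)
    return delays
```

With the values from the example config.yaml (1.0, 2.0, 30.0, 3 attempts) and jitter disabled, there are two waits, of 1.0 and 2.0 seconds.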

TimeoutConfig #

All values are in seconds. None disables the timeout (the default for all fields). Production guidance: set each timeout to 2-3x your observed p99 latency.

| Field | Type | Default | Validation |
| --- | --- | --- | --- |
| llm_call_seconds | `float \| None` | None | gt=0 |
| file_io_seconds | `float \| None` | None | gt=0 |
| persistence_seconds | `float \| None` | None | gt=0 |
| total_request_seconds | `float \| None` | None | gt=0 |
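
The "None disables the timeout" convention maps naturally onto asyncio.wait_for. The helper below is a sketch of that convention using only the standard library; `with_optional_timeout` and `slow_operation` are hypothetical names, and the library's own wrappers may behave differently.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, TypeVar

T = TypeVar("T")


async def with_optional_timeout(awaitable: Awaitable[T], seconds: float | None) -> T:
    """Await with a deadline, or without one when seconds is None."""
    if seconds is None:
        return await awaitable
    return await asyncio.wait_for(awaitable, timeout=seconds)


async def slow_operation(delay: float = 0.2) -> str:
    """Stand-in for an LLM call or persistence operation."""
    await asyncio.sleep(delay)
    return "done"
```

When the deadline passes, `asyncio.wait_for` cancels the inner operation and raises a timeout error that the caller can map to its own exception type.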

react_config.yaml Schema #

Per-agent config file at <base_dir>/<agent_name>/react_config.yaml:

ReactConfigSchema #

| Field | Type | Default | Validation |
| --- | --- | --- | --- |
| max_passes | int | 5 (from DEFAULT_MAX_PASSES) | gt=0 |
| actions | dict[str, ReactActionSchema] | {} | |
| slots | dict[str, ReactSlotSchema] | {} | |

Model validator: validate_slot_dependencies checks reference validity (rejects unknown/self dependencies) and detects cycles via DFS.
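
The two checks (reference validity, then cycle detection by depth-first search) can be sketched over a plain mapping of slot name to dependency list. This is an illustrative version that raises ValueError; `check_slot_dependencies` is a hypothetical name and the real validator's error types and messages are not shown in the source.

```python
def check_slot_dependencies(deps: dict[str, list[str]]) -> None:
    """Reject unknown or self dependencies, then detect cycles with DFS."""
    for slot, targets in deps.items():
        for target in targets:
            if target == slot:
                raise ValueError(f"slot {slot!r} depends on itself")
            if target not in deps:
                raise ValueError(f"slot {slot!r} depends on unknown slot {target!r}")

    # Three-colour DFS: WHITE = unvisited, GRAY = on the stack, BLACK = done.
    WHITE, GRAY, BLACK = 0, 1, 2
    color = dict.fromkeys(deps, WHITE)

    def visit(node: str, path: list[str]) -> None:
        color[node] = GRAY
        for nxt in deps[node]:
            if color[nxt] == GRAY:
                # Reaching a node that is still on the stack closes a cycle.
                cycle = path[path.index(nxt):] + [nxt]
                raise ValueError("dependency cycle: " + " -> ".join(cycle))
            if color[nxt] == WHITE:
                visit(nxt, path + [nxt])
        color[node] = BLACK

    for slot in deps:
        if color[slot] == WHITE:
            visit(slot, [slot])
```

For example, `{"a": ["b"], "b": ["c"], "c": ["a"]}` is rejected with the cycle `a -> b -> c -> a`, while `{"a": ["b"], "b": []}` passes.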

ReactActionSchema #

Field Type Default
description str required
parameters dict[str, str] {}
required list[str] []

ReactSlotSchema #

| Field | Type | Default | Validation |
| --- | --- | --- | --- |
| description | str | required | |
| modules | list[str] | required | min_length=1 |
| dependencies | list[str] | [] | |
| triggers | str | "" | |
| max_tokens | `int \| None` | None | gt=0 |

1.7 Migration & Compatibility #

Python Version #

Python >= 3.11 is required. The library uses StrEnum, ExceptionGroup, and asyncio.TaskGroup, all of which were added in Python 3.11.

CI tests on Python 3.11, 3.12, and 3.13.

Dependency Constraints #

Dependency Range
langgraph >= 1.0, < 2.0
langchain-core >= 0.3, < 1.0
pydantic >= 2.0, < 3.0
pyyaml >= 6.0, < 7.0
structlog >= 24.0, < 26.0
tenacity >= 8.0, < 10.0
opentelemetry-api (optional) >= 1.24, < 2.0

Dev dependencies have upper bounds to prevent surprise major-version breakage (e.g., pytest>=8.0,<9.0, hypothesis>=6.100,<7.0).

Frozen Config Models #

All Pydantic config models use ConfigDict(frozen=True, extra="forbid"). This means:

Backward Compatibility #

Part 2: Developer Guide

graphcrew -- Part 2: Architecture Deep Dive #

Library version: 0.1.0 | Python: >= 3.11 | Engine: LangGraph >= 1.0 | License: MIT


2.1 Development Environment Setup #

Prerequisites #

Tool Version Purpose
Python >= 3.11 Runtime (StrEnum, ExceptionGroup, asyncio.TaskGroup)
uv latest Package manager (replaces pip + virtualenv)
git >= 2.30 Version control
make any Task runner (make all = lint + typecheck + test)

Clone and Install #

bash
git clone https://github.com/Atiqul-Islam/swagent.git
cd swagent

# Create virtual environment and install all deps (including dev)
uv sync --all-extras --dev

Verify Installation #

bash
# Run the full quality gate (lint + typecheck + tests)
make all

# Or run individually:
uv run ruff check src/ tests/          # Linting
uv run ruff format --check src/ tests/ # Formatting
uv run mypy src/ tests/                # Type checking (strict)
uv run pytest                          # 1300+ tests, 98%+ coverage

IDE Setup #

VS Code (recommended):

PyCharm:

Optional Extras #

bash
# OpenTelemetry instrumentation support
uv sync --extra otel

2.2 Architecture Overview #

Layer Diagram (8 Layers) #

text
+---------------------------------------------------------------------------+
|                            graphcrew (library)                            |
|                                                                           |
|  +---------------------------------------------------------------------+ |
|  |  1. Configuration                                                    | |
|  |  YAML -> Pydantic -> agents, topology, slots, knowledge             | |
|  +----------------------------+----------------------------------------+ |
|                               |                                           |
|                               v                                           |
|  +---------------------------------------------------------------------+ |
|  |  2. Core                                                             | |
|  |  OrchestratorState, types, exceptions                                | |
|  +--------+------------------------------+-----------------------------+ |
|           |                              |                                |
|           v                              v                                |
|  +----------------+       +-------------------------------------------+  |
|  | 3. Built-in    |       | Routing                                   |  |
|  | Agents         |       | TopologyResolver (validates)              |  |
|  |                |       |                                           |  |
|  | Supervisor     |       |                                           |  |
|  | SessionMgr     |       +-------------------------------------------+  |
|  +----------------+                                                       |
|                                                                           |
|  +---------------------------------------------------------------------+ |
|  |  4. Slot-Based Context Management                                    | |
|  |  SlotManager -> per-agent pools + shared pool                        | |
|  |  SlotTier (TURN / SESSION / LONG_TERM)                               | |
|  |  TokenCounter (ABC) -> pluggable token estimation                    | |
|  |  EvictionStrategy (ABC) -> LRU / FIFO / Priority / Demotion         | |
|  |  SlotPersistence (ABC) -> pluggable durable storage                  | |
|  +---------------------------------------------------------------------+ |
|                                                                           |
|  +---------------------------------------------------------------------+ |
|  |  5. Session Layer                                                     | |
|  |  SessionManager (ABC: 3 abstract methods)                            | |
|  |  Summaries + memories default to NotImplementedError                 | |
|  |  InMemorySessionManager ships for dev/testing (per-session locks)    | |
|  +---------------------------------------------------------------------+ |
|                                                                           |
|  +---------------------------------------------------------------------+ |
|  |  6. Knowledge Module System                                          | |
|  |  KnowledgeLoader (ABC) + FileSystemKnowledgeLoader                   | |
|  |  ReactTemplateManager (per-agent config + slot loading)              | |
|  |  ReactConfigSchema (Pydantic validation for react_config.yaml)       | |
|  +---------------------------------------------------------------------+ |
|                                                                           |
|  +---------------------------------------------------------------------+ |
|  |  7. ReAct Execution Engine                                           | |
|  |  ReactExecutor (multi-pass loop + token-budgeted history window)    | |
|  |  ReactParsedResponse, TerminalActionResult, ActionHandler            | |
|  +---------------------------------------------------------------------+ |
|                                                                           |
|  +---------------------------------------------------------------------+ |
|  |  8. Extension Points (what developers implement)                     | |
|  |  BaseAgent (ABC) | SessionManager (ABC) | KnowledgeLoader (ABC)     | |
|  |  EvictionStrategy (ABC) | SlotPersistence (ABC) | TokenCounter (ABC)| |
|  |  AgentMiddleware | RateLimiter (ABC) | YAML config                | |
|  +---------------------------------------------------------------------+ |
+---------------------------------------------------------------------------+

Request Flow #

text
User Input
    |
    v
[Supervisor Node] --- reads context slots ---> [SlotManager]
    |                                               |
    | (conditional edge: route_decision)            | (optional)
    v                                               v
[Agent-X Node] <--- historical context --- [SessionManagerAgent]
    |                                           |
    | (handoff / return)                        v
    v                                     [SessionManager impl]
[Supervisor Node]                          (developer-provided)
    |
    v
Final Response

  1. User input enters the Supervisor node.
  2. Supervisor reads context from SlotManager and optionally retrieves history via SessionManagerAgent.
  3. Supervisor selects the target agent, constrained by the topology configuration.
  4. The target agent executes. It may hand off to another agent or return to the supervisor.
  5. Supervisor aggregates results and produces the final response.

Layer Responsibilities #

| Layer | Responsibility |
| --- | --- |
| 1. Configuration | YAML files parsed into frozen Pydantic models. Validates agents, topology edges, slot budgets, knowledge paths, retry/timeout settings. |
| 2. Core | OrchestratorState TypedDict with dict-merge reducers for safe parallel fan-in. Shared types (AgentResponse), exception hierarchy, token counting ABC, LLM utilities (retry, timeout, streaming), rate limiting, structured logging context, OrchestratorContext lifecycle manager. |
| 3. Agent Runtime | BaseAgent ABC with middleware hooks, SupervisorAgent (simple + ReAct + streaming modes), SessionManagerAgent (ReAct agent using session methods as tools), routing via TopologyResolver. |
| 4. Context Management | Tiered slot pools (TURN/SESSION/LONG_TERM) with configurable eviction (LRU, FIFO, priority, demotion) and pluggable persistence backends. |
| 5. Session Layer | SessionManager ABC with 3 abstract methods (messages + lifecycle). Summaries and memories opt-in via override. InMemorySessionManager for dev/testing with per-session async locks. SemanticSearchMixin for vector search. |
| 6. Knowledge | Per-agent react_config.yaml defining knowledge slots and actions. ReactTemplateManager handles validation, dependency resolution, and prompt assembly with async-safe caching. |
| 7. ReAct Engine | Multi-pass thought/action/observation loop. Each pass: build prompt, call LLM, parse XML or structured response, execute action handler, collect observation. Loops until terminal action or max_passes. |
| 8. Extension Points | ABCs and protocols that developers implement: BaseAgent, SessionManager, KnowledgeLoader, SlotPersistence, EvictionStrategy, TokenCounter, AgentMiddleware, RateLimiter. |

2.3 Package-by-Package Documentation #

2.3.1 config/ -- Configuration #

Purpose: Load YAML configuration files, validate them with Pydantic, and provide typed config objects to the rest of the library.

Files:

Key Classes:

python
class OrchestratorConfig(BaseModel):
    """Top-level config. frozen=True, extra='forbid'."""
    agents: list[AgentConfig]
    topology: TopologyConfig
    context: ContextConfig
    session: SessionConfig
    knowledge: KnowledgeConfig
    retry: RetryConfig
    timeout: TimeoutConfig

class AgentConfig(BaseModel):
    """Per-agent config with name validation."""
    name: str          # [a-zA-Z][a-zA-Z0-9_-]*, <= 64 chars, no reserved names
    type: Literal["supervisor", "session_manager", "custom"]
    description: str   # min_length=1
    model: str | None
    knowledge_enabled: bool

class TopologyConfig(BaseModel):
    edges: list[TopologyEdgeConfig]
    default_return: str  # default "supervisor"

class TimeoutConfig(BaseModel):
    llm_call_seconds: float | None
    file_io_seconds: float | None
    persistence_seconds: float | None
    total_request_seconds: float | None

class RetryConfig(BaseModel):
    initial_interval: float   # 0.5
    backoff_factor: float     # 2.0
    max_interval: float       # 128.0
    max_attempts: int         # 3 (ge=1; 1 = no retry)
    jitter: bool              # True (AWS Full Jitter via tenacity)

Functions:

python
def load_config(path: str | Path) -> OrchestratorConfig: ...
def load_config_from_dict(data: dict) -> OrchestratorConfig: ...
def load_config_from_string(yaml_str: str) -> OrchestratorConfig: ...
def get_default_config() -> OrchestratorConfig: ...

Patterns:

Dependencies: pydantic, pyyaml

Extension Points: None. Config models are final. Developers write YAML files that conform to the schema.
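
To make the RetryConfig fields concrete, the following sketch computes the waits implied by the documented defaults. It is a standalone illustration, not the library's code: the function name backoff_schedule, the rng parameter, and the exact formula (exponential growth capped at max_interval, with full jitter drawn uniformly below the cap) are assumptions, and the library delegates the real behaviour to tenacity.

```python
from __future__ import annotations

import random


def backoff_schedule(
    *,
    initial_interval: float = 0.5,
    backoff_factor: float = 2.0,
    max_interval: float = 128.0,
    max_attempts: int = 3,
    jitter: bool = False,
    rng: random.Random | None = None,
) -> list[float]:
    """Return the wait in seconds before each retry (hypothetical helper).

    max_attempts counts the first call, so there are max_attempts - 1 waits.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    rng = rng or random.Random()
    waits: list[float] = []
    for retry_index in range(max_attempts - 1):
        # Exponential growth, capped at max_interval.
        ceiling = min(max_interval, initial_interval * backoff_factor**retry_index)
        # Full jitter (assumed): draw uniformly between 0 and the capped ceiling.
        waits.append(rng.uniform(0.0, ceiling) if jitter else ceiling)
    return waits
```

With jitter disabled, the defaults give two waits of 0.5 and 1.0 seconds, and max_attempts=1 gives no waits at all, which matches the "1 = no retry" note on the field.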


2.3.2 core/ -- Core Types and Infrastructure #

Purpose: Shared state definition, exception hierarchy, token counting, LLM call utilities, rate limiting, logging context, and the OrchestratorContext lifecycle manager.

Files:

Key Classes:

python
class OrchestratorState(TypedDict, total=False):
    """LangGraph state. Uses _merge_dicts reducer for context_slots and knowledge_context."""
    messages: Annotated[list[BaseMessage], add_messages]
    current_agent: str
    context_slots: Annotated[dict[str, Any], _merge_dicts]
    knowledge_context: Annotated[dict[str, Any], _merge_dicts]
    session_id: str
    user_id: str
    request_id: str

@dataclass(frozen=True, slots=True)
class AgentResponse:
    agent_name: str
    content: str
    next_agent: str | None = None
    slots_update: dict[str, Any] = field(default_factory=dict)
    slots_tier: dict[str, SlotTier] = field(default_factory=dict)
    token_usage: dict[str, int] | None = None

class TokenCounter(ABC):
    @abstractmethod
    def count(self, text: str) -> int: ...

class CharDivisionTokenCounter(TokenCounter):
    """max(1, len(text) // chars_per_token) for non-empty; 0 for empty. Default chars_per_token=4."""

class RateLimiter(ABC):
    async def acquire(self) -> None: ...
    async def release(self) -> None: ...  # concrete no-op default
    async def close(self) -> None: ...
    # Also supports async with protocol

class SemaphoreRateLimiter(RateLimiter):
    def __init__(self, *, max_concurrency: int = 1, delay_seconds: float = 0.0): ...

class PerTenantRateLimiter(RateLimiter):
    def __init__(self, *, max_concurrency: int = 1, delay_seconds: float = 0.0, max_tenants: int = 10_000): ...
    def set_tenant(self, tenant_id: str) -> None: ...

class OrchestratorContext:
    """Manages coordinated startup/shutdown of SlotManager, SessionManager,
    ReactTemplateManager, RateLimiter, and extras. Supports async with."""
    async def close(self) -> None: ...
    @property
    def is_closed(self) -> bool: ...

Functions:

python
def create_initial_state(
    user_input: str,
    *,
    current_agent: str = "supervisor",
    session_id: str = "default",
    user_id: str = "anonymous",
) -> OrchestratorState: ...

def bind_logging_context(state, config, *, redact_pii=False) -> None: ...
def clear_logging_context() -> None: ...

async def ainvoke_with_timeout(llm, messages, timeout_seconds, config) -> Any: ...
async def astream_with_timeout(llm, messages, timeout_seconds, config) -> AsyncIterator: ...
def build_async_retryer(retry_config: RetryConfig) -> AsyncRetrying: ...

def register_transient_exception(exc_type: type[Exception]) -> None: ...
def deregister_transient_exception(exc_type: type[Exception]) -> None: ...
def get_transient_exceptions() -> tuple[type[Exception], ...]: ...

Patterns:

Dependencies: langchain-core, structlog, tenacity

Extension Points: TokenCounter ABC, RateLimiter ABC.
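
As an illustration of the TokenCounter extension point, the sketch below re-declares the ABC locally so that it runs without graphcrew installed. CharDivisionTokenCounter follows the rule stated in its docstring above; WordTokenCounter is a hypothetical custom counter and is not part of the library.

```python
from __future__ import annotations

from abc import ABC, abstractmethod


class TokenCounter(ABC):
    """Local stand-in mirroring the documented interface."""

    @abstractmethod
    def count(self, text: str) -> int: ...


class CharDivisionTokenCounter(TokenCounter):
    """0 for empty text, otherwise max(1, len(text) // chars_per_token)."""

    def __init__(self, chars_per_token: int = 4) -> None:
        if chars_per_token < 1:
            raise ValueError("chars_per_token must be >= 1")
        self.chars_per_token = chars_per_token

    def count(self, text: str) -> int:
        if not text:
            return 0
        return max(1, len(text) // self.chars_per_token)


class WordTokenCounter(TokenCounter):
    """Hypothetical alternative: one token per whitespace-separated word."""

    def count(self, text: str) -> int:
        return len(text.split())
```

A production counter would more likely wrap a model-specific tokenizer; the only contract shown in this guide is count(text) -> int.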


2.3.3 agents/ -- Agent Runtime #

Purpose: Define the agent lifecycle (BaseAgent ABC), two built-in agents (SupervisorAgent, SessionManagerAgent), and a middleware system for cross-cutting concerns.

Files:

Key Classes:

python
class BaseAgent(ABC):
    def __init__(
        self,
        config: AgentConfig,
        template_manager: ReactTemplateManager | None = None,
        react_executor: ReactExecutor | None = None,
        middleware: Sequence[AgentMiddleware] | None = None,
        *,
        redact_pii: bool = False,
    ) -> None: ...

    @abstractmethod
    async def process(self, state: OrchestratorState, config: RunnableConfig | None = None) -> AgentResponse: ...
    async def astream_process(self, state, config) -> AsyncGenerator[tuple[dict, str], None]: ...
    async def invoke(self, state: OrchestratorState, config: RunnableConfig | None = None) -> dict[str, Any]: ...

class AgentMiddleware:
    """Override only what you need. All hooks have concrete no-op defaults."""
    async def before_invoke(self, agent_name, state, config) -> OrchestratorState: ...
    async def after_invoke(self, agent_name, state, response, config) -> AgentResponse: ...
    async def on_error(self, agent_name, state, error, config) -> None: ...

class SupervisorAgent(BaseAgent):
    """Built-in supervisor. Simple mode (single LLM call) or ReAct mode."""
    def __init__(self, config, llm, topology, *, template_manager=None,
                 react_executor=None, middleware=None, rate_limiter=None,
                 use_structured_output=False, redact_pii=False): ...
    async def process(self, state, config) -> AgentResponse: ...
    async def astream_process(self, state, config) -> AsyncGenerator: ...

class SessionManagerAgent(BaseAgent):
    """ReAct agent using SessionManager methods as action handlers."""
    def __init__(self, config, llm, session_manager, *,
                 template_manager=None, middleware=None,
                 rate_limiter=None, redact_pii=False): ...

class MetricsMiddleware(AgentMiddleware):
    def __init__(self, collector: MetricsCollector): ...

class TenantIsolationMiddleware(AgentMiddleware):
    """Raises TenantIsolationError if process() mutates session_id/user_id."""

class CircuitBreakerMiddleware(AgentMiddleware):
    """CLOSED -> OPEN -> HALF_OPEN -> CLOSED state machine."""
    def __init__(self, *, failure_threshold: int = 5, recovery_timeout_seconds: float = 30.0): ...

class MemoryInjectionMiddleware(AgentMiddleware):
    """Injects relevant memories into state['knowledge_context']['_injected_memories']."""
    def __init__(self, session_manager: SessionManager, *, limit: int = 5): ...

Patterns:

Dependencies: core/, config/, knowledge/, react/, routing/

Extension Points: BaseAgent ABC (implement process()), AgentMiddleware (override hooks).
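
The CLOSED -> OPEN -> HALF_OPEN -> CLOSED cycle named in the CircuitBreakerMiddleware docstring can be sketched as a small state machine. This is a generic circuit breaker written for illustration: the class name, the method names, and the injectable clock are assumptions, and the library's middleware may track failures differently.

```python
from __future__ import annotations

import time
from enum import Enum
from typing import Callable


class BreakerState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    """Hypothetical standalone breaker; parameter names follow the guide."""

    def __init__(
        self,
        *,
        failure_threshold: int = 5,
        recovery_timeout_seconds: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout_seconds = recovery_timeout_seconds
        self._clock = clock
        self._state = BreakerState.CLOSED
        self._failures = 0
        self._opened_at = 0.0

    @property
    def state(self) -> BreakerState:
        # OPEN becomes HALF_OPEN once the recovery timeout has elapsed.
        if (
            self._state is BreakerState.OPEN
            and self._clock() - self._opened_at >= self.recovery_timeout_seconds
        ):
            self._state = BreakerState.HALF_OPEN
        return self._state

    def allow_request(self) -> bool:
        return self.state is not BreakerState.OPEN

    def record_success(self) -> None:
        # Any success (including a HALF_OPEN probe) closes the breaker.
        self._failures = 0
        self._state = BreakerState.CLOSED

    def record_failure(self) -> None:
        self._failures += 1
        # A failed HALF_OPEN probe reopens immediately.
        if self.state is BreakerState.HALF_OPEN or self._failures >= self.failure_threshold:
            self._state = BreakerState.OPEN
            self._opened_at = self._clock()
```

In middleware terms, allow_request() would correspond to a check in before_invoke, record_success() to after_invoke, and record_failure() to on_error; that mapping is an assumption about how the hooks are used.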


2.3.4 context/ -- Slot-Based Context Management #

Purpose: Manage key-value context slots with token budgets, tiered persistence (TURN/SESSION/LONG_TERM), configurable eviction, and pluggable durable storage.

Files:

Key Classes:

python
class SlotTier(StrEnum):
    TURN = "turn"           # Discarded at end of turn
    SESSION = "session"     # Persisted for the session
    LONG_TERM = "long_term" # Persisted across sessions

TIER_ORDER: tuple[SlotTier, ...] = (SlotTier.TURN, SlotTier.SESSION, SlotTier.LONG_TERM)

@dataclass(slots=True)
class Slot:
    key: str
    value: Any
    priority: int
    tier: SlotTier
    created_at: float
    accessed_at: float
    tokens: int

class SlotPool:
    """Token-budgeted collection. Supports len() and 'in' operators."""
    def add(self, key, value, priority=0, *, tier=SlotTier.TURN) -> None: ...
    def get(self, key) -> Any | None: ...          # Updates accessed_at
    def get_slot(self, key) -> Slot | None: ...     # Updates accessed_at
    def remove(self, key) -> bool: ...
    def slots(self) -> list[Slot]: ...
    @property
    def total_tokens(self) -> int: ...

class SlotManager:
    def __init__(self, config: ContextConfig, persistence=None, token_counter=None, timeout_config=None): ...
    def set_shared(self, key, value, priority=0) -> None: ...
    def get_shared(self, key) -> Any | None: ...
    def set_shared_tiered(self, key, value, tier: SlotTier, priority=0) -> None: ...
    def promote_slot(self, key, target_tier: SlotTier) -> bool: ...
    def set_agent(self, agent_name, key, value, priority=0) -> None: ...
    def get_agent(self, agent_name, key) -> Any | None: ...
    async def hydrate(self, session_id, user_id=None) -> None: ...
    async def flush(self, session_id, user_id=None) -> None: ...
    async def clear_turn_slots(self) -> None: ...
    def to_dict(self) -> dict: ...
    def from_dict(self, data: dict) -> None: ...
    async def close(self) -> None: ...
    # Supports async with

class EvictionStrategy(ABC):
    @abstractmethod
    def select(self, slots: list[Slot], count: int) -> list[Slot]: ...

class SlotPersistence(ABC):
    @abstractmethod
    async def load_session_slots(self, session_id: str) -> list[Slot]: ...
    @abstractmethod
    async def save_session_slots(self, session_id: str, slots: list[Slot]) -> None: ...
    @abstractmethod
    async def load_long_term_slots(self, user_id: str) -> list[Slot]: ...
    @abstractmethod
    async def save_long_term_slots(self, user_id: str, slots: list[Slot]) -> None: ...
    async def health_check(self) -> dict[str, Any]: ...

Patterns:

Dependencies: core/ (token counting, exceptions, config schema)

Extension Points: EvictionStrategy ABC, SlotPersistence ABC, TokenCounter ABC (injected).


2.3.5 session/ -- Session Layer #

Purpose: Abstract interface for message persistence, summaries, and memories. Ships InMemorySessionManager for development and SemanticSearchMixin for vector search integration.

Files:

Key Classes:

python
class SessionManager(ABC):
    """3 abstract methods. Everything else raises NotImplementedError by default."""

    # Abstract (must implement)
    @abstractmethod
    async def save_messages(self, session_id: str, messages: list[SessionMessage]) -> None: ...
    @abstractmethod
    async def get_messages(self, session_id: str, limit: int = 50, user_id: str | None = None) -> list[SessionMessage]: ...
    @abstractmethod
    async def clear_session(self, session_id: str) -> None: ...

    # Concrete defaults (override to enable)
    async def save_summary(self, session_id, summary) -> None: ...         # raises NotImplementedError
    async def get_summary(self, session_id) -> Summary | None: ...         # raises NotImplementedError
    async def store_memory(self, session_id, memory, user_id=None) -> None: ...
    async def recall_memory(self, session_id, key) -> Memory | None: ...
    async def search_memories(self, session_id, query, limit=5) -> list[Memory]: ...
    async def search_memories_by_user(self, user_id, query, limit=10) -> list[Memory]: ...
    async def get_messages_paginated(self, session_id, cursor=None, limit=50, user_id=None) -> PaginatedResult: ...
    async def update_memory(self, session_id, memory_id, updates, user_id=None) -> Memory | None: ...
    async def delete_memory(self, session_id, memory_id, user_id=None) -> bool: ...
    async def list_memories(self, session_id, cursor=None, limit=20) -> PaginatedResult[Memory]: ...

    # Utility
    async def health_check(self) -> dict[str, Any]: ...
    def get_capabilities(self) -> set[str]: ...
    async def close(self) -> None: ...
    # Supports async with

class InMemorySessionManager(SessionManager):
    """Full implementation for dev/testing. Per-session asyncio.Lock guards."""

class SemanticSearchMixin(ABC):
    """Mix-in for vector-based memory search. Must appear first in MRO."""
    @abstractmethod
    async def embed_text(self, text: str) -> list[float]: ...
    async def search_memories_semantic(self, session_id, query, limit=5) -> list[Memory]: ...

Data Models (all frozen=True, extra="forbid"):

python
class SessionMessage(BaseModel):
    id: str                    # UUID
    session_id: str
    role: Literal["user", "assistant", "ai", "system"]
    content: str
    timestamp: datetime
    user_id: str | None = None
    metadata: dict[str, Any] = {}
    def to_langchain(self) -> BaseMessage: ...

class Memory(BaseModel):
    id: str
    session_id: str
    key: str
    value: str
    importance: float          # ge=0.0, le=1.0
    user_id: str | None = None
    expires_at: datetime | None = None
    # ... categories, metadata, source_agent, etc.

class Summary(BaseModel):
    id: str
    session_id: str
    content: str
    user_id: str | None = None

class PaginatedResult(BaseModel, Generic[T]):
    items: list[T]
    next_cursor: str | None
    has_more: bool

Patterns:

Dependencies: pydantic, langchain-core

Extension Points: SessionManager ABC (implement 3 abstract methods, override optional methods as needed), SemanticSearchMixin (implement embed_text()).
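
The "implement three methods, opt in to the rest" contract can be sketched with local stand-ins for SessionManager and SessionMessage. DictSessionManager is a hypothetical backend. The choices to return the most recent limit messages and to filter by user_id are assumptions about the intended semantics, not documented behaviour.

```python
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class SessionMessage:
    """Trimmed stand-in for the documented Pydantic model."""

    session_id: str
    role: str
    content: str
    user_id: str | None = None


class SessionManager(ABC):
    @abstractmethod
    async def save_messages(self, session_id: str, messages: list[SessionMessage]) -> None: ...

    @abstractmethod
    async def get_messages(
        self, session_id: str, limit: int = 50, user_id: str | None = None
    ) -> list[SessionMessage]: ...

    @abstractmethod
    async def clear_session(self, session_id: str) -> None: ...

    async def get_summary(self, session_id: str) -> None:
        # Optional capability: stays disabled until a subclass overrides it.
        raise NotImplementedError("summaries are opt-in")


class DictSessionManager(SessionManager):
    """Hypothetical in-memory backend guarded by per-session locks."""

    def __init__(self) -> None:
        self._messages: dict[str, list[SessionMessage]] = defaultdict(list)
        self._locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    async def save_messages(self, session_id: str, messages: list[SessionMessage]) -> None:
        async with self._locks[session_id]:
            self._messages[session_id].extend(messages)

    async def get_messages(
        self, session_id: str, limit: int = 50, user_id: str | None = None
    ) -> list[SessionMessage]:
        if limit < 1:
            raise ValueError("limit must be >= 1")
        async with self._locks[session_id]:
            found = [
                m for m in self._messages[session_id]
                if user_id is None or m.user_id == user_id
            ]
        return found[-limit:]  # assumed: most recent messages win

    async def clear_session(self, session_id: str) -> None:
        async with self._locks[session_id]:
            self._messages.pop(session_id, None)
```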


2.3.6 routing/ -- Topology and Graph Building #

Purpose: Validate agent-to-agent routes against a declared topology and provide helper functions that eliminate LangGraph graph-wiring boilerplate.

Files:

Key Classes:

python
class TopologyResolver:
    """Validates allowed routes from topology config."""
    def __init__(self, config: TopologyConfig): ...
    def get_allowed_targets(self, from_agent: str) -> list[str]: ...
    @property
    def default_return(self) -> str: ...

Functions:

python
def build_conditional_edge(
    topology: TopologyResolver, from_agent_name: str
) -> Callable[[OrchestratorState], str]:
    """Returns a routing function for StateGraph.add_conditional_edges().
    Returns END when current_agent == from_agent_name (termination sentinel).
    Falls back to default_return for unknown targets."""

def build_hub_spoke_graph(
    supervisor, agents: dict[str, Any], topology: TopologyResolver,
    *, state_schema=OrchestratorState
) -> CompiledStateGraph:
    """Hub-and-spoke: all workers route back to supervisor."""

def build_topology_graph(
    agents: dict[str, Any], topology: TopologyResolver,
    *, entry_point: str, state_schema=OrchestratorState
) -> CompiledStateGraph:
    """Flexible: edges match topology exactly. Supports mesh, hybrid."""

Patterns:

Dependencies: core/ (state, exceptions), langgraph

Extension Points: None. Developers use the provided graph builders or wire LangGraph graphs manually.
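
The routing rule described for build_conditional_edge (END on the termination sentinel, default_return for unknown targets) can be sketched without LangGraph. Everything here is a simplified local stand-in: the resolver takes a plain dict of edges instead of a TopologyConfig, the state is a plain dict, and END is a local constant standing in for LangGraph's sentinel.

```python
from __future__ import annotations

from typing import Any, Callable

END = "__end__"  # local stand-in for langgraph's END sentinel


class TopologyResolver:
    """Simplified stand-in: edges given as {from_agent: [allowed targets]}."""

    def __init__(self, edges: dict[str, list[str]], default_return: str = "supervisor") -> None:
        self._edges = edges
        self._default_return = default_return

    def get_allowed_targets(self, from_agent: str) -> list[str]:
        return list(self._edges.get(from_agent, []))

    @property
    def default_return(self) -> str:
        return self._default_return


def build_conditional_edge(
    topology: TopologyResolver, from_agent_name: str
) -> Callable[[dict[str, Any]], str]:
    allowed = set(topology.get_allowed_targets(from_agent_name))

    def route(state: dict[str, Any]) -> str:
        target = state.get("current_agent", topology.default_return)
        # Termination sentinel: the agent named itself as the next hop.
        if target == from_agent_name:
            return END
        # Transitions outside the topology fall back to default_return.
        if target not in allowed:
            return topology.default_return
        return target

    return route
```

In a real graph the returned function would be passed to StateGraph.add_conditional_edges(), as the docstring above describes; the sketch only demonstrates the decision logic.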


2.3.7 knowledge/ -- Knowledge Module System #

Purpose: Per-agent knowledge configuration (react_config.yaml), filesystem-based module loading, prompt assembly, and dependency resolution between knowledge slots.

Files:

Key Classes:

python
class ReactConfigSchema(BaseModel):
    """frozen=True, extra='forbid'. Validates react_config.yaml."""
    max_passes: int = DEFAULT_MAX_PASSES  # default 5, gt=0
    actions: dict[str, ReactActionSchema]
    slots: dict[str, ReactSlotSchema]
    # @model_validator: validates dependency references + cycle detection (DFS)

class ReactSlotSchema(BaseModel):
    description: str
    modules: list[str]         # min_length=1, relative .md file paths
    dependencies: list[str]    # slot names that must load first
    triggers: str              # hint for when to load
    max_tokens: int | None     # per-slot token budget cap

class ReactActionSchema(BaseModel):
    description: str
    parameters: dict[str, str]  # param name -> description
    required: list[str]

DEFAULT_MAX_PASSES: int = 5

class KnowledgeLoader(ABC):
    @abstractmethod
    async def load_module(self, module_ref: str) -> str: ...
    @abstractmethod
    def resolve_module_path(self, module_ref: str) -> Path: ...

class FileSystemKnowledgeLoader(KnowledgeLoader):
    """Enforces base_dir containment. Rejects absolute paths and traversal."""

class ReactTemplateManager:
    """Per-agent config + slot loading. Async-safe with asyncio.Lock."""
    @classmethod
    async def create(cls, ...) -> ReactTemplateManager: ...  # async factory
    async def load_slot(self, slot_name: str) -> KnowledgeLoadResult: ...
    async def load_core_prompt(self, agent_name, **kwargs) -> str: ...
    def format_available_slots(self) -> str: ...
    def format_available_actions(self) -> str: ...
    async def reset(self) -> None: ...
    async def close(self) -> None: ...
    @property
    def max_passes(self) -> int: ...
    @property
    def token_counter(self) -> TokenCounter: ...
    # Supports async with

Patterns:

Dependencies: core/ (token counting, exceptions), pydantic, pyyaml

Extension Points: KnowledgeLoader ABC (implement for non-filesystem sources like S3, databases).
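
The dependency validation that ReactConfigSchema performs (reference checks plus DFS cycle detection) can be illustrated with a small standalone function. resolve_load_order is a hypothetical helper operating on a plain {slot: [dependencies]} mapping; it shows the technique rather than the library's validator.

```python
from __future__ import annotations


def resolve_load_order(dependencies: dict[str, list[str]]) -> list[str]:
    """Return slot names so that every slot follows its dependencies.

    Raises ValueError on an unknown dependency or a circular dependency.
    """
    unvisited, in_progress, done = 0, 1, 2
    status = {name: unvisited for name in dependencies}
    order: list[str] = []

    def visit(name: str, path: tuple[str, ...]) -> None:
        if name not in dependencies:
            raise ValueError(f"unknown slot {name!r} referenced by {path[-1]!r}")
        if status[name] == done:
            return
        if status[name] == in_progress:
            # Reaching an in-progress node again means the DFS found a back edge.
            cycle = " -> ".join((*path[path.index(name):], name))
            raise ValueError(f"circular dependency: {cycle}")
        status[name] = in_progress
        for dep in dependencies[name]:
            visit(dep, (*path, name))
        status[name] = done
        order.append(name)

    for name in dependencies:
        visit(name, ())
    return order
```

For example, a slot refund_policy that depends on billing_basics resolves to billing_basics first, while two slots that depend on each other raise a ValueError naming the cycle.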


2.3.8 react/ -- ReAct Execution Engine #

Purpose: Multi-pass thought/action/observation loop that drives ReAct-mode agents.

Files:

Key Classes:

python
class ReactExecutor:
    def __init__(
        self,
        agent_name: str,
        llm: BaseLanguageModel,
        template_manager: ReactTemplateManager,
        action_handlers: dict[str, ActionHandler],
        *,
        max_passes: int | None = None,
        retry_config: RetryConfig | None = None,
        timeout_config: TimeoutConfig | None = None,
        max_consecutive_errors: int | None = None,
        max_history_tokens: int | None = None,
        rate_limiter: RateLimiter | None = None,
        cancel_event: asyncio.Event | None = None,
        use_structured_output: bool = False,
        total_request_seconds: float | None = None,
    ) -> None: ...

    async def execute(
        self, user_input: str, context: dict[str, Any],
        config: RunnableConfig | None = None, *, request_id: str | None = None
    ) -> dict[str, Any]: ...

class ReactParsedResponse:
    thought: str
    action: str
    action_params: dict[str, Any]

class ReactStructuredResponse(BaseModel):
    """Pydantic model for llm.with_structured_output()."""
    thought: str
    action: str
    action_params: dict[str, Any]

class TerminalActionResult(Exception):
    """Raised by action handlers to signal loop termination."""
    def __init__(self, result: dict[str, Any]): ...

# ActionHandler is a Protocol whose config parameter is optional; approximated here as a Callable alias
ActionHandler = Callable[[dict, dict, RunnableConfig | None], Awaitable[tuple[str, dict]]]

Pass Loop (per pass):

  1. Check cancellation event.
  2. Build prompt: static core prompt + dynamic per-pass prompt (user input, loaded knowledge, history).
  3. Invoke LLM (with retry, rate limiting, timeout).
  4. Accumulate token usage.
  5. Parse response: XML (<thought>, <action type="...">) or structured (ReactStructuredResponse).
  6. Look up action handler. If unknown, increment consecutive errors and continue.
  7. Execute handler. If TerminalActionResult raised, return the result. Otherwise, collect observation.
  8. On max_passes exhaustion, raise ReactMaxPassesError.
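
The steps above can be sketched as a compact loop. This is a simplified illustration and not ReactExecutor itself: steps 2 to 5 (prompt building, the LLM call, token accounting, and parsing) are collapsed into a hypothetical decide callable, handlers take only (params, state), the exception classes are local stand-ins, and the behaviour after max_consecutive_errors is an assumption.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


class TerminalActionResult(Exception):
    """Raised by a handler to end the loop with a final result."""

    def __init__(self, result: dict[str, Any]) -> None:
        super().__init__("terminal action")
        self.result = result


class ReactCancellationError(RuntimeError):
    """Local stand-in for the library exception of the same name."""


class ReactMaxPassesError(RuntimeError):
    """Local stand-in for the library exception of the same name."""


# Simplified handler: (params, state) -> (observation, state_updates).
Handler = Callable[[dict[str, Any], dict[str, Any]], Awaitable[tuple[str, dict[str, Any]]]]
# Hypothetical stand-in for steps 2-5: returns (thought, action, params).
Decide = Callable[[str, list[str]], Awaitable[tuple[str, str, dict[str, Any]]]]


async def run_passes(
    user_input: str,
    decide: Decide,
    handlers: dict[str, Handler],
    *,
    max_passes: int = 5,
    max_consecutive_errors: int = 3,
    cancel_event: asyncio.Event | None = None,
) -> dict[str, Any]:
    state: dict[str, Any] = {}
    observations: list[str] = []
    consecutive_errors = 0
    for _ in range(max_passes):
        # Step 1: honour cancellation at the top of every pass.
        if cancel_event is not None and cancel_event.is_set():
            raise ReactCancellationError("cancelled")
        # Steps 2-5 collapsed into one call.
        _thought, action, params = await decide(user_input, observations)
        # Step 6: unknown actions count as errors and the loop continues.
        handler = handlers.get(action)
        if handler is None:
            consecutive_errors += 1
            if consecutive_errors >= max_consecutive_errors:
                raise RuntimeError(f"{consecutive_errors} consecutive errors")
            observations.append(f"unknown action: {action}")
            continue
        consecutive_errors = 0
        # Step 7: a terminal action ends the loop; otherwise keep the observation.
        try:
            observation, updates = await handler(params, state)
        except TerminalActionResult as done:
            return done.result
        state.update(updates)
        observations.append(observation)
    # Step 8: the pass budget ran out without a terminal action.
    raise ReactMaxPassesError(f"no terminal action after {max_passes} passes")
```

Signalling termination with an exception keeps ordinary handlers free of special return values: only the handler that produces the final answer needs to know about TerminalActionResult.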

Reserved State Keys: _user_input, _pass_number, _previous_thoughts, _observations, _loaded_slots, _loaded_slot_contents. Overwriting these from context or handler state_updates raises ReservedStateKeyError.

Patterns:

Dependencies: core/ (LLM utils, exceptions, rate limiting), knowledge/ (template manager)

Extension Points: Action handlers (any async callable matching the ActionHandler protocol).


2.3.9 observability/ -- OpenTelemetry Instrumentation #

Purpose: Optional OpenTelemetry integration. All operations are zero-overhead no-ops when the opentelemetry-api package is not installed.

Files:

Key Exports:

python
OTEL_AVAILABLE: bool  # True if opentelemetry-api is importable

def get_tracer(name: str = "graphcrew") -> Tracer: ...
async def otel_span(name: str, **attributes) -> AsyncContextManager: ...
def sync_otel_span(name: str, **attributes) -> ContextManager: ...

class OpenTelemetryMiddleware(AgentMiddleware):
    """Creates spans for agent invocations. Per-invocation state in contextvars."""

Patterns:

Dependencies: opentelemetry-api (optional, >=1.24,<2.0)

Extension Points: None directly. Developers configure OTel exporters and processors at the application level.
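
The optional-dependency pattern behind OTEL_AVAILABLE can be sketched as follows. The helper name sync_span and the tracer name are illustrative assumptions; the only OpenTelemetry calls used are trace.get_tracer() and Tracer.start_as_current_span(). The sketch runs whether or not opentelemetry-api is installed.

```python
from __future__ import annotations

from contextlib import contextmanager
from typing import Any, Iterator

try:
    from opentelemetry import trace

    OTEL_AVAILABLE = True
except ImportError:  # opentelemetry-api is an optional extra
    trace = None
    OTEL_AVAILABLE = False


@contextmanager
def sync_span(name: str, **attributes: Any) -> Iterator[Any]:
    """Open a span when OpenTelemetry is importable, otherwise do nothing."""
    if not OTEL_AVAILABLE:
        # No-op path: no tracer lookup and no span object.
        yield None
        return
    tracer = trace.get_tracer("graphcrew-sketch")  # hypothetical tracer name
    with tracer.start_as_current_span(name, attributes=attributes) as span:
        yield span
```

Callers write `with sync_span("agent.invoke", agent="billing"):` in both cases, so instrumentation never forces a hard dependency on the OTel packages.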


2.3.10 testing/ -- Test Utilities #

Purpose: Provide mock objects and factory helpers for unit and integration tests. Not intended for production use.

Files:

Key Exports:

python
class MockLLM:
    """Returns scripted responses without real API calls."""
    # Supports:
    #   raise_on_call: exception class to raise
    #   stream_chunks: per-call chunk lists for astream
    #   structured_responses: FIFO list for with_structured_output
    #   usage_tokens: dict attached to every response

def make_test_state(user_input: str, **kwargs) -> OrchestratorState:
    """Creates a minimal OrchestratorState for testing."""

# Re-exported for convenience:
InMemorySessionManager

Patterns:

Dependencies: core/, session/

Extension Points: None. These are test utilities.


2.4 Architectural Decisions Record #

All 68 architectural decisions from the project, grouped by category.

Foundation and Philosophy #

AD-1. Library, not application. Developers build their own LangGraph graphs using library components. The library provides building blocks and interfaces, not a turnkey solution.

AD-2. Declarative config. YAML defines agents, topology, and slots. Library components are wired together according to this config inside the developer's own LangGraph graph.

AD-5. LangGraph as execution engine. Chosen over AutoGen/AG2 (fragmented) and Google ADK (too new). LangGraph has 35.9k+ dependents, is used in production at Klarna, Uber, and LinkedIn, and is MIT-licensed.

AD-6. Only 2 built-in agents. Supervisor and SessionManagerAgent. Everything else is developer-provided by design.

AD-21. Code conventions. from __future__ import annotations in every file. Slot uses slots=True. Memory is frozen. All dataclasses use frozen=True or slots=True where applicable.

AD-24. __version__ public attribute. Reads from package metadata via importlib.metadata.version(). Included in __all__.

Routing and Topology #

AD-3. Constrained dynamic routing. Agents choose next_agent at runtime, validated against topology via TopologyResolver. Invalid routes fall back to default_return.

AD-22. Agent name and description validation. AgentConfig.name enforces [a-zA-Z][a-zA-Z0-9_-]*, <= 64 chars, rejects __end__/__start__. Description requires min_length=1.
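
The naming rule in AD-22 can be expressed as a short standalone check. validate_agent_name is a hypothetical helper written for illustration; the library enforces the same constraints through Pydantic validation on AgentConfig.name.

```python
import re

_NAME_PATTERN = re.compile(r"[a-zA-Z][a-zA-Z0-9_-]*")
_RESERVED_NAMES = frozenset({"__end__", "__start__"})
_MAX_NAME_LENGTH = 64


def validate_agent_name(name: str) -> str:
    """Return name unchanged if it satisfies the AD-22 rules, else raise ValueError."""
    if name in _RESERVED_NAMES:
        raise ValueError(f"{name!r} is reserved")
    if len(name) > _MAX_NAME_LENGTH:
        raise ValueError(f"agent name exceeds {_MAX_NAME_LENGTH} characters")
    # fullmatch anchors the pattern at both ends, so "bad name" is rejected.
    if not _NAME_PATTERN.fullmatch(name):
        raise ValueError(f"{name!r} must match [a-zA-Z][a-zA-Z0-9_-]*")
    return name
```

The reserved names already fail the pattern because they start with an underscore; the explicit check exists only to give a clearer error message.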

AD-58. Graph builder helpers. build_hub_spoke_graph(), build_topology_graph(), and build_conditional_edge() eliminate LangGraph wiring boilerplate. build_conditional_edge returns END when current_agent == from_agent_name.

AD-61. RoutingSelfLoopError. Raised when supervisor tries to route to itself, replacing the previous warning-and-fallback behavior.

Context and Slots #

AD-4. Schemaless slots with tiered persistence. Arbitrary key-value pairs managed by SlotManager with token budgets and eviction. Three tiers: TURN (end of turn), SESSION (session lifetime), LONG_TERM (cross-session).

AD-8. Tiered slot persistence. DemotionEviction demotes slots through tiers before evicting. SlotPersistence ABC allows any durable backend.

AD-10. Pluggable token counting. TokenCounter ABC with count(text) -> int. The default CharDivisionTokenCounter estimates max(1, len(text) // 4) for non-empty text and 0 for empty text. Injected into SlotPool, SlotManager, and ReactTemplateManager.

AD-35. InMemorySlotPersistence. Concrete implementation using dicts with copy.deepcopy for mutation isolation. Ships for dev/testing.

AD-36. SlotTier as StrEnum. Enables SlotTier.TURN == "turn" string comparison directly. Requires Python 3.11+.

AD-54. AgentResponse.slots_tier typed as dict[str, SlotTier]. Uses SlotTier enum instead of str. Backward-compatible via StrEnum equality.

AD-56. max_tokens validation. All max_tokens and max_total_tokens fields have gt=0 constraint.

Session Layer #

AD-5b. Single SessionManager ABC. One interface with 3 abstract methods (messages + lifecycle). Summaries and memories default to NotImplementedError. Override only what you need.

AD-23. SessionMessage.to_langchain() role mapping. "user" -> HumanMessage, "assistant"/"ai" -> AIMessage, "system" -> SystemMessage. Raises ValueError on unknown roles.

AD-42. Health checks. SessionManager.health_check() and SlotPersistence.health_check() return {"status": "ok"} by default. Override for backend-specific connectivity checks.

AD-52. knowledge_context populated by BaseAgent.invoke(). After process(), _loaded_slot_contents is extracted into update["knowledge_context"] and removed from context_slots.

AD-62. InMemorySessionManager parameter validation. get_messages(), get_messages_paginated(), and list_memories() raise ValueError when limit < 1.

AD-66. SemanticSearchMixin. Abstract mix-in for vector-based memory search. Declares embed_text() and search_memories_semantic(). Combine via MRO with SemanticSearchMixin first.

ReAct Engine #

AD-7. ReAct execution engine. Multi-pass thought/action/observation loop. Agents load knowledge on-demand. Optional max_history_tokens sliding window for prompt overflow prevention.

AD-9. External knowledge modules. Per-agent react_config.yaml validated by ReactConfigSchema. Enforces dependency references and circular dependency detection via DFS.

AD-15. Reserved ReactExecutor state keys. 6 internal keys guarded by _RESERVED_STATE_KEYS frozenset. ReservedStateKeyError raised on overwrite attempts.

AD-19. ReAct cancellation. cancel_event: asyncio.Event checked at top of each pass. Raises ReactCancellationError when set.

AD-25. DEFAULT_MAX_PASSES constant. Exported from knowledge/schema.py as DEFAULT_MAX_PASSES = 5.

AD-31. Structured output. Opt-in use_structured_output=True. ReactStructuredResponse Pydantic model. Graceful fallback to XML when LLM does not support with_structured_output().

AD-42b. Knowledge slot loading budget. ReactTemplateManager.max_loaded_tokens enforced in load_slot(). Per-slot max_tokens checked before global budget.

AD-60. TimeoutConfig.total_request_seconds. Wraps the entire ReactExecutor pass loop with a single outer timeout, complementing the per-call llm_call_seconds.

AD-63. Per-slot max_tokens enforcement. ReactTemplateManager.load_slot() checks per-slot token limit after loading content. Was in schema but not enforced at load time previously.

AD-64. Structured output capability cached. ReactExecutor caches whether LLM supports with_structured_output() after first detection call.

Agent Middleware #

AD-16. Agent middleware. AgentMiddleware with before_invoke, after_invoke, on_error hooks. All have concrete defaults. Onion model execution order.
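
The onion model can be illustrated with a small runner: before_invoke hooks run outermost first, and after_invoke hooks run in reverse. The runner, the Tracing class, and the trimmed hook signatures (no config argument) are assumptions made for this sketch; the library's exact ordering of on_error calls is not stated in this guide.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable, Sequence


class AgentMiddleware:
    """Trimmed stand-in: every hook defaults to a pass-through no-op."""

    async def before_invoke(self, agent_name: str, state: dict[str, Any]) -> dict[str, Any]:
        return state

    async def after_invoke(self, agent_name: str, state: dict[str, Any], response: Any) -> Any:
        return response

    async def on_error(self, agent_name: str, state: dict[str, Any], error: Exception) -> None:
        return None


async def invoke_with_middleware(
    agent_name: str,
    state: dict[str, Any],
    process: Callable[[dict[str, Any]], Awaitable[Any]],
    middleware: Sequence[AgentMiddleware],
) -> Any:
    # Inbound: outermost middleware first.
    for mw in middleware:
        state = await mw.before_invoke(agent_name, state)
    try:
        response = await process(state)
    except Exception as error:
        # Assumed: error hooks unwind innermost first, then the error propagates.
        for mw in reversed(middleware):
            await mw.on_error(agent_name, state, error)
        raise
    # Outbound: innermost middleware first.
    for mw in reversed(middleware):
        response = await mw.after_invoke(agent_name, state, response)
    return response


class Tracing(AgentMiddleware):
    """Hypothetical middleware that records when its hooks fire."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name, self.log = name, log

    async def before_invoke(self, agent_name: str, state: dict[str, Any]) -> dict[str, Any]:
        self.log.append(f"{self.name}:before")
        return state

    async def after_invoke(self, agent_name: str, state: dict[str, Any], response: Any) -> Any:
        self.log.append(f"{self.name}:after")
        return response


async def demo() -> list[str]:
    log: list[str] = []

    async def process(state: dict[str, Any]) -> str:
        log.append("process")
        return "done"

    await invoke_with_middleware(
        "billing", {}, process, [Tracing("outer", log), Tracing("inner", log)]
    )
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```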

AD-38. MetricsMiddleware. MetricsCollector protocol + middleware recording duration, token usage, and success/error. InMemoryMetricsCollector for dev.

AD-39. TenantIsolationMiddleware. Captures session_id/user_id in before_invoke. Raises TenantIsolationError if process mutates identity fields.

AD-40. CircuitBreakerMiddleware. CLOSED -> OPEN -> HALF_OPEN -> CLOSED state machine. Configurable failure_threshold and recovery_timeout_seconds.

AD-46. Middleware concurrency safety. MetricsMiddleware, TenantIsolationMiddleware, and OpenTelemetryMiddleware use contextvars.ContextVar for per-invocation state.

AD-68. MemoryInjectionMiddleware. Searches session and cross-session memories, deduplicates by key (higher importance wins), injects into state["knowledge_context"]["_injected_memories"]. Never blocks agent execution on failure.

Rate Limiting #

AD-17. Rate limiter ABC. acquire(), release(), close(). Called before every LLM invocation. release() has concrete no-op default.

AD-34. SemaphoreRateLimiter. Concrete implementation using asyncio.Semaphore. Holds semaphore from acquire() until release(). Optional delay_seconds.

AD-48. RateLimiter async context manager. Supports async with rate_limiter: usage.

AD-55. Rate limiter acquire/release safety. acquired flag prevents release() when acquire() fails.
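
The acquire/release pairing described in AD-34, AD-48, and AD-55 can be sketched as follows. `SemaphoreLimiter` and `guarded_call` are hypothetical stand-ins, not the library's classes, and a synchronous `release()` is an assumption made to keep the example short.

```python
import asyncio


class SemaphoreLimiter:
    """Hypothetical limiter that holds a semaphore from acquire() to release()."""

    def __init__(self, max_concurrent: int) -> None:
        self._semaphore = asyncio.Semaphore(max_concurrent)

    async def acquire(self) -> None:
        await self._semaphore.acquire()

    def release(self) -> None:
        self._semaphore.release()

    async def __aenter__(self) -> "SemaphoreLimiter":
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self.release()


async def guarded_call(limiter, call):
    """Run `call` under the limiter, releasing only if acquire() succeeded."""
    acquired = False
    try:
        await limiter.acquire()
        acquired = True
        return await call()
    finally:
        # Without the flag, a failed acquire() would trigger a release()
        # of a permit that was never taken.
        if acquired:
            limiter.release()
```

The same limiter also works as `async with limiter:`, where `__aexit__` runs only after `__aenter__` has succeeded and so needs no flag.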

AD-67. PerTenantRateLimiter. Per-tenant semaphore isolation via OrderedDict. LRU eviction when max_tenants is reached. set_tenant() stores tenant ID in contextvars.ContextVar.
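
One way to combine an OrderedDict, LRU eviction, and a ContextVar is sketched below. The class name `PerTenantSemaphores`, the `tenants()` helper, and the `"default"` tenant are assumptions for illustration; only `set_tenant()` and `max_tenants` are named in the entry above.

```python
import asyncio
import contextvars
from collections import OrderedDict

# Hypothetical context variable holding the tenant for the current task.
_current_tenant: contextvars.ContextVar[str] = contextvars.ContextVar(
    "current_tenant", default="default"
)


class PerTenantSemaphores:
    """Hypothetical limiter keeping one semaphore per tenant, LRU-bounded."""

    def __init__(self, *, max_concurrent: int, max_tenants: int) -> None:
        self._max_concurrent = max_concurrent
        self._max_tenants = max_tenants
        self._semaphores: OrderedDict[str, asyncio.Semaphore] = OrderedDict()

    def set_tenant(self, tenant_id: str) -> None:
        _current_tenant.set(tenant_id)

    def _semaphore_for_current_tenant(self) -> asyncio.Semaphore:
        tenant_id = _current_tenant.get()
        if tenant_id in self._semaphores:
            # Mark as most recently used.
            self._semaphores.move_to_end(tenant_id)
        else:
            if len(self._semaphores) >= self._max_tenants:
                # Evict the least recently used tenant.
                self._semaphores.popitem(last=False)
            self._semaphores[tenant_id] = asyncio.Semaphore(self._max_concurrent)
        return self._semaphores[tenant_id]

    async def acquire(self) -> None:
        await self._semaphore_for_current_tenant().acquire()

    def release(self) -> None:
        semaphore = self._semaphores.get(_current_tenant.get())
        if semaphore is not None:
            semaphore.release()

    def tenants(self) -> list[str]:
        """Tenant IDs from least to most recently used."""
        return list(self._semaphores)
```

A production version would need to decide what happens when an evicted tenant still holds permits; this sketch ignores that case.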

Observability and Logging #

AD-12. Request and per-invocation correlation. request_id (UUID4) + bind_logging_context() binds request_id, session_id, user_id, langgraph_step, langgraph_node into structlog contextvars. All agent lifecycle logs include explicit request_id.

AD-32. OpenTelemetry instrumentation. Optional [otel] extra. Zero-overhead no-ops when not installed. OpenTelemetryMiddleware creates spans for agent invocations.

AD-33. Token usage tracking. AgentResponse.token_usage accumulated across ReAct passes. Propagated to AIMessage.response_metadata.

AD-41. PII redaction. bind_logging_context(redact_pii=True) hashes session_id/user_id with 12-char SHA-256 prefix. Deterministic for log correlation.
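
A deterministic 12-character SHA-256 prefix can be computed as below. `redact_identifier` is a hypothetical helper; whether graphcrew uses the hex digest, UTF-8 encoding, or a salt is not stated here, so those details are assumptions.

```python
import hashlib


def redact_identifier(value: str, *, length: int = 12) -> str:
    """Return the first `length` hex characters of the SHA-256 of `value`."""
    return hashlib.sha256(value.encode("utf-8")).hexdigest()[:length]
```

Because the same input always yields the same prefix, log lines for one session can still be correlated without exposing the raw identifier.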

Async Safety and Concurrency #

AD-11. Async concurrency safety. Targeted asyncio.Lock guards: ReactTemplateManager (load_slot, reset), SlotManager (hydrate, flush), InMemorySessionManager (per-session locks). SlotPool.total_tokens snapshots dict values. OrchestratorState uses _merge_dicts reducer.

AD-18. Async context manager protocol. close() + async with on SlotManager, ReactTemplateManager, SessionManager, and RateLimiter. Follows aiohttp.ClientSession pattern.

AD-49. Closed guards. SlotManager and ReactTemplateManager have _closed flag. Post-close operations raise RuntimeError. TOCTOU fix in ReactTemplateManager.load_slot() moves budget check inside lock.
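
The closed guard and the in-lock budget check can be modelled with a small sketch. `BudgetedSlotLoader` is a hypothetical class, slot costs are given directly as token counts, and returning `False` on an exceeded budget is a simplification chosen for the example.

```python
import asyncio


class BudgetedSlotLoader:
    """Hypothetical loader with a closed guard and a lock-protected budget."""

    def __init__(self, slot_costs: dict[str, int], *, max_loaded_tokens: int) -> None:
        self._slot_costs = slot_costs  # slot name -> token cost
        self._max_loaded_tokens = max_loaded_tokens
        self._loaded: set[str] = set()
        self._lock = asyncio.Lock()
        self._closed = False
        self.loaded_tokens = 0

    async def load_slot(self, name: str) -> bool:
        if self._closed:
            raise RuntimeError("loader is closed")
        async with self._lock:
            if name in self._loaded:
                return True
            cost = self._slot_costs[name]
            # Check and update happen under the same lock, so two concurrent
            # loads cannot both pass the check and overshoot the budget.
            if self.loaded_tokens + cost > self._max_loaded_tokens:
                return False
            await asyncio.sleep(0)  # stand-in for reading the file
            self.loaded_tokens += cost
            self._loaded.add(name)
            return True

    async def close(self) -> None:
        self._closed = True
```

If the budget check sat outside the lock, two coroutines could each see enough remaining budget before either recorded its cost, which is the time-of-check/time-of-use race the entry refers to.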

AD-37. OrchestratorContext coordinated lifecycle. Manages startup/shutdown of all components. close() tears down in safe order (extras, template managers, rate limiter, slot manager, session manager). Errors during teardown are logged but never mask other exceptions.

AD-50. OrchestratorContext.__repr__ and is_closed. __repr__ shows managed component names and open/closed status. is_closed property for health checks.

Configuration and Validation #

AD-13. Immutable config models. All Pydantic config models use ConfigDict(frozen=True, extra="forbid"). Prevents mutation and rejects unknown fields (typos raise ValidationError immediately).

AD-44. load_config() accepts str | Path. Coerces strings to Path internally so callers no longer need explicit Path() wrapping.

AD-53. ConfigLoadError for missing files. Raises ConfigLoadError (not FileNotFoundError) for consistent error hierarchy.

AD-47. Agent constructors forward middleware and rate limiter. SupervisorAgent and SessionManagerAgent accept middleware param (forwarded to BaseAgent). SessionManagerAgent also accepts rate_limiter.

AD-48b. Keyword-only constructor args. Optional params on ReactExecutor, SupervisorAgent, and SessionManagerAgent use * separator to prevent positional argument mistakes.

AD-27. _build_action_handlers startup validation. SessionManagerAgent validates all method names in _CAPABILITY_HANDLER_MAP exist on the class at construction time. Raises AttributeError for missing handlers.

LLM Integration #

AD-14. RunnableConfig threading. RunnableConfig carrying LangSmith/Langfuse/OTel callbacks threaded from invoke() through process(), execute(), to llm.ainvoke(). LangGraph auto-passes config to node functions.

AD-22b. Shared LLM utilities. ainvoke_with_timeout, build_async_retryer, TRANSIENT_EXCEPTIONS in core/llm_utils.py. Retries target OSError/TimeoutError only, not logic errors.

AD-30. Streaming support. Opt-in astream_with_timeout() + BaseAgent.astream_process() (default raises NotImplementedError). SupervisorAgent supports simple mode streaming. ReAct stays non-streaming (XML parsing needs full response).

AD-57. Transient exception registry. register_transient_exception(), deregister_transient_exception(), get_transient_exceptions(). Thread-safe via threading.Lock. Register provider-specific exceptions (e.g., anthropic.RateLimitError) at app startup.
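
A lock-protected registry of this kind could be sketched as follows. The three function names mirror the entry above, but the bodies, the signatures, and the `is_transient` helper are assumptions written for illustration, not the code in core/llm_utils.py.

```python
import threading

_lock = threading.Lock()
# Assumed starting set, based on the OSError/TimeoutError retry targets.
_transient: set[type[BaseException]] = {OSError, TimeoutError}


def register_transient_exception(exc_type: type[BaseException]) -> None:
    """Mark an exception type as retryable."""
    with _lock:
        _transient.add(exc_type)


def deregister_transient_exception(exc_type: type[BaseException]) -> None:
    """Remove an exception type; unknown types are ignored."""
    with _lock:
        _transient.discard(exc_type)


def get_transient_exceptions() -> tuple[type[BaseException], ...]:
    """Return a snapshot suitable for isinstance() or an except clause."""
    with _lock:
        return tuple(_transient)


def is_transient(exc: BaseException) -> bool:
    """Hypothetical helper: True if `exc` should be retried."""
    return isinstance(exc, get_transient_exceptions())
```

Returning a tuple snapshot means callers never iterate the shared set while another thread mutates it.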

Testing and CI #

AD-28. Dev dependency upper bounds. pytest>=8.0,<9.0, hypothesis>=6.100,<7.0, etc. Prevents surprise major-version breakage.

AD-29. Parametrized and property-based testing. @pytest.mark.parametrize for validation cases. hypothesis for agent name regex exhaustive testing.

AD-43. pip-audit in CI. Vulnerability scanning in lint job after ruff, before mypy.

AD-59. graphcrew.testing subpackage. MockLLM (scripted responses, streaming chunks, structured output, error injection), make_test_state(), InMemorySessionManager re-exported for convenience.

AD-65. mypy applied to tests/. Strict mode with ignore_errors = true override so test files can intentionally pass wrong types to verify error handling.

Debuggability #

AD-26. __repr__ on key classes. BaseAgent, SlotPool, SlotManager, ReactTemplateManager, ReactExecutor. BaseAgent.__repr__ uses type(self).__name__ for subclass accuracy.

Multi-Tenant Support #

AD-39b. BaseAgent.invoke() identity mutation logging. Independently of TenantIsolationMiddleware, invoke() logs a WARNING if process() mutates session_id or user_id.

AD-52b. create_initial_state() defaults. session_id defaults to "default", user_id defaults to "anonymous". These are for single-user development only -- production code MUST pass explicit values.


2.5 Exception Hierarchy #

All library exceptions inherit from OrchestratorError. The tree below shows the full hierarchy, with a brief note on when each exception is raised.

text
Exception
  |
  +-- OrchestratorError                          # Base for all library exceptions
       |
       +-- ConfigurationError                    # Base for config/setup errors
       |    +-- LLMNotConfiguredError            # Agent requires LLM but none provided
       |    +-- ReactExecutorNotConfiguredError   # ReAct mode needs template_manager
       |    +-- ConfigLoadError*                  # Config file not found or unreadable
       |    +-- ConfigValidationError*            # Config YAML fails Pydantic validation
       |
       +-- AgentExecutionError                   # Base for agent runtime failures
       |    +-- AgentProcessError                # Agent's process() method raised
       |
       +-- LLMError                              # Base for LLM call failures
       |    +-- RateLimitExceededError            # Rate limiter rejected the call
       |    +-- LLMInvocationError               # Single LLM ainvoke() call failed
       |    +-- LLMRetryExhaustedError           # All retry attempts exhausted
       |
       +-- SessionError                          # Base for session layer errors
       |    +-- SessionManagerError              # SessionManager operation failed
       |    +-- SessionManagerNotConfiguredError  # No session manager provided
       |
       +-- StateError                            # Base for state validation errors
       |    +-- InvalidStateError                # Required state key missing
       |
       +-- ContextError                          # Base for slot/context errors
       |    +-- SlotPersistenceError             # Persistence load/save failed
       |
       +-- KnowledgeError                        # Base for knowledge module errors
       |    +-- KnowledgeSlotNotFoundError       # Requested slot does not exist
       |    +-- KnowledgePathError               # Module path escapes base directory
       |    +-- KnowledgeLoadError               # Module file failed to load
       |    +-- KnowledgeConfigValidationError   # react_config.yaml validation failed
       |
       +-- ReactError                            # Base for ReAct execution errors
       |    +-- ReactCancellationError           # ReAct loop cancelled via asyncio.Event
       |    +-- ReactMaxPassesError              # max_passes exceeded without terminal action
       |    +-- ReservedStateKeyError            # Attempted overwrite of reserved state keys
       |    +-- ReactConsecutiveErrorsError      # Max consecutive invalid actions reached
       |
       +-- RoutingError                          # Base for routing errors
       |    +-- RoutingSelfLoopError             # Agent tried to route to itself
       |
       +-- TenantIsolationError                  # Agent mutated session_id/user_id
       |
       +-- CircuitBreakerOpenError               # Circuit breaker rejecting calls

* ConfigLoadError and ConfigValidationError (marked with an asterisk in the tree) are defined in config/loader.py and inherit from ConfigurationError.

When Each Exception Is Raised #

| Exception | Trigger |
| --- | --- |
| LLMNotConfiguredError | Agent constructor called without an LLM when one is required |
| ReactExecutorNotConfiguredError | SupervisorAgent._process_react() called without a ReactTemplateManager |
| ConfigLoadError | load_config() given a nonexistent file path |
| ConfigValidationError | YAML content fails Pydantic validation |
| AgentProcessError | BaseAgent.invoke() catches a non-OrchestratorError from process() |
| RateLimitExceededError | rate_limiter.acquire() raises an exception |
| LLMInvocationError | Single LLM call fails (no retry configured) |
| LLMRetryExhaustedError | All retry attempts fail for an LLM call |
| SessionManagerError | Any SessionManager operation fails (wraps original exception) |
| SessionManagerNotConfiguredError | Session operation requested but no SessionManager provided |
| InvalidStateError | Required key missing from OrchestratorState |
| SlotPersistenceError | hydrate() or flush() persistence backend raises |
| KnowledgeSlotNotFoundError | ReactTemplateManager.load_slot() given an unknown slot name |
| KnowledgePathError | Module path resolves outside the knowledge base directory |
| KnowledgeLoadError | Module file cannot be read, or token budget exceeded |
| KnowledgeConfigValidationError | react_config.yaml fails ReactConfigSchema validation |
| ReactCancellationError | cancel_event.is_set() detected at start of a ReAct pass |
| ReactMaxPassesError | ReAct loop completes max_passes without a terminal action |
| ReservedStateKeyError | context dict or handler state_updates contain reserved keys |
| ReactConsecutiveErrorsError | max_consecutive_errors consecutive unknown/failed actions |
| RoutingSelfLoopError | Supervisor attempts to route to itself |
| TenantIsolationError | TenantIsolationMiddleware detects identity field mutation |
| CircuitBreakerOpenError | CircuitBreakerMiddleware is in OPEN state, rejecting calls |

Error Handling Best Practices #

  1. Catch OrchestratorError at the application boundary to handle all library errors uniformly.
  2. Catch specific subclasses when you need targeted recovery (e.g., RateLimitExceededError for backoff, ReactCancellationError for graceful shutdown).
  3. Never catch ReservedStateKeyError -- it indicates a programming error (fix the offending handler).
  4. LLMRetryExhaustedError vs LLMInvocationError -- the former means all retries failed; the latter means a single call failed with no retry configured. Both carry the original exception as .cause.
  5. ConfigLoadError and ConfigValidationError should be caught at startup and surfaced clearly to the developer.
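
The layering in practices 1, 2, and 4 can be sketched with local stand-in classes. The classes below only mimic the names and inheritance shown in the tree; they are not imported from graphcrew, the `LLMRetryExhaustedError` constructor signature is an assumption, and `handle_request` is a hypothetical boundary function.

```python
class OrchestratorError(Exception):
    """Stand-in for the library's base exception."""


class LLMError(OrchestratorError):
    pass


class RateLimitExceededError(LLMError):
    pass


class LLMRetryExhaustedError(LLMError):
    def __init__(self, message: str, *, cause: BaseException) -> None:
        super().__init__(message)
        self.cause = cause  # original exception, as described in practice 4


def handle_request(call) -> str:
    """Catch specific subclasses first, then the base class at the boundary."""
    try:
        return call()
    except RateLimitExceededError:
        return "backoff"  # targeted recovery
    except LLMRetryExhaustedError as exc:
        return f"retries exhausted: {type(exc.cause).__name__}"
    except OrchestratorError as exc:
        return f"library error: {type(exc).__name__}"  # uniform fallback
```

The order of the except clauses matters: the base-class handler must come last, or it would swallow the specific cases.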

Part 2: Developer Practices (2.6--2.9) #


2.6 Testing Guide #

Test Directory Structure #

text
tests/
  conftest.py                          # Global fixtures (autouse structlog cleanup)
  __init__.py
  helpers/
    __init__.py
    knowledge_helpers.py               # setup_supervisor_agent_dir(), setup_minimal_agent_dir()
    session_helpers.py                 # FullSessionManager, make_session_agent_config()
  unit/
    __init__.py
    config/
      test_schema.py                   # Config model validation
      test_loader.py                   # YAML loading, ConfigLoadError
      test_defaults.py                 # Default value verification
      test_agent_name_property.py      # Hypothesis property-based name validation
      test_retry_config.py             # Retry/backoff config
      test_timeout_config.py           # Timeout config validation
    core/
      test_state.py                    # OrchestratorState, create_initial_state()
      test_types.py                    # Type aliases and enums
      test_exceptions.py               # Exception hierarchy
      test_logging_context.py          # bind_logging_context, PII redaction
      test_token_counting.py           # CharDivisionTokenCounter + hypothesis
      test_llm_utils.py                # Retry, transient exceptions, timeouts
      test_rate_limiting.py            # SemaphoreRateLimiter, PerTenantRateLimiter
      test_orchestrator_context.py     # OrchestratorContext lifecycle
    agents/
      test_base.py                     # BaseAgent ABC, middleware execution
      test_supervisor.py               # SupervisorAgent routing
      test_session_manager_agent.py    # SessionManagerAgent capabilities
      test_middleware.py               # AgentMiddleware onion model
      test_metrics.py                  # MetricsMiddleware, InMemoryMetricsCollector
      test_tenant_isolation.py         # TenantIsolationMiddleware
      test_circuit_breaker.py          # CircuitBreakerMiddleware state machine
      test_memory_injection.py         # MemoryInjectionMiddleware
      test_streaming.py                # astream_process()
    context/
      test_slots.py                    # SlotPool, SlotTier + hypothesis
      test_manager.py                  # SlotManager lifecycle
      test_eviction.py                 # FIFO, LRU, Priority eviction
      test_persistence.py              # InMemorySlotPersistence
    session/
      test_interface.py                # SessionManager ABC
      test_in_memory.py                # InMemorySessionManager
      test_models.py                   # SessionMessage, Memory, Summary + hypothesis
      test_semantic.py                 # SemanticSearchMixin
    routing/
      test_topology.py                 # TopologyResolver
      test_graph_builder.py            # build_hub_spoke_graph(), build_conditional_edge()
    knowledge/
      test_loader.py                   # FileSystemKnowledgeLoader
      test_template_manager.py         # ReactTemplateManager lifecycle
      test_models.py                   # ReactActionSchema, ReactSlotSchema
      test_react_config_schema.py      # react_config.yaml validation
    react/
      test_executor.py                 # ReactExecutor multi-pass loop
      test_structured_output.py        # ReactStructuredResponse, fallback
      test_models.py                   # ReAct response models
    observability/
      test_otel.py                     # OpenTelemetry middleware (optional)
    test_concurrency.py                # asyncio.Lock guards across components
    test_testing_subpackage.py         # MockLLM, make_test_state validation
    test_public_api.py                 # __all__ exports, __version__
    test_subpackage_exports.py         # Subpackage re-exports
    test_chat.py                       # run_chat_turn()
  integration/
    test_agent_e2e.py                  # End-to-end agent invocation
    test_react_integration.py          # Full ReAct loop with MockLLM
    test_session_integration.py        # Session persistence round-trips
    test_example_chatbot.py            # Smoke test for simple_chatbot example
    test_example_react_knowledge.py    # Smoke test for react_knowledge_agent example
    test_graph_execution.py            # Compiled LangGraph execution
    test_multi_tenant.py               # Multi-tenant isolation, cross-session memory
  performance/
    __init__.py
    test_benchmarks.py                 # Opt-in benchmarks

The global conftest.py provides a single autouse fixture that clears structlog contextvars before and after every test.

Test Utilities: graphcrew.testing #

The library ships a testing subpackage for writing tests without real API calls:

python
from graphcrew.testing import MockLLM, make_test_state, InMemorySessionManager

MockLLM -- A BaseChatModel subclass that returns scripted responses in FIFO order.

make_test_state() -- Creates a pre-populated OrchestratorState with sensible defaults.

InMemorySessionManager -- Re-exported from graphcrew.session for convenience in tests.

Writing a Unit Test #

python
"""Tests for AgentConfig validation."""

from __future__ import annotations

import pytest
from pydantic import ValidationError

from graphcrew.config.schema import AgentConfig


class TestAgentConfig:
    def test_valid_config(self) -> None:
        cfg = AgentConfig(name="supervisor", type="supervisor", description="Main")
        assert cfg.name == "supervisor"

    def test_missing_required_field_raises(self) -> None:
        with pytest.raises(ValidationError):
            AgentConfig(type="custom", description="No name")  # type: ignore[call-arg]

    @pytest.mark.parametrize(
        ("name", "match"),
        [
            ("", None),
            ("__end__", "reserved"),
            ("1agent", "invalid"),
        ],
        ids=["empty", "reserved-end", "digit-start"],
    )
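    def test_invalid_names(self, name: str, match: str | None) -> None:
        # match=None accepts any message; otherwise the error text is
        # searched for the `match` pattern.
        with pytest.raises(ValidationError, match=match):
            AgentConfig(name=name, type="custom", description="desc")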

Property-Based Testing with Hypothesis #

The project uses Hypothesis for property-based testing where the input space is large.

Running Tests #

bash
# Full test suite with coverage
uv run pytest --cov=graphcrew --cov-report=term-missing --cov-fail-under=90

# Via Make
make test

# Specific test file
uv run pytest tests/unit/agents/test_supervisor.py -v

Coverage #

The suite maintains 98%+ coverage across 1300+ tests. Branch coverage is enabled, and show_missing = true reports uncovered lines. The fail_under = 90 threshold is enforced in CI and make test.

CI Pipeline #

The GitHub Actions CI pipeline runs on every push to main and every pull request.

Lint job (Python 3.13 only):

  1. ruff check -- linting
  2. ruff format --check -- format verification
  3. pip-audit -- vulnerability scanning
  4. mypy src/ tests/ -- strict type checking
  5. uv build -- verify the package builds

Test job (Python 3.11, 3.12, 3.13 matrix, depends on lint):

  1. pytest --cov=graphcrew --cov-report=term-missing --cov-fail-under=90
  2. Upload coverage to Codecov (3.13 only)

2.7 Code Conventions #

Future Annotations #

Every .py file starts with:

python
from __future__ import annotations

Frozen and Strict Pydantic Models #

All Pydantic config models use:

python
model_config = ConfigDict(frozen=True, extra="forbid")

Keyword-Only Constructor Parameters #

Optional parameters on public constructors use the * separator.

structlog Field Naming #

All log events use agent_name= (not agent=) for consistency.

Ruff Configuration #

| Code | Rule Set | Purpose |
| --- | --- | --- |
| E | pycodestyle errors | Style errors |
| F | Pyflakes | Logical errors |
| I | isort | Import ordering |
| N | pep8-naming | Naming conventions |
| W | pycodestyle warnings | Style warnings |
| UP | pyupgrade | Python version upgrade suggestions |
| B | flake8-bugbear | Common bug patterns |
| SIM | flake8-simplify | Code simplification |
| RUF | Ruff-specific | Ruff's own rules |
| T20 | flake8-print | Disallow print statements |
| S | flake8-bandit | Security checks |
| C4 | flake8-comprehensions | Comprehension improvements |
| PTH | flake8-use-pathlib | Prefer pathlib over os.path |

mypy Strict Mode #

toml
[tool.mypy]
python_version = "3.11"
strict = true
warn_return_any = true
warn_unused_configs = true

2.8 Contributing Guide #

Prerequisites #

Setup #

bash
git clone https://github.com/Atiqul-Islam/swagent.git
cd swagent
uv sync --group dev
pre-commit install

Development Commands #

bash
make test       # Run tests with coverage (fail-under=90)
make lint       # Run ruff linter
make typecheck  # Run mypy strict type checking
make format     # Auto-format code with ruff
make audit      # Run pip-audit vulnerability scan
make all        # Run lint + typecheck + audit + test
make clean      # Remove build artifacts, caches

Required Checks #

| Check | Tool | Command |
| --- | --- | --- |
| Lint | ruff | uv run ruff check src/ tests/ examples/ |
| Format | ruff | uv run ruff format --check src/ tests/ examples/ |
| Vulnerabilities | pip-audit | uv run pip-audit |
| Type check | mypy (strict) | uv run mypy src/ tests/ |
| Build | hatchling | uv build |
| Tests | pytest | uv run pytest --cov=graphcrew --cov-fail-under=90 |

How to Add a New Agent #

  1. Create your agent class extending BaseAgent
  2. Register it in your YAML config under agents: with type: custom
  3. Write unit tests in tests/unit/agents/ and integration tests in tests/integration/

Code of Conduct #

The project follows the Contributor Covenant Code of Conduct v2.1.


2.9 Release and Deployment #

Build System #

toml
[build-system]
requires = ["hatchling>=1.25"]
build-backend = "hatchling.build"

Versioning #

The version is declared in pyproject.toml. At runtime, graphcrew.__version__ reads from installed package metadata.
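
Reading a version from installed package metadata is commonly done with `importlib.metadata`. The sketch below shows that pattern; `read_version` and its fallback value are hypothetical, and graphcrew's own `__init__.py` may handle the missing-package case differently.

```python
from importlib.metadata import PackageNotFoundError, version


def read_version(distribution: str, *, fallback: str = "0.0.0") -> str:
    """Return the installed version of `distribution`, or `fallback`."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        # Assumption: e.g. running from a source checkout that was never installed.
        return fallback
```

With this pattern the version string lives only in pyproject.toml, so there is no second copy in the source tree to keep in sync.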

Publish Pipeline #

The publish.yml workflow triggers when a GitHub release is published.

Step 1: Test gate (Python 3.11, 3.12, 3.13 matrix)

Step 2: Build and publish

Step 3: Smoke test

Release Process #

  1. Update the version in pyproject.toml
  2. Move changelog entries from [Unreleased] to the new version section
  3. Commit and push to main
  4. Create a GitHub release with a tag matching the version
  5. Pipeline runs full test matrix, publishes, and smoke tests

CI vs Publish Pipeline Comparison #

| Check | CI | Publish |
| --- | --- | --- |
| Trigger | Push/PR to main | GitHub release published |
| Lint | Python 3.13 only | All matrix versions |
| Tests | 3.11, 3.12, 3.13 | 3.11, 3.12, 3.13 |
| Package build | Verify only | Build + publish to PyPI |
| Smoke test | No | Yes |

Optional Dependencies #

toml
[project.optional-dependencies]
otel = ["opentelemetry-api>=1.24,<2.0"]

Install with: pip install graphcrew[otel]