Coverage for agentos/core/handoff.py: 54%
48 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2Handoff protocol for NexusAgent.
4Provides Swarm-style task transfer between agents.
5When an agent cannot handle a request, it can transfer
6to another agent that is better suited.
7"""
9from __future__ import annotations
11from dataclasses import dataclass, field
12from typing import Any, Callable, TypeVar, Generic
14from agentos.core.di import Agent, RunContext
16# Type variable for agent
17T = TypeVar("T")
@dataclass
class Handoff:
    """
    A request to pass the current task on to a different agent.

    An agent may return one of these from ``run()`` instead of a final
    answer to signal that another agent should take over.

    Usage:
        class SupportAgent(Agent[str, str]):
            async def run(self, ctx: RunContext[str]) -> str | Handoff:
                if "billing" in ctx.deps.lower():
                    return transfer_to(BillingAgent(), ctx.deps)
                return "General support"
    """

    # Agent that should take over; validated to be non-None on creation.
    target_agent: Agent[Any, Any]
    # Payload forwarded to the target agent.
    input_data: Any
    # Free-form extras attached to the handoff; a fresh dict per instance.
    metadata: dict[str, Any] = field(default_factory=dict)
    # Optional human-readable explanation of the transfer.
    reason: str = ""

    def __post_init__(self) -> None:
        """Reject a handoff that names no target agent.

        Raises:
            ValueError: If ``target_agent`` is None.
        """
        missing_target = self.target_agent is None
        if missing_target:
            raise ValueError("target_agent cannot be None")
@dataclass
class HandoffResult:
    """
    Outcome of a run that involved at least one handoff.

    Fields:
        output: Final value produced by the agent that finished the task.
        source_agent: Name of the agent the run started with.
        target_agent: Name of the agent that produced ``output``.
        handoff_chain: Names of the agents involved, in order.
        metadata: Extra key/value data carried along with the result.
    """

    output: Any
    source_agent: str
    target_agent: str
    # Mutable defaults go through default_factory so instances never share them.
    handoff_chain: list[str] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)
def transfer_to(
    agent: Agent[Any, Any],
    input_data: Any,
    reason: str = "",
    **metadata
) -> Handoff:
    """
    Build a Handoff that redirects work to ``agent``.

    Args:
        agent: Agent that should take over.
        input_data: Payload to forward to that agent.
        reason: Optional explanation for the transfer.
        **metadata: Any extra keyword arguments; stored as the
            handoff's metadata dict.

    Returns:
        The newly constructed Handoff.

    Usage:
        return transfer_to(BillingAgent(), ctx.deps, reason="Billing question")
    """
    # Positional order follows the Handoff field order:
    # target_agent, input_data, metadata, reason.
    handoff = Handoff(agent, input_data, metadata, reason)
    return handoff
def can_handle(agent: Agent[Any, Any], input_data: Any) -> bool:
    """
    Ask an agent whether it accepts the given input.

    Delegates to the agent's own ``can_handle()`` when it has one;
    an agent without that attribute is treated as accepting everything.

    Args:
        agent: Agent to query.
        input_data: Input the agent would receive.

    Returns:
        Whatever the agent's ``can_handle()`` returns, or True when the
        agent has no such attribute.
    """
    try:
        checker = agent.can_handle
    except AttributeError:
        # No hook defined on this agent: assume it can take the input.
        return True
    return checker(input_data)
async def execute_with_handoff(
    agent: Agent[Any, Any],
    input_data: Any,
    max_hops: int = 10,
    **metadata
) -> HandoffResult | Any:
    """
    Execute an agent and transparently follow any handoffs it returns.

    The starting agent is invoked with ``input_data``. Whenever an agent
    returns a Handoff, the handoff's target agent is invoked next with the
    handoff's input data, and the handoff's metadata is merged into the
    keyword arguments passed to subsequent invocations.

    Args:
        agent: Starting agent.
        input_data: Input for the starting agent.
        max_hops: Maximum number of handoffs that may be followed. The
            starting agent always runs when this is zero or more.
        **metadata: Extra keyword arguments forwarded to every
            ``invoke()`` call and included in the HandoffResult.

    Returns:
        The raw agent output when no handoff occurred, otherwise a
        HandoffResult describing the chain of agents and the final output.

    Raises:
        RuntimeError: If more than ``max_hops`` handoffs are requested.
    """
    current_agent = agent
    current_input = input_data
    handoff_chain = [current_agent.name]

    # One invocation for the starting agent plus one per permitted handoff,
    # so ``max_hops`` counts handoffs rather than agent runs. Iterating only
    # ``max_hops`` times would reject a run that uses exactly ``max_hops``
    # handoffs and would never run the starting agent when ``max_hops`` is 0.
    for _ in range(max_hops + 1):
        result = await current_agent.invoke(current_input, **metadata)

        if not isinstance(result, Handoff):
            # Final answer reached.
            if len(handoff_chain) == 1:
                # No handoff happened: hand back the raw output unchanged.
                return result
            return HandoffResult(
                output=result,
                source_agent=handoff_chain[0],
                target_agent=handoff_chain[-1],
                handoff_chain=handoff_chain,
                metadata=metadata,
            )

        # Follow the handoff to the next agent.
        current_agent = result.target_agent
        current_input = result.input_data
        handoff_chain.append(current_agent.name)

        # Handoff metadata overrides earlier values for the same keys.
        metadata.update(result.metadata)

    raise RuntimeError(f"Max handoff hops ({max_hops}) exceeded")
class HandoffAwareAgent(Agent[Any, Any]):
    """
    Convenience base class for agents that take part in handoffs.

    Supplies a default ``can_handle()`` that accepts every input and a
    ``run()`` placeholder that subclasses must replace; ``run()`` may
    return a Handoff to transfer the task.
    """

    def can_handle(self, input_data: Any) -> bool:
        """
        Report whether this agent accepts ``input_data``.

        The default accepts everything; subclasses override this to
        add their own checks.

        Args:
            input_data: Input the agent would receive.

        Returns:
            True if the agent can handle the input, False otherwise.
        """
        return True

    async def run(self, ctx: RunContext[Any]) -> Any:
        """
        Agent logic; may return a Handoff to transfer the task.

        Must be overridden by subclasses.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("Subclass must implement run()")
# ── Auto-generated compat stubs ──

# Install the placeholder only when no implementation exists. An
# unconditional ``def`` here rebinds the module-level name and replaces the
# async ``execute_with_handoff`` defined earlier in this module with a no-op
# that returns None.
if "execute_with_handoff" not in globals():
    def execute_with_handoff(*args, **kwargs):
        """Fallback no-op stub: accepts any arguments and returns None."""
        return None