Coverage for agentos/core/handoff.py: 54%

48 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2Handoff protocol for NexusAgent. 

3 

4Provides Swarm-style task transfer between agents. 

5When an agent cannot handle a request, it can transfer 

6to another agent that is better suited. 

7""" 

8 

9from __future__ import annotations 

10 

11from dataclasses import dataclass, field 

12from typing import Any, Callable, TypeVar, Generic 

13 

14from agentos.core.di import Agent, RunContext 

15 

16# Type variable for agent 

17T = TypeVar("T") 

18 

19 

20@dataclass 

21class Handoff: 

22 """ 

23 Represents a handoff request to another agent. 

24 

25 Usage: 

26 class SupportAgent(Agent[str, str]): 

27 async def run(self, ctx: RunContext[str]) -> str | Handoff: 

28 if "billing" in ctx.deps.lower(): 

29 return transfer_to(BillingAgent(), ctx.deps) 

30 return "General support" 

31 """ 

32 target_agent: Agent[Any, Any] 

33 input_data: Any 

34 metadata: dict[str, Any] = field(default_factory=dict) 

35 reason: str = "" 

36 

37 def __post_init__(self): 

38 """Validate handoff.""" 

39 if self.target_agent is None: 

40 raise ValueError("target_agent cannot be None") 

41 

42 

43@dataclass 

44class HandoffResult: 

45 """ 

46 Result of a handoff operation. 

47 

48 Contains: 

49 - output: The final output from the target agent 

50 - source_agent: Name of the original agent 

51 - target_agent: Name of the agent that handled it 

52 - handoff_chain: List of agents involved 

53 """ 

54 output: Any 

55 source_agent: str 

56 target_agent: str 

57 handoff_chain: list[str] = field(default_factory=list) 

58 metadata: dict[str, Any] = field(default_factory=dict) 

59 

60 

61def transfer_to( 

62 agent: Agent[Any, Any], 

63 input_data: Any, 

64 reason: str = "", 

65 **metadata 

66) -> Handoff: 

67 """ 

68 Create a handoff to another agent. 

69 

70 Args: 

71 agent: Target agent to transfer to 

72 input_data: Data to pass to the target agent 

73 reason: Reason for the handoff 

74 **metadata: Additional metadata 

75 

76 Returns: 

77 Handoff object 

78 

79 Usage: 

80 return transfer_to(BillingAgent(), ctx.deps, reason="Billing question") 

81 """ 

82 return Handoff( 

83 target_agent=agent, 

84 input_data=input_data, 

85 metadata=metadata, 

86 reason=reason, 

87 ) 

88 

89 

90def can_handle(agent: Agent[Any, Any], input_data: Any) -> bool: 

91 """ 

92 Check if an agent can handle the input. 

93 

94 This is a helper function that calls the agent's 

95 can_handle() method if it exists, otherwise returns True. 

96 

97 Args: 

98 agent: Agent to check 

99 input_data: Input data 

100 

101 Returns: 

102 True if agent can handle, False otherwise 

103 """ 

104 if hasattr(agent, 'can_handle'): 

105 return agent.can_handle(input_data) 

106 return True 

107 

108 

109async def execute_with_handoff( 

110 agent: Agent[Any, Any], 

111 input_data: Any, 

112 max_hops: int = 10, 

113 **metadata 

114) -> HandoffResult | Any: 

115 """ 

116 Execute an agent with automatic handoff handling. 

117 

118 If the agent returns a Handoff, automatically execute 

119 the target agent and return the result. 

120 

121 Args: 

122 agent: Starting agent 

123 input_data: Input data 

124 max_hops: Maximum number of handoffs 

125 **metadata: Additional metadata 

126 

127 Returns: 

128 HandoffResult if handoffs occurred, otherwise raw output 

129 

130 Raises: 

131 RuntimeError: If max_hops exceeded 

132 """ 

133 current_agent = agent 

134 current_input = input_data 

135 handoff_chain = [current_agent.name] 

136 

137 for hop in range(max_hops): 

138 # Execute current agent 

139 result = await current_agent.invoke(current_input, **metadata) 

140 

141 # Check if result is a handoff 

142 if isinstance(result, Handoff): 

143 # Move to next agent 

144 current_agent = result.target_agent 

145 current_input = result.input_data 

146 handoff_chain.append(current_agent.name) 

147 

148 # Merge metadata 

149 metadata.update(result.metadata) 

150 else: 

151 # No handoff, we're done 

152 if len(handoff_chain) > 1: 

153 # Return HandoffResult if we had handoffs 

154 return HandoffResult( 

155 output=result, 

156 source_agent=handoff_chain[0], 

157 target_agent=handoff_chain[-1], 

158 handoff_chain=handoff_chain, 

159 metadata=metadata, 

160 ) 

161 else: 

162 # No handoffs, return raw output 

163 return result 

164 

165 raise RuntimeError(f"Max handoff hops ({max_hops}) exceeded") 

166 

167 

168class HandoffAwareAgent(Agent[Any, Any]): 

169 """ 

170 Base class for agents that support handoffs. 

171 

172 Provides can_handle() method for checking if agent 

173 can handle input, and run() can return Handoff. 

174 """ 

175 

176 def can_handle(self, input_data: Any) -> bool: 

177 """ 

178 Check if this agent can handle the input. 

179 

180 Override in subclass to add custom logic. 

181 

182 Args: 

183 input_data: Input data 

184 

185 Returns: 

186 True if can handle, False otherwise 

187 """ 

188 return True 

189 

190 async def run(self, ctx: RunContext[Any]) -> Any: 

191 """ 

192 Main agent logic. Can return Handoff to transfer. 

193 

194 Override in subclass. 

195 """ 

196 raise NotImplementedError("Subclass must implement run()") 

197 

198 

199# ── Auto-generated compat stubs ── 

200 

201def execute_with_handoff(*args, **kwargs): pass