Coverage for agentos/swarm/human_loop.py: 47%

118 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2v1.9.5: Human-in-the-Loop (HITL) breakpoint system. 

3 

4Enables task execution to pause at configurable checkpoints for human 

5review, approval, or intervention before continuing. 

6""" 

7 

8from __future__ import annotations 

9 

10import asyncio 

11import time 

12import uuid 

13from dataclasses import dataclass, field 

14from enum import Enum 

15from typing import Any, Callable, Optional 

16 

17 

class BreakpointType(str, Enum):
    """Enumerates the situations in which execution may pause for a human.

    Members are also plain strings, so they compare equal to their values.
    """

    # Raised before a sub-task starts.
    BEFORE_TASK = "before_task"
    # Raised after a sub-task has produced output.
    AFTER_RESULT = "after_result"
    # Raised when a sub-task fails.
    ON_FAILURE = "on_failure"
    # Raised when fusion confidence is low.
    ON_LOW_CONFIDENCE = "on_low_confidence"
    # Placed explicitly by the developer.
    MANUAL = "manual"

26 

27 

class HumanDecision(str, Enum):
    """Enumerates the answers a human can give at a breakpoint.

    Members are also plain strings, so they compare equal to their values.
    """

    # Approve and continue.
    APPROVE = "approve"
    # Reject and skip/retry.
    REJECT = "reject"
    # Reject and retry with feedback.
    RETRY = "retry"
    # Accept with modifications.
    MODIFY = "modify"
    # Abort the entire task.
    ABORT = "abort"

36 

37 

@dataclass
class Breakpoint:
    """State of one pause point: what was asked and, once resolved, the answer."""

    # Short identifier: the first 8 hex characters of a random UUID4.
    id: str = field(default_factory=lambda: uuid.uuid4().hex[:8])
    # Which kind of checkpoint this is.
    type: BreakpointType = BreakpointType.MANUAL
    # Identifier of the task this breakpoint belongs to.
    task_id: str = ""
    # Free-form extra data supplied with the request.
    context: dict[str, Any] = field(default_factory=dict)
    # Human-readable text describing what is being asked.
    message: str = ""
    # Decision names offered to the human.
    options: list[str] = field(
        default_factory=lambda: ["approve", "reject", "retry", "abort"]
    )
    # Seconds to wait for an answer; 0 = no timeout.
    timeout: float = 0.0
    # Epoch timestamps (time.time()); resolved_at stays 0.0 until resolved.
    created_at: float = field(default_factory=time.time)
    resolved_at: float = 0.0
    # The answer and optional free-text feedback, filled in on resolution.
    decision: HumanDecision | None = None
    feedback: str = ""
    resolved: bool = False

    def to_dict(self) -> dict:
        """Return a summary dict of this breakpoint.

        Only id, type, task_id, message, options, resolved and decision are
        included; context, feedback, timeout and timestamps are left out.
        Enum fields are emitted as their string values, and ``decision`` is
        ``None`` while no decision is set.
        """
        if self.decision:
            decision_value = self.decision.value
        else:
            decision_value = None

        summary = {
            "id": self.id,
            "type": self.type.value,
            "task_id": self.task_id,
            "message": self.message,
            "options": self.options,
            "resolved": self.resolved,
            "decision": decision_value,
        }
        return summary

65 

66 

@dataclass
class HITLConfig:
    """Tunable switches and limits for human-in-the-loop behavior."""

    # Master switch for the breakpoint system.
    enabled: bool = True
    # Pause when a sub-task fails.
    break_on_failure: bool = True
    # Threshold, not a flag: confidence below this triggers a break.
    break_on_low_confidence: float = 0.3
    # Pause before the first sub-task.
    break_on_first_task: bool = False
    # Pause before every sub-task.
    break_on_every_task: bool = False
    # Pause before returning the final result.
    break_on_final_result: bool = False
    # Queue limit for pending breakpoints.
    max_pending_breakpoints: int = 5
    # Default wait for a decision, in seconds (5 minutes).
    default_timeout: float = 300.0

79 

80 

class HITLManager:
    """Manages human-in-the-loop breakpoints during task execution.

    A breakpoint is resolved in one of two ways:

    * if a handler is registered, it is called with the ``Breakpoint`` and
      its return value becomes the decision;
    * otherwise ``request_decision`` waits until ``provide_decision`` is
      called with that breakpoint's id, or until the timeout elapses.

    The manager is fail-open: handler errors, unrecognised handler results,
    timeouts and pending-queue evictions all resolve to
    ``HumanDecision.APPROVE``.

    Every breakpoint ever created is kept in an internal history (see
    ``total_breakpoints``); nothing in this class prunes it.

    Usage:
        hitl = HITLManager(config=HITLConfig(break_on_failure=True))

        # Register a callback for human input
        hitl.register_handler(my_human_input_function)

        # During execution:
        decision, feedback = await hitl.request_decision(
            bp_type=BreakpointType.ON_FAILURE,
            task_id="task_1",
            message="Task failed. Retry?",
            context={"error": "...", "attempts": 2},
        )
        if decision == HumanDecision.RETRY:
            ...
    """

    def __init__(
        self,
        config: HITLConfig | None = None,
        handler: Callable | None = None,
    ):
        """Create a manager.

        Args:
            config: Behavior settings; a default ``HITLConfig`` is used when
                omitted.
            handler: Optional sync or async callable that receives a
                ``Breakpoint`` and returns the decision.
        """
        self.config = config or HITLConfig()
        self._handler = handler
        # History of every breakpoint created, keyed by breakpoint id.
        self._breakpoints: dict[str, Breakpoint] = {}
        # Breakpoints currently awaiting a decision, oldest first.
        self._pending: list[Breakpoint] = []
        # One future per handler-less breakpoint, keyed by breakpoint id, so
        # that a provided decision reaches exactly the request it was meant
        # for. Futures are created lazily inside the running event loop.
        self._waiters: dict[str, asyncio.Future] = {}
        # Whether should_break_before_task has already been consulted once.
        self._first_task_seen = False

    def register_handler(self, handler: Callable[[Breakpoint], HumanDecision]) -> None:
        """
        Register a human input handler.

        Args:
            handler: Callable that receives a Breakpoint and returns a
                HumanDecision, a HumanDecision value string, or a
                ``(decision, feedback)`` tuple. Can be sync or async.
        """
        self._handler = handler

    async def request_decision(
        self,
        bp_type: BreakpointType,
        task_id: str,
        message: str,
        context: dict | None = None,
        timeout: float | None = None,
        options: list[str] | None = None,
    ) -> tuple[HumanDecision, str]:
        """
        Pause execution and request human decision.

        Args:
            bp_type: Type of breakpoint
            task_id: Current task identifier
            message: Human-readable message explaining what's needed
            context: Additional context for the decision
            timeout: Max wait time in seconds. ``None`` uses the config
                default; a value <= 0 waits without a time limit. Only
                applies when no handler is registered.
            options: Available decision options

        Returns:
            Tuple of (decision, feedback text). When HITL is disabled this is
            ``(HumanDecision.APPROVE, "")`` and no breakpoint is recorded.
        """
        if not self.config.enabled:
            return HumanDecision.APPROVE, ""

        bp = Breakpoint(
            type=bp_type,
            task_id=task_id,
            context=context or {},
            message=message,
            options=options or ["approve", "reject", "retry", "abort"],
            # `is None` rather than `or`: an explicit 0 must not be replaced
            # by the default, because 0 means "no timeout".
            timeout=self.config.default_timeout if timeout is None else timeout,
        )

        self._breakpoints[bp.id] = bp
        self._pending.append(bp)
        self._evict_overflow()

        # Snapshot the handler so a concurrent register_handler call cannot
        # change which path this request takes half-way through.
        handler = self._handler
        try:
            if handler:
                decision, feedback = await self._ask_handler(handler, bp)
            else:
                decision, feedback = await self._wait_for_decision(bp)
            bp.decision = decision
            bp.feedback = feedback
            bp.resolved = True
            bp.resolved_at = time.time()
        finally:
            # Runs on cancellation too, so an abandoned request does not stay
            # in the pending list or keep a stale waiter registered.
            self._waiters.pop(bp.id, None)
            if bp in self._pending:
                self._pending.remove(bp)

        return bp.decision, bp.feedback

    def _evict_overflow(self) -> None:
        """Auto-approve the oldest pending breakpoints beyond the queue limit."""
        limit = self.config.max_pending_breakpoints
        while self._pending and len(self._pending) > limit:
            evicted = self._pending.pop(0)
            evicted.decision = HumanDecision.APPROVE
            evicted.resolved = True
            evicted.resolved_at = time.time()
            # Wake the request still waiting on the evicted breakpoint;
            # without this it would keep blocking until its own timeout.
            waiter = self._waiters.get(evicted.id)
            if waiter is not None and not waiter.done():
                waiter.set_result((HumanDecision.APPROVE, ""))

    async def _ask_handler(
        self, handler: Callable, bp: Breakpoint
    ) -> tuple[HumanDecision, Any]:
        """Invoke the handler for ``bp``; any handler error becomes APPROVE."""
        try:
            result = handler(bp)
            if asyncio.iscoroutine(result) or asyncio.isfuture(result):
                result = await result
            return self._coerce_handler_result(result)
        except Exception:
            # Deliberately fail-open, but leave a trace instead of hiding the
            # failure. Imported here so the block needs no new file import.
            import logging

            logging.getLogger(__name__).exception(
                "HITL handler failed for breakpoint %s; auto-approving", bp.id
            )
            return HumanDecision.APPROVE, ""

    @staticmethod
    def _coerce_handler_result(result: Any) -> tuple[HumanDecision, Any]:
        """Normalise a handler's return value to ``(HumanDecision, feedback)``.

        Accepts a HumanDecision, a string equal to a HumanDecision value, or a
        2-tuple of (either of those, feedback). Anything unrecognised maps to
        APPROVE; feedback from a tuple is passed through unchanged.
        """
        feedback: Any = ""
        if isinstance(result, tuple) and len(result) == 2:
            result, feedback = result
        try:
            return HumanDecision(result), feedback
        except (ValueError, TypeError):
            return HumanDecision.APPROVE, feedback

    async def _wait_for_decision(self, bp: Breakpoint) -> tuple[HumanDecision, str]:
        """Wait for ``provide_decision`` on ``bp``; a timeout becomes APPROVE."""
        waiter = asyncio.get_running_loop().create_future()
        self._waiters[bp.id] = waiter
        # wait_for treats None as "no limit"; 0 would expire immediately.
        limit = bp.timeout if bp.timeout and bp.timeout > 0 else None
        try:
            return await asyncio.wait_for(waiter, timeout=limit)
        except asyncio.TimeoutError:
            return HumanDecision.APPROVE, ""

    def provide_decision(
        self,
        breakpoint_id: str,
        decision: HumanDecision,
        feedback: str = "",
    ) -> None:
        """Provide a decision for a pending breakpoint (alternative to handler).

        The decision is delivered only to the request waiting on
        ``breakpoint_id``. Calls for unknown ids, for breakpoints that are
        already resolved, or for breakpoints being resolved by a handler are
        ignored.
        """
        waiter = self._waiters.get(breakpoint_id)
        if waiter is None or waiter.done():
            return
        waiter.set_result((decision, feedback))

    async def should_break_before_task(
        self, task_id: str, task_name: str
    ) -> bool:
        """Optionally pause before a sub-task and report the outcome.

        A breakpoint is raised when ``break_on_every_task`` is set, or when
        ``break_on_first_task`` is set and this is the first time this method
        is consulted on this manager instance while HITL is enabled.

        Returns:
            ``False`` when HITL is disabled or the human chose ABORT;
            ``True`` otherwise (including when no breakpoint was raised).

        NOTE(review): ``False`` covers both "disabled" and "aborted", so the
        return value cannot be read uniformly as "proceed" or as "did break".
        Confirm against callers before relying on either reading.
        """
        if not self.config.enabled:
            return False

        is_first_task = not self._first_task_seen
        self._first_task_seen = True

        must_break = self.config.break_on_every_task or (
            self.config.break_on_first_task and is_first_task
        )
        if must_break:
            decision, _ = await self.request_decision(
                bp_type=BreakpointType.BEFORE_TASK,
                task_id=task_id,
                message=f"About to execute: {task_name}\nProceed?",
                options=["approve", "abort", "modify"],
            )
            if decision == HumanDecision.ABORT:
                return False
        return True

    async def should_break_on_result(
        self, task_id: str, output: Any, confidence: float
    ) -> tuple[HumanDecision, str]:
        """Optionally pause after a result has been produced.

        A low-confidence break (``confidence`` below
        ``config.break_on_low_confidence``) takes precedence over the
        ``break_on_final_result`` break; at most one breakpoint is raised.

        Returns:
            The human's (decision, feedback), or ``(APPROVE, "")`` when HITL
            is disabled or neither trigger applies.
        """
        if not self.config.enabled:
            return HumanDecision.APPROVE, ""

        # Stringify once; the message and context use different cut-offs.
        rendered = str(output)

        # Low confidence trigger
        if confidence < self.config.break_on_low_confidence:
            return await self.request_decision(
                bp_type=BreakpointType.ON_LOW_CONFIDENCE,
                task_id=task_id,
                message=(
                    f"Low confidence result (confidence: {confidence:.2f})\n"
                    f"Output: {rendered[:300]}\n"
                    f"What would you like to do?"
                ),
                context={"confidence": confidence, "output": rendered[:500]},
                options=["approve", "retry", "modify", "abort"],
            )

        # Final result break
        if self.config.break_on_final_result:
            return await self.request_decision(
                bp_type=BreakpointType.AFTER_RESULT,
                task_id=task_id,
                message=f"Result: {rendered[:300]}\nApprove?",
                context={"output": rendered[:500]},
                options=["approve", "retry", "modify"],
            )

        return HumanDecision.APPROVE, ""

    async def should_break_on_failure(
        self, task_id: str, error: str, attempt: int
    ) -> tuple[HumanDecision, str]:
        """Optionally pause after a sub-task failure.

        Returns:
            ``(RETRY, "")`` without raising a breakpoint when HITL is disabled
            or ``break_on_failure`` is off; otherwise the human's
            (decision, feedback).
        """
        if not self.config.enabled or not self.config.break_on_failure:
            return HumanDecision.RETRY, ""

        return await self.request_decision(
            bp_type=BreakpointType.ON_FAILURE,
            task_id=task_id,
            message=(
                f"Task failed (attempt {attempt})\n"
                # str() so a caller passing an exception object does not
                # fail on slicing; the context keeps the value unchanged.
                f"Error: {str(error)[:300]}\n"
                f"Retry, skip, or abort?"
            ),
            context={"error": error, "attempt": attempt},
            options=["retry", "abort", "modify"],
        )

    @property
    def pending_count(self) -> int:
        """Number of breakpoints currently awaiting a decision."""
        return len(self._pending)

    @property
    def total_breakpoints(self) -> int:
        """Number of breakpoints created so far, resolved or not."""
        return len(self._breakpoints)

288 return len(self._breakpoints)