Coverage for agentos/swarm/human_loop.py: 47%
118 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2v1.9.5: Human-in-the-Loop (HITL) breakpoint system.
4Enables task execution to pause at configurable checkpoints for human
5review, approval, or intervention before continuing.
6"""
8from __future__ import annotations
10import asyncio
11import time
12import uuid
13from dataclasses import dataclass, field
14from enum import Enum
15from typing import Any, Callable, Optional
class BreakpointType(str, Enum):
    """Kinds of human-in-the-loop breakpoints.

    Each member is also a ``str`` whose value is the lowercase name:

    * ``BEFORE_TASK``       - pause before a sub-task starts.
    * ``AFTER_RESULT``      - pause after a sub-task produces output.
    * ``ON_FAILURE``        - pause when a sub-task fails.
    * ``ON_LOW_CONFIDENCE`` - pause when fusion confidence is low.
    * ``MANUAL``            - breakpoint placed explicitly by a developer.
    """

    BEFORE_TASK = "before_task"
    AFTER_RESULT = "after_result"
    ON_FAILURE = "on_failure"
    ON_LOW_CONFIDENCE = "on_low_confidence"
    MANUAL = "manual"
class HumanDecision(str, Enum):
    """Answers a human can give at a breakpoint.

    Each member is also a ``str`` whose value is the lowercase name:

    * ``APPROVE`` - accept and continue.
    * ``REJECT``  - reject the step; the caller decides whether to skip or retry.
    * ``RETRY``   - reject and retry, typically with feedback.
    * ``MODIFY``  - accept with modifications.
    * ``ABORT``   - abort the entire task.
    """

    APPROVE = "approve"
    REJECT = "reject"
    RETRY = "retry"
    MODIFY = "modify"
    ABORT = "abort"
@dataclass
class Breakpoint:
    """A checkpoint where execution pauses for human input.

    The request fields (``type``, ``task_id``, ``context``, ``message``,
    ``options``, ``timeout``) describe what is being asked. The resolution
    fields (``decision``, ``feedback``, ``resolved``, ``resolved_at``) start
    empty and are filled in once a decision is recorded.
    """

    # Short random identifier: the first 8 hex characters of a uuid4.
    id: str = field(default_factory=lambda: uuid.uuid4().hex[:8])
    type: BreakpointType = BreakpointType.MANUAL
    task_id: str = ""
    # Free-form data presented alongside the message.
    context: dict[str, Any] = field(default_factory=dict)
    message: str = ""
    # Decision values offered to the human at this breakpoint.
    options: list[str] = field(default_factory=lambda: ["approve", "reject", "retry", "abort"])
    timeout: float = 0.0  # seconds; 0 = no timeout
    created_at: float = field(default_factory=time.time)  # epoch seconds (time.time)
    resolved_at: float = 0.0  # epoch seconds; stays 0.0 until resolved
    decision: HumanDecision | None = None
    feedback: str = ""
    resolved: bool = False

    def to_dict(self) -> dict:
        """Return a serializable summary of this breakpoint.

        ``type`` and ``decision`` are emitted as their string values. A
        decision stored as a plain string (e.g. ``"retry"`` taken from a
        handler's ``(decision, feedback)`` tuple) is passed through as-is
        instead of raising ``AttributeError``. ``options`` is copied so that
        mutating the returned dict cannot alter the breakpoint itself.
        """
        return {
            "id": self.id,
            "type": getattr(self.type, "value", self.type),
            "task_id": self.task_id,
            "message": self.message,
            "options": list(self.options),
            "resolved": self.resolved,
            "decision": (
                getattr(self.decision, "value", self.decision) if self.decision else None
            ),
        }
@dataclass
class HITLConfig:
    """Settings that control when the human-in-the-loop manager pauses.

    Attributes:
        enabled: Master switch; when False no breakpoint is raised.
        break_on_failure: Pause when a sub-task fails.
        break_on_low_confidence: Confidence threshold; a result whose
            confidence is below this value triggers a break.
        break_on_first_task: Pause before the first sub-task.
        break_on_every_task: Pause before every sub-task.
        break_on_final_result: Pause before the final result is returned.
        max_pending_breakpoints: Cap on unresolved breakpoints; beyond it the
            oldest pending one is auto-approved.
        default_timeout: Seconds to wait for a decision (300.0 = 5 minutes).
    """

    enabled: bool = field(default=True)
    break_on_failure: bool = field(default=True)
    break_on_low_confidence: float = field(default=0.3)
    break_on_first_task: bool = field(default=False)
    break_on_every_task: bool = field(default=False)
    break_on_final_result: bool = field(default=False)
    max_pending_breakpoints: int = field(default=5)
    default_timeout: float = field(default=300.0)
class HITLManager:
    """Manages human-in-the-loop breakpoints during task execution.

    A breakpoint's decision comes from one of two sources:

    * a registered handler (sync or async) that receives the ``Breakpoint``
      and returns a ``HumanDecision``, a ``(decision, feedback)`` tuple, or a
      decision string such as ``"retry"``; or
    * when no handler is registered, an external call to
      :meth:`provide_decision` naming the breakpoint's id. Every waiting
      breakpoint has its own future, so a decision is only delivered to the
      breakpoint it names and a late answer can never leak into another one.

    Usage:
        hitl = HITLManager(config=HITLConfig(break_on_failure=True))

        # Register a callback for human input
        hitl.register_handler(my_human_input_function)

        # During execution:
        decision, feedback = await hitl.request_decision(
            bp_type=BreakpointType.ON_FAILURE,
            task_id="task_1",
            message="Task failed. Retry?",
            context={"error": "...", "attempts": 2},
        )
        if decision == HumanDecision.RETRY:
            ...
    """

    def __init__(
        self,
        config: HITLConfig | None = None,
        handler: Callable | None = None,
    ):
        self.config = config or HITLConfig()
        self._handler = handler
        # Every breakpoint created by this manager, keyed by id.
        self._breakpoints: dict[str, Breakpoint] = {}
        # Breakpoints still awaiting a decision, oldest first.
        self._pending: list[Breakpoint] = []
        # One future per breakpoint waiting on provide_decision(). Futures are
        # created inside the running event loop, so constructing the manager
        # outside a loop is safe.
        self._waiters: dict[str, asyncio.Future] = {}
        # Set on the first should_break_before_task() call so that
        # break_on_first_task fires only once per manager.
        self._first_task_seen = False

    def register_handler(self, handler: Callable[[Breakpoint], HumanDecision]) -> None:
        """
        Register a human input handler.

        Args:
            handler: Callable that receives a Breakpoint and returns a
                HumanDecision, a ``(decision, feedback)`` tuple, or a decision
                string (e.g. ``"abort"``). Can be sync or async.
        """
        self._handler = handler

    async def request_decision(
        self,
        bp_type: BreakpointType,
        task_id: str,
        message: str,
        context: dict | None = None,
        timeout: float | None = None,
        options: list[str] | None = None,
    ) -> tuple[HumanDecision, str]:
        """
        Pause execution and request a human decision.

        When HITL is disabled this returns ``(APPROVE, "")`` immediately and
        no breakpoint is created.

        Args:
            bp_type: Type of breakpoint
            task_id: Current task identifier
            message: Human-readable message explaining what's needed
            context: Additional context for the decision
            timeout: Max seconds to wait for provide_decision() when no
                handler is registered (None = use config default; 0 or a
                negative value = wait indefinitely)
            options: Available decision options

        Returns:
            Tuple of (decision, feedback text). Handler errors, unrecognised
            handler answers and timeouts all resolve to APPROVE.
        """
        if not self.config.enabled:
            return HumanDecision.APPROVE, ""

        bp = Breakpoint(
            type=bp_type,
            task_id=task_id,
            context=context or {},
            message=message,
            options=options or ["approve", "reject", "retry", "abort"],
            # Only None falls back to the default; an explicit 0 is kept so it
            # can mean "no timeout".
            timeout=self.config.default_timeout if timeout is None else timeout,
        )

        handler = self._handler
        self._breakpoints[bp.id] = bp
        self._pending.append(bp)
        if not handler:
            # Register the waiter before eviction so that an immediate
            # auto-approval of this very breakpoint can still release it.
            self._waiters[bp.id] = asyncio.get_running_loop().create_future()
        self._evict_oldest_pending()

        try:
            if handler:
                decision, feedback = await self._ask_handler(bp, handler)
            else:
                decision, feedback = await self._wait_for_external_decision(bp)
            bp.decision = decision
            bp.feedback = feedback
            bp.resolved = True
            bp.resolved_at = time.time()
        finally:
            # Drop bookkeeping even when the caller is cancelled mid-wait, so
            # a dead breakpoint never lingers in the pending list.
            if bp in self._pending:
                self._pending.remove(bp)
            self._waiters.pop(bp.id, None)

        return bp.decision, bp.feedback

    def _evict_oldest_pending(self) -> None:
        """Auto-approve the oldest pending breakpoint once the limit is exceeded."""
        if len(self._pending) > self.config.max_pending_breakpoints:
            oldest = self._pending.pop(0)
            oldest.decision = HumanDecision.APPROVE
            oldest.resolved = True
            oldest.resolved_at = time.time()
            # Wake a coroutine still waiting on it so it returns the
            # auto-approval instead of blocking until its timeout.
            waiter = self._waiters.get(oldest.id)
            if waiter is not None and not waiter.done():
                waiter.set_result((HumanDecision.APPROVE, ""))

    async def _ask_handler(
        self, bp: Breakpoint, handler: Callable
    ) -> tuple[HumanDecision, str]:
        """Invoke ``handler`` (sync or async) for ``bp`` and normalise its answer."""
        try:
            result = handler(bp)
            if asyncio.iscoroutine(result):
                result = await result
        except Exception:
            # Keep the existing fail-open policy (auto-approve), but make the
            # handler failure visible instead of swallowing it silently.
            import logging

            logging.getLogger(__name__).exception(
                "HITL handler failed for breakpoint %s; auto-approving", bp.id
            )
            return HumanDecision.APPROVE, ""
        if isinstance(result, tuple) and len(result) == 2:
            decision, feedback = result
            return self._coerce_decision(decision), feedback
        return self._coerce_decision(result), ""

    async def _wait_for_external_decision(
        self, bp: Breakpoint
    ) -> tuple[HumanDecision, str]:
        """Wait for provide_decision() on ``bp``; auto-approve on timeout."""
        waiter = self._waiters[bp.id]
        # A non-positive timeout means "wait indefinitely" (wait_for(None)).
        limit = bp.timeout if bp.timeout > 0 else None
        try:
            decision, feedback = await asyncio.wait_for(waiter, timeout=limit)
        except asyncio.TimeoutError:
            return HumanDecision.APPROVE, ""
        return self._coerce_decision(decision), feedback

    @staticmethod
    def _coerce_decision(value: Any) -> HumanDecision:
        """Map an answer onto a HumanDecision; unrecognised values become APPROVE."""
        if isinstance(value, HumanDecision):
            return value
        if isinstance(value, str):
            try:
                return HumanDecision(value)
            except ValueError:
                pass
        return HumanDecision.APPROVE

    def provide_decision(
        self,
        breakpoint_id: str,
        decision: HumanDecision,
        feedback: str = "",
    ) -> None:
        """Deliver a decision to a breakpoint waiting without a handler.

        The decision only reaches the breakpoint named by ``breakpoint_id``.
        Ids that are unknown, already resolved or timed out, or resolved by a
        handler are ignored. asyncio futures are not thread-safe: call this
        from the event-loop thread (from other threads, schedule it with
        ``loop.call_soon_threadsafe``).
        """
        waiter = self._waiters.get(breakpoint_id)
        if waiter is not None and not waiter.done():
            waiter.set_result((decision, feedback))

    async def should_break_before_task(
        self, task_id: str, task_name: str
    ) -> bool:
        """Optionally pause before a sub-task and report whether to proceed.

        A BEFORE_TASK breakpoint is raised before every sub-task when
        ``break_on_every_task`` is set, or only before the first sub-task seen
        by this manager when ``break_on_first_task`` is set.

        Returns:
            False only if the human chose ABORT; True otherwise, including
            when HITL is disabled or no break is configured.
        """
        if not self.config.enabled:
            return True
        is_first = not self._first_task_seen
        self._first_task_seen = True
        if self.config.break_on_every_task or (
            self.config.break_on_first_task and is_first
        ):
            decision, _ = await self.request_decision(
                bp_type=BreakpointType.BEFORE_TASK,
                task_id=task_id,
                message=f"About to execute: {task_name}\nProceed?",
                options=["approve", "abort", "modify"],
            )
            if decision == HumanDecision.ABORT:
                return False
        return True

    async def should_break_on_result(
        self, task_id: str, output: Any, confidence: float
    ) -> tuple[HumanDecision, str]:
        """Optionally pause after a sub-task result.

        A low-confidence breakpoint takes precedence over the final-result
        breakpoint. Returns ``(APPROVE, "")`` when no break applies.
        """
        if not self.config.enabled:
            return HumanDecision.APPROVE, ""

        # Low confidence trigger
        if confidence < self.config.break_on_low_confidence:
            return await self.request_decision(
                bp_type=BreakpointType.ON_LOW_CONFIDENCE,
                task_id=task_id,
                message=(
                    f"Low confidence result (confidence: {confidence:.2f})\n"
                    f"Output: {str(output)[:300]}\n"
                    f"What would you like to do?"
                ),
                context={"confidence": confidence, "output": str(output)[:500]},
                options=["approve", "retry", "modify", "abort"],
            )

        # Final result break
        if self.config.break_on_final_result:
            return await self.request_decision(
                bp_type=BreakpointType.AFTER_RESULT,
                task_id=task_id,
                message=f"Result: {str(output)[:300]}\nApprove?",
                context={"output": str(output)[:500]},
                options=["approve", "retry", "modify"],
            )

        return HumanDecision.APPROVE, ""

    async def should_break_on_failure(
        self, task_id: str, error: str, attempt: int
    ) -> tuple[HumanDecision, str]:
        """Optionally pause when a sub-task fails.

        Returns ``(RETRY, "")`` without pausing when HITL or
        ``break_on_failure`` is disabled.
        """
        if not self.config.enabled or not self.config.break_on_failure:
            return HumanDecision.RETRY, ""

        return await self.request_decision(
            bp_type=BreakpointType.ON_FAILURE,
            task_id=task_id,
            message=(
                f"Task failed (attempt {attempt})\n"
                # str() so that exception objects are accepted as well.
                f"Error: {str(error)[:300]}\n"
                f"Retry, skip, or abort?"
            ),
            context={"error": error, "attempt": attempt},
            options=["retry", "abort", "modify"],
        )

    @property
    def pending_count(self) -> int:
        """Number of breakpoints still awaiting a decision."""
        return len(self._pending)

    @property
    def pending_breakpoints(self) -> list[Breakpoint]:
        """Snapshot of unresolved breakpoints, oldest first (ids feed provide_decision)."""
        return list(self._pending)

    @property
    def total_breakpoints(self) -> int:
        """Number of breakpoints created by this manager so far."""
        return len(self._breakpoints)