Coverage for agentos/swarm/agent_monitor.py: 27%
217 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2v1.9.6: Agent Self-Monitoring & Quality Gates.
4Each agent execution passes through configurable quality checks before
5results are accepted. Failed checks trigger automatic fallback or retry.
6"""
8from __future__ import annotations
10import time
11import uuid
12from dataclasses import dataclass, field
13from enum import Enum
14from typing import Any, Callable, Optional
class GateStatus(str, Enum):
    """Outcome of a quality-gate evaluation.

    Members are also plain strings, so they compare equal to their values
    (``GateStatus.PASS == "pass"``).
    """

    PASS = "pass"  # check succeeded
    FAIL = "fail"  # check failed
    WARN = "warn"  # check landed in the warning band
    SKIP = "skip"  # check was not evaluated
class GateAction(str, Enum):
    """What should happen to a task result after a gate has been evaluated.

    Members are also plain strings, so they compare equal to their values.
    """

    # Accept result as-is
    ACCEPT = "accept"
    # Retry the task
    RETRY = "retry"
    # Use fallback result
    FALLBACK = "fallback"
    # Abort the task
    ABORT = "abort"
    # Accept but flag warning
    WARN = "warn"
@dataclass
class GateResult:
    """Outcome of evaluating one quality gate against a task output."""

    name: str
    status: GateStatus = GateStatus.PASS
    action: GateAction = GateAction.ACCEPT
    score: float = 1.0
    threshold: float = 0.7
    detail: str = ""
    # Creation time of this result (seconds since the epoch, via time.time).
    timestamp: float = field(default_factory=time.time)

    def to_dict(self) -> dict:
        """Return a plain-dict view of this result.

        Enum fields are emitted as their string values. ``timestamp`` is not
        part of the serialized form.
        """
        return dict(
            name=self.name,
            status=self.status.value,
            action=self.action.value,
            score=self.score,
            threshold=self.threshold,
            detail=self.detail,
        )
@dataclass
class MonitorReport:
    """Aggregated self-monitoring report for one monitored task execution."""

    # Short random identifier: first 8 hex characters of a UUID4.
    task_id: str = field(default_factory=lambda: uuid.uuid4().hex[:8])
    task_name: str = ""
    gates: list[GateResult] = field(default_factory=list)
    overall_status: GateStatus = GateStatus.PASS
    overall_action: GateAction = GateAction.ACCEPT
    total_checks: int = 0
    passed: int = 0
    failed: int = 0
    warned: int = 0
    duration_ms: float = 0.0
    retries_used: int = 0
    max_retries: int = 3
    fallback_used: bool = False
    output_summary: str = ""

    def to_dict(self) -> dict:
        """Return a plain-dict view of the report.

        Gate results are serialized through ``GateResult.to_dict``, enums are
        emitted as their string values, ``duration_ms`` is rendered as a string
        with one decimal place, and ``output_summary`` is cut to 200
        characters. ``max_retries`` is not part of the serialized form.
        """
        serialized_gates = [gate.to_dict() for gate in self.gates]
        duration_text = f"{self.duration_ms:.1f}"
        return dict(
            task_id=self.task_id,
            task_name=self.task_name,
            gates=serialized_gates,
            overall_status=self.overall_status.value,
            overall_action=self.overall_action.value,
            total_checks=self.total_checks,
            passed=self.passed,
            failed=self.failed,
            warned=self.warned,
            duration_ms=duration_text,
            retries_used=self.retries_used,
            fallback_used=self.fallback_used,
            output_summary=self.output_summary[:200],
        )
class QualityGate:
    """A single quality check that validates agent output.

    A gate wraps a check function and maps its ``(passed, detail, score)``
    verdict onto a ``GateResult`` carrying a status and a follow-up action.
    """

    def __init__(
        self,
        name: str,
        check_fn: Callable[[Any, dict], tuple[bool, str, float]],
        threshold: float = 0.7,
        on_fail: GateAction = GateAction.RETRY,
        on_warn: GateAction = GateAction.ACCEPT,
        max_retries: int = 1,
    ):
        """
        Args:
            name: Gate name used in reports.
            check_fn: Callable ``(output, context) -> (passed, detail, score)``.
            threshold: Minimum score for the gate to pass (0-1).
            on_fail: Action attached to the result when the gate fails.
            on_warn: Action attached to the result when the gate warns.
            max_retries: Max retries for this specific gate (stored only;
                ``evaluate`` does not read it).
        """
        self.name = name
        self._check = check_fn
        self.threshold = threshold
        self.on_fail = on_fail
        self.on_warn = on_warn
        self.max_retries = max_retries

    def _result(
        self,
        status: GateStatus,
        action: GateAction,
        score: float,
        detail: str,
    ) -> GateResult:
        """Build a ``GateResult`` stamped with this gate's name and threshold."""
        return GateResult(
            name=self.name,
            status=status,
            action=action,
            score=score,
            threshold=self.threshold,
            detail=detail,
        )

    def evaluate(self, output: Any, context: dict | None = None) -> GateResult:
        """Run the check function and classify its verdict.

        * PASS / ACCEPT when the check passed and ``score >= threshold``.
        * WARN / ``on_warn`` when ``score >= threshold * 0.6`` otherwise.
        * FAIL / ``on_fail`` in every other case, including when the check
          function raises (score 0.0, the error text goes into ``detail``).
        """
        ctx = context or {}
        try:
            passed, detail, score = self._check(output, ctx)
        except Exception as exc:
            # A crashing check is treated as a failed gate, not propagated.
            return self._result(
                GateStatus.FAIL, self.on_fail, 0.0, f"Gate check error: {exc}"
            )

        if score >= self.threshold and passed:
            return self._result(GateStatus.PASS, GateAction.ACCEPT, score, detail)

        # Warning band: 60% of the pass threshold and up.
        warn_floor = self.threshold * 0.6
        if score >= warn_floor:
            return self._result(GateStatus.WARN, self.on_warn, score, detail)

        return self._result(GateStatus.FAIL, self.on_fail, score, detail)
class AgentMonitor:
    """Self-monitoring pipeline for agent task execution.

    Runs each task output through a chain of quality gates. Based on the gate
    results, decides whether to accept, retry, fall back, or abort.

    Usage:
        monitor = AgentMonitor()
        monitor.add_gate(output_not_empty())
        monitor.add_gate(confidence_min())

        output, report = await monitor.monitor_execution(
            task_fn=lambda: agent.run(task),
            task_name="research_query",
        )
        if report.overall_action == GateAction.ACCEPT:
            ...
    """

    def __init__(self, max_retries: int = 3, default_fallback: Any = None):
        """
        Args:
            max_retries: Maximum number of re-executions after the first attempt.
            default_fallback: Value returned on fallback when no ``fallback_fn``
                is supplied; ``None`` means "no default fallback".
        """
        self._gates: list[QualityGate] = []
        self.max_retries = max_retries
        self.default_fallback = default_fallback

    def add_gate(self, gate: QualityGate) -> AgentMonitor:
        """Add a quality gate to the pipeline and return ``self`` for chaining."""
        self._gates.append(gate)
        return self

    def add_gates(self, gates: list[QualityGate]) -> AgentMonitor:
        """Add multiple quality gates and return ``self`` for chaining."""
        self._gates.extend(gates)
        return self

    async def monitor_execution(
        self,
        task_fn: Callable[[], Any],
        task_name: str = "",
        context: dict | None = None,
        fallback_fn: Callable[[], Any] | None = None,
    ) -> tuple[Any, MonitorReport]:
        """Execute a task with full monitoring and quality gating.

        Each attempt runs ``task_fn`` and evaluates every registered gate on
        its output. Gates receive a shallow copy of ``context`` that also
        carries ``_latency_ms`` (wall time of the attempt, in milliseconds);
        a caller-supplied ``_latency_ms`` takes precedence.

        Decision per attempt, using the most severe action among the gates:

        * no gate failed: accept, status PASS;
        * a gate failed but the worst action is ACCEPT/WARN: accept the
          output and flag it, status WARN;
        * ABORT: return immediately, status FAIL;
        * FALLBACK: return the fallback output, status FAIL;
        * RETRY: run the task again while retries remain.

        If the task raises, the error is recorded as an ``execution_error``
        gate result and the attempt counts as a retry. When all attempts are
        used up, the fallback output is returned with action FALLBACK.

        Args:
            task_fn: Sync or async callable that executes the task.
            task_name: Name recorded in the report.
            context: Additional context for gate evaluation.
            fallback_fn: Sync or async callable producing the fallback output.

        Returns:
            Tuple of ``(final_output, monitor_report)``.
        """
        report = MonitorReport(task_name=task_name, max_retries=self.max_retries)
        ctx = context or {}
        started = time.perf_counter()

        # Severity order used to pick the most severe action across gates.
        action_prio = {
            GateAction.ACCEPT: 0,
            GateAction.WARN: 1,
            GateAction.RETRY: 2,
            GateAction.FALLBACK: 3,
            GateAction.ABORT: 4,
        }

        output = None
        retries = 0

        while retries <= self.max_retries:
            attempt_started = time.perf_counter()
            try:
                output = await self._resolve(task_fn())
            except Exception as e:
                report.gates.append(GateResult(
                    name="execution_error",
                    status=GateStatus.FAIL,
                    action=GateAction.RETRY,
                    score=0.0,
                    detail=str(e),
                ))
                # Counts as a used attempt; the loop condition ends the run
                # once the retry budget is spent.
                retries += 1
                continue

            # Expose the attempt latency to gates; caller-supplied keys win.
            latency_ms = (time.perf_counter() - attempt_started) * 1000
            gate_ctx = {"_latency_ms": latency_ms, **ctx}

            report.gates = []
            any_fail = False
            worst_action = GateAction.ACCEPT
            for gate in self._gates:
                gr = gate.evaluate(output, gate_ctx)
                report.gates.append(gr)
                if gr.status == GateStatus.FAIL:
                    any_fail = True
                if action_prio.get(gr.action, 0) > action_prio.get(worst_action, 0):
                    worst_action = gr.action
            self._tally(report)

            if not any_fail:
                return self._finish(
                    report, output, started, retries, GateStatus.PASS, worst_action
                )

            if worst_action in (GateAction.ACCEPT, GateAction.WARN):
                # Failing gates only asked for "accept (but flag)": keep the
                # output instead of burning retries and falling back.
                return self._finish(
                    report, output, started, retries, GateStatus.WARN, worst_action
                )

            if worst_action == GateAction.ABORT:
                return self._finish(
                    report, output, started, retries, GateStatus.FAIL, GateAction.ABORT
                )

            if worst_action == GateAction.FALLBACK:
                output = await self._apply_fallback(fallback_fn, output)
                report.fallback_used = True
                return self._finish(
                    report, output, started, retries, GateStatus.FAIL, GateAction.FALLBACK
                )

            # RETRY: run the task again if the budget allows.
            retries += 1

        # Retries exhausted. Re-tally so execution errors appended on the
        # exception path are reflected in the counters and the final status.
        self._tally(report)
        output = await self._apply_fallback(fallback_fn, output)
        report.fallback_used = True
        final_status = GateStatus.FAIL if report.failed > 0 else GateStatus.WARN
        # ``retries`` counts attempts here; retries actually used is one less.
        return self._finish(
            report, output, started, max(0, retries - 1), final_status, GateAction.FALLBACK
        )

    @staticmethod
    async def _resolve(value: Any) -> Any:
        """Await *value* if it is awaitable, otherwise return it unchanged."""
        import inspect

        if inspect.isawaitable(value):
            return await value
        return value

    async def _apply_fallback(
        self, fallback_fn: Callable[[], Any] | None, output: Any
    ) -> Any:
        """Return the fallback output, or *output* when no fallback exists."""
        if fallback_fn:
            return await self._resolve(fallback_fn())
        if self.default_fallback is not None:
            return self.default_fallback
        return output

    @staticmethod
    def _tally(report: MonitorReport) -> None:
        """Recompute the per-status counters from ``report.gates``."""
        report.total_checks = len(report.gates)
        report.passed = sum(1 for g in report.gates if g.status == GateStatus.PASS)
        report.failed = sum(1 for g in report.gates if g.status == GateStatus.FAIL)
        report.warned = sum(1 for g in report.gates if g.status == GateStatus.WARN)

    def _finish(
        self,
        report: MonitorReport,
        output: Any,
        started: float,
        retries_used: int,
        status: GateStatus,
        action: GateAction,
    ) -> tuple[Any, MonitorReport]:
        """Fill in the final report fields and return ``(output, report)``."""
        report.overall_status = status
        report.overall_action = action
        report.retries_used = retries_used
        report.output_summary = self._summarize(output)
        report.duration_ms = (time.perf_counter() - started) * 1000
        return output, report

    def _summarize(self, output: Any) -> str:
        """Create a brief (at most 200 characters) summary of output."""
        if output is None:
            return "None"
        s = str(output)
        if len(s) > 200:
            return s[:197] + "..."
        return s
345# ── Built-in Quality Gates ────────────────────────────────────────
def output_not_empty(
    min_length: int = 1,
    threshold: float = 0.9,
) -> QualityGate:
    """Gate: output must not be empty.

    The output is stringified and stripped. ``None`` and falsy non-numeric
    values (``""``, ``[]``, ``{}``, ...) count as empty; numeric results such
    as ``0`` or ``False`` are real content and are measured by their text.

    Args:
        min_length: Minimum number of characters required after stripping.
        threshold: Score threshold handed to the ``QualityGate``.

    Returns:
        A ``QualityGate`` named ``"output_not_empty"`` that retries on failure.
    """
    def check(output: Any, ctx: dict) -> tuple[bool, str, float]:
        # A numeric answer like 0 is falsy but is not "no output".
        is_number = isinstance(output, (int, float, complex))
        if output is None or (not output and not is_number):
            s = ""
        else:
            s = str(output).strip()

        if not s:
            return False, "Output is empty", 0.0
        if len(s) < min_length:
            # Partial credit proportional to how much of the minimum is met.
            score = min(1.0, len(s) / max(min_length, 1))
            return False, f"Output too short ({len(s)} < {min_length})", score
        return True, f"Output length {len(s)} OK", 1.0

    return QualityGate("output_not_empty", check, threshold, on_fail=GateAction.RETRY)
def output_length_range(
    min_len: int = 10,
    max_len: int = 10000,
    threshold: float = 0.8,
) -> QualityGate:
    """Gate: output length must be in range.

    The output is stringified and stripped before measuring. ``None`` and
    falsy non-numeric values count as length 0; numeric results such as ``0``
    are measured by their text.

    Args:
        min_len: Minimum accepted length (inclusive).
        max_len: Maximum accepted length (inclusive).
        threshold: Score threshold handed to the ``QualityGate``.

    Returns:
        A ``QualityGate`` named ``"output_length"`` whose failure action is WARN.
    """
    def check(output: Any, ctx: dict) -> tuple[bool, str, float]:
        # A numeric answer like 0 is falsy but is not "no output".
        is_number = isinstance(output, (int, float, complex))
        if output is None or (not output and not is_number):
            s = ""
        else:
            s = str(output).strip()
        length = len(s)

        if length < min_len:
            return False, f"Output too short: {length} < {min_len}", length / max(min_len, 1)
        if length > max_len:
            # length can only be 0 here when max_len is negative; avoid 0-division.
            score = max_len / length if length else 0.0
            return False, f"Output too long: {length} > {max_len}", score
        return True, f"Output length {length} OK", 1.0

    return QualityGate("output_length", check, threshold, on_fail=GateAction.WARN)
def no_error_output(threshold: float = 0.95) -> QualityGate:
    """Gate: output must not contain error/exception patterns.

    The stringified output is searched case-insensitively for a fixed list of
    substrings. Each distinct pattern found lowers the score by an equal
    share; the first three matches are reported in the detail message.
    """
    ERROR_PATTERNS = [
        "Traceback (most recent call last)",
        "Error:",
        "Exception:",
        "failed to",
        "cannot be",
        "invalid",
        "permission denied",
    ]

    def check(output: Any, ctx: dict) -> tuple[bool, str, float]:
        haystack = str(output).lower()
        hits = []
        for pattern in ERROR_PATTERNS:
            if pattern.lower() in haystack:
                hits.append(pattern)

        if not hits:
            return True, "No error patterns", 1.0

        score = 1.0 - (len(hits) / len(ERROR_PATTERNS))
        return False, f"Error patterns found: {hits[:3]}", max(0, score)

    return QualityGate("no_error", check, threshold, on_fail=GateAction.RETRY)
def contains_keywords(
    keywords: list[str],
    min_hits: int = 1,
    threshold: float = 0.7,
) -> QualityGate:
    """Gate: output must contain at least N keywords.

    Matching is a case-insensitive substring test against the stringified
    output. On failure the score is the fraction of ``min_hits`` reached and
    up to five missing keywords are listed in the detail message.
    """
    def check(output: Any, ctx: dict) -> tuple[bool, str, float]:
        haystack = str(output).lower()

        # Split the keywords into found / missing in a single pass.
        found: list[str] = []
        missing: list[str] = []
        for kw in keywords:
            bucket = found if kw.lower() in haystack else missing
            bucket.append(kw)

        if len(found) < min_hits:
            score = min(1.0, len(found) / max(min_hits, 1))
            return False, f"Missing keywords: {missing[:5]}", score
        return True, f"Found {len(found)}/{len(keywords)} keywords", 1.0

    return QualityGate("keywords", check, threshold, on_fail=GateAction.WARN)
def latency_max(max_ms: float, threshold: float = 0.9) -> QualityGate:
    """Gate: execution must complete within time limit (ms).

    The elapsed time is read from ``ctx["_latency_ms"]`` (0 when the key is
    absent), so the caller or the monitor must supply it for the gate to be
    meaningful.

    Args:
        max_ms: Maximum accepted latency in milliseconds.
        threshold: Score threshold handed to the ``QualityGate``.

    Returns:
        A ``QualityGate`` named ``"latency"`` whose failure action is WARN.
    """
    def check(output: Any, ctx: dict) -> tuple[bool, str, float]:
        elapsed = ctx.get("_latency_ms", 0)
        if elapsed > max_ms:
            # The remaining-budget fraction (1 - elapsed / max_ms) is never
            # positive once the limit is exceeded, so the score is 0.0.
            # Returning it directly avoids a division by zero when max_ms is
            # 0 and stops a negative max_ms from producing a score above 1.
            return False, f"Latency {elapsed:.0f}ms > {max_ms}ms", 0.0
        return True, f"Latency {elapsed:.0f}ms OK", 1.0

    return QualityGate("latency", check, threshold, on_fail=GateAction.WARN)
def confidence_min(min_confidence: float = 0.5, threshold: float = 0.8) -> QualityGate:
    """Gate: fused confidence must meet minimum.

    The confidence is read from ``ctx["_confidence"]`` (0.0 when the key is
    absent). On failure the score is the fraction of ``min_confidence``
    reached, capped at 1.0.
    """
    def check(output: Any, ctx: dict) -> tuple[bool, str, float]:
        confidence = ctx.get("_confidence", 0.0)
        # Floor the divisor at 0.01 so a zero minimum cannot divide by zero.
        divisor = max(min_confidence, 0.01)
        ratio = min(1.0, confidence / divisor)

        if confidence < min_confidence:
            return False, f"Confidence {confidence:.2f} < {min_confidence}", ratio
        return True, f"Confidence {confidence:.2f} OK", 1.0

    return QualityGate("confidence", check, threshold, on_fail=GateAction.RETRY)