Coverage for agentos/swarm/agent_monitor.py: 27%

217 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2v1.9.6: Agent Self-Monitoring & Quality Gates. 

3 

4Each agent execution passes through configurable quality checks before 

5results are accepted. Failed checks trigger automatic fallback or retry. 

6""" 

7 

8from __future__ import annotations 

9 

10import time 

11import uuid 

12from dataclasses import dataclass, field 

13from enum import Enum 

14from typing import Any, Callable, Optional 

15 

16 

class GateStatus(str, Enum):
    """Verdict produced by evaluating a single quality gate.

    Members are also plain strings (``GateStatus.PASS == "pass"``).
    """

    PASS = "pass"  # check succeeded
    FAIL = "fail"  # check failed
    WARN = "warn"  # check did not pass but was downgraded to a warning
    SKIP = "skip"  # check was not evaluated

22 

23 

class GateAction(str, Enum):
    """Follow-up the monitor should take for a task result after gating.

    Members are also plain strings (``GateAction.RETRY == "retry"``).
    """

    ACCEPT = "accept"      # take the result unchanged
    RETRY = "retry"        # run the task again
    FALLBACK = "fallback"  # substitute a fallback result
    ABORT = "abort"        # stop processing the task
    WARN = "warn"          # take the result, but flag a warning

30 

31 

@dataclass
class GateResult:
    """Outcome of running one quality gate against a task output."""

    name: str
    status: GateStatus = GateStatus.PASS
    action: GateAction = GateAction.ACCEPT
    score: float = 1.0
    threshold: float = 0.7
    detail: str = ""
    # Wall-clock time (time.time()) at which this result object was created.
    timestamp: float = field(default_factory=time.time)

    def to_dict(self) -> dict:
        """Serialize to a plain dict with enum fields reduced to their values.

        ``timestamp`` is intentionally not included.
        """
        out: dict = {"name": self.name}
        out["status"] = self.status.value
        out["action"] = self.action.value
        out.update(score=self.score, threshold=self.threshold, detail=self.detail)
        return out

53 

54 

@dataclass
class MonitorReport:
    """Aggregated self-monitoring report for one task execution.

    ``to_dict`` renders ``duration_ms`` as a string with one decimal place,
    truncates ``output_summary`` to 200 characters and omits ``max_retries``.
    """

    # Random 8-hex-character identifier generated per report.
    task_id: str = field(default_factory=lambda: uuid.uuid4().hex[:8])
    task_name: str = ""
    gates: list[GateResult] = field(default_factory=list)
    overall_status: GateStatus = GateStatus.PASS
    overall_action: GateAction = GateAction.ACCEPT
    # Counters over ``gates`` (filled in by the monitor).
    total_checks: int = 0
    passed: int = 0
    failed: int = 0
    warned: int = 0
    duration_ms: float = 0.0
    retries_used: int = 0
    max_retries: int = 3
    fallback_used: bool = False
    output_summary: str = ""

    def to_dict(self) -> dict:
        """Serialize the report to a plain dict (enums as values, gates as dicts)."""
        gate_dicts = [gate.to_dict() for gate in self.gates]
        return dict(
            task_id=self.task_id,
            task_name=self.task_name,
            gates=gate_dicts,
            overall_status=self.overall_status.value,
            overall_action=self.overall_action.value,
            total_checks=self.total_checks,
            passed=self.passed,
            failed=self.failed,
            warned=self.warned,
            duration_ms=f"{self.duration_ms:.1f}",
            retries_used=self.retries_used,
            fallback_used=self.fallback_used,
            output_summary=self.output_summary[:200],
        )

90 

91 

class QualityGate:
    """A single quality check that validates agent output.

    Wraps a user-supplied ``check_fn(output, context) -> (passed, detail, score)``
    and maps its result onto a verdict:

    * PASS -- ``passed`` is truthy and ``score >= threshold``; action ACCEPT.
    * WARN -- otherwise, when ``score >= threshold * warn_ratio``; action ``on_warn``.
    * FAIL -- otherwise, or when ``check_fn`` raises or returns something that
      cannot be unpacked into ``(passed, detail, score)`` with a numeric score;
      action ``on_fail``.

    This module provides factories for common gates, e.g. ``output_not_empty``,
    ``output_length_range``, ``no_error_output``, ``contains_keywords``,
    ``latency_max`` and ``confidence_min``.
    """

    def __init__(
        self,
        name: str,
        check_fn: Callable[[Any, dict], tuple[bool, str, float]],
        threshold: float = 0.7,
        on_fail: GateAction = GateAction.RETRY,
        on_warn: GateAction = GateAction.ACCEPT,
        max_retries: int = 1,
        warn_ratio: float = 0.6,
    ):
        """
        Args:
            name: Gate name for reporting.
            check_fn: ``(output, context) -> (passed, detail, score)``.
            threshold: Score a passing check must reach (0-1).
            on_fail: Action reported when the gate fails.
            on_warn: Action reported when the gate warns.
            max_retries: Per-gate retry budget; stored on the gate, not read by
                this class.
            warn_ratio: Fraction of ``threshold`` at or above which a
                non-passing check is reported as WARN instead of FAIL
                (default 0.6, the previously hard-coded value).
        """
        self.name = name
        self._check = check_fn
        self.threshold = threshold
        self.on_fail = on_fail
        self.on_warn = on_warn
        self.max_retries = max_retries
        self.warn_ratio = warn_ratio

    def evaluate(self, output: Any, context: dict | None = None) -> GateResult:
        """Run the quality check on *output* and classify the outcome.

        Args:
            output: The task output to validate.
            context: Extra data passed to ``check_fn``; ``None`` means ``{}``.

        Returns:
            A ``GateResult`` carrying this gate's name and threshold. Errors
            raised by ``check_fn`` are captured as a FAIL result, not re-raised.
        """
        ctx = context or {}
        try:
            passed, detail, score = self._check(output, ctx)
            # Coerce inside the try: a malformed score (None, "high", ...) must
            # become a gate failure rather than escape as a TypeError from the
            # threshold comparisons below.
            score = float(score)
        except Exception as e:
            return self._result(GateStatus.FAIL, self.on_fail, 0.0, f"Gate check error: {e}")

        if passed and score >= self.threshold:
            return self._result(GateStatus.PASS, GateAction.ACCEPT, score, detail)
        if score >= self.threshold * self.warn_ratio:
            return self._result(GateStatus.WARN, self.on_warn, score, detail)
        return self._result(GateStatus.FAIL, self.on_fail, score, detail)

    def _result(
        self, status: GateStatus, action: GateAction, score: float, detail: str
    ) -> GateResult:
        """Build a GateResult stamped with this gate's name and threshold."""
        return GateResult(
            name=self.name,
            status=status,
            action=action,
            score=score,
            threshold=self.threshold,
            detail=detail,
        )

166 

167 

class AgentMonitor:
    """
    Self-monitoring pipeline for agent task execution.

    Runs each task output through a chain of quality gates and, based on the
    gate results, decides whether to accept, retry, fall back, or abort.

    Per attempt, the most severe action among all gate results is determined
    (ACCEPT < WARN < RETRY < FALLBACK < ABORT), then:

    * no gate FAILed           -> return the output; status PASS, action = that action
    * worst action ABORT       -> return the output as-is; status FAIL
    * worst action FALLBACK    -> return the fallback output; status FAIL
    * worst action RETRY       -> run the task again (at most ``max_retries`` re-runs)
    * worst action ACCEPT/WARN -> accept the output; status WARN, action WARN

    A ``task_fn`` that raises counts as a failed attempt and is retried. When all
    attempts are used up the fallback is applied (action FALLBACK).

    Gates receive a per-attempt copy of the caller's ``context`` with
    ``"_latency_ms"`` set to that attempt's execution time, unless the caller
    already supplied that key.

    Usage:
        monitor = AgentMonitor()
        monitor.add_gate(output_not_empty())
        monitor.add_gate(confidence_min())

        output, report = await monitor.monitor_execution(
            task_fn=lambda: agent.run(task),
            task_name="research_query",
        )
        if report.overall_action == GateAction.ACCEPT:
            ...
    """

    # Severity order used to pick the most severe action across gate results.
    _ACTION_PRIORITY = {
        GateAction.ACCEPT: 0,
        GateAction.WARN: 1,
        GateAction.RETRY: 2,
        GateAction.FALLBACK: 3,
        GateAction.ABORT: 4,
    }

    def __init__(self, max_retries: int = 3, default_fallback: Any = None):
        """
        Args:
            max_retries: Re-runs allowed after the first attempt (up to
                ``max_retries + 1`` executions in total).
            default_fallback: Output used when a fallback is needed and no
                ``fallback_fn`` is given; ``None`` means "no default".
        """
        self._gates: list[QualityGate] = []
        self.max_retries = max_retries
        self.default_fallback = default_fallback

    def add_gate(self, gate: QualityGate) -> AgentMonitor:
        """Append a quality gate to the pipeline; returns ``self`` for chaining."""
        self._gates.append(gate)
        return self

    def add_gates(self, gates: list[QualityGate]) -> AgentMonitor:
        """Append several quality gates in order; returns ``self`` for chaining."""
        self._gates.extend(gates)
        return self

    async def monitor_execution(
        self,
        task_fn: Callable[[], Any],
        task_name: str = "",
        context: dict | None = None,
        fallback_fn: Callable[[], Any] | None = None,
    ) -> tuple[Any, MonitorReport]:
        """Execute a task with full monitoring and quality gating.

        Args:
            task_fn: Zero-argument callable running the task; if it returns an
                awaitable, that awaitable is awaited.
            task_name: Label stored on the report.
            context: Additional context for gate evaluation (never mutated).
            fallback_fn: Zero-argument callable (sync or async) producing the
                fallback output when gating ends in FALLBACK.

        Returns:
            Tuple of (final_output, monitor_report).

        Exceptions raised by ``fallback_fn`` propagate to the caller.
        """
        report = MonitorReport(task_name=task_name, max_retries=self.max_retries)
        base_ctx = context or {}
        start = time.perf_counter()
        output = None

        for attempt in range(self.max_retries + 1):
            # Number of re-runs so far; after the loop this equals max_retries.
            report.retries_used = attempt
            attempt_start = time.perf_counter()
            try:
                output = await self._call(task_fn)
            except Exception as e:
                # Keep earlier entries so consecutive crashes are all visible,
                # and re-tally so the counters reflect them.
                report.gates.append(GateResult(
                    name="execution_error",
                    status=GateStatus.FAIL,
                    action=GateAction.RETRY,
                    score=0.0,
                    detail=str(e),
                ))
                self._tally(report)
                continue

            latency_ms = (time.perf_counter() - attempt_start) * 1000
            # Per-attempt copy; caller-supplied keys (incl. "_latency_ms") win.
            gate_ctx = {"_latency_ms": latency_ms, **base_ctx}
            any_fail, worst = self._run_gates(output, gate_ctx, report)

            if not any_fail:
                return self._finish(report, output, start, GateStatus.PASS, worst)
            if worst == GateAction.ABORT:
                return self._finish(report, output, start, GateStatus.FAIL, GateAction.ABORT)
            if worst == GateAction.FALLBACK:
                output = await self._apply_fallback(output, fallback_fn, report)
                return self._finish(report, output, start, GateStatus.FAIL, GateAction.FALLBACK)
            if worst != GateAction.RETRY:
                # ACCEPT/WARN on a failed gate means "accept but flag it";
                # retrying would ignore the gate's configured on_fail action.
                return self._finish(report, output, start, GateStatus.WARN, GateAction.WARN)
            # worst == RETRY: try again.

        # Attempts exhausted.
        status = GateStatus.FAIL if report.failed > 0 else GateStatus.WARN
        output = await self._apply_fallback(output, fallback_fn, report)
        return self._finish(report, output, start, status, GateAction.FALLBACK)

    @staticmethod
    async def _call(fn: Callable[[], Any]) -> Any:
        """Call *fn* and await its result when awaitable (coroutine, Task, Future)."""
        import inspect

        result = fn()
        if inspect.isawaitable(result):
            result = await result
        return result

    def _run_gates(
        self, output: Any, ctx: dict, report: MonitorReport
    ) -> tuple[bool, GateAction]:
        """Evaluate all gates on *output*, store results on *report*.

        Returns:
            ``(any_fail, worst_action)`` where ``worst_action`` is the most
            severe action across all gate results (not only failing ones).
        """
        report.gates = []
        any_fail = False
        worst = GateAction.ACCEPT
        rank = self._ACTION_PRIORITY
        for gate in self._gates:
            gr = gate.evaluate(output, ctx)
            report.gates.append(gr)
            if gr.status == GateStatus.FAIL:
                any_fail = True
            if rank.get(gr.action, 0) > rank.get(worst, 0):
                worst = gr.action
        self._tally(report)
        return any_fail, worst

    @staticmethod
    def _tally(report: MonitorReport) -> None:
        """Recompute the report's per-status counters from ``report.gates``."""
        statuses = [g.status for g in report.gates]
        report.total_checks = len(statuses)
        report.passed = statuses.count(GateStatus.PASS)
        report.failed = statuses.count(GateStatus.FAIL)
        report.warned = statuses.count(GateStatus.WARN)

    async def _apply_fallback(
        self, output: Any, fallback_fn: Callable[[], Any] | None, report: MonitorReport
    ) -> Any:
        """Return the fallback output, preferring *fallback_fn* over ``default_fallback``.

        ``report.fallback_used`` is set only when a fallback was actually applied;
        with neither available, *output* is returned unchanged.
        """
        if fallback_fn is not None:
            output = await self._call(fallback_fn)
        elif self.default_fallback is not None:
            output = self.default_fallback
        else:
            return output
        report.fallback_used = True
        return output

    def _finish(
        self,
        report: MonitorReport,
        output: Any,
        start: float,
        status: GateStatus,
        action: GateAction,
    ) -> tuple[Any, MonitorReport]:
        """Stamp final status/action, output summary and total duration."""
        report.overall_status = status
        report.overall_action = action
        report.output_summary = self._summarize(output)
        report.duration_ms = (time.perf_counter() - start) * 1000
        return output, report

    def _summarize(self, output: Any) -> str:
        """Create a brief (<= 200 char) summary of output for reporting."""
        if output is None:
            return "None"
        s = str(output)
        if len(s) > 200:
            return s[:197] + "..."
        return s

343 

344 

345# ── Built-in Quality Gates ──────────────────────────────────────── 

346 

def output_not_empty(
    min_length: int = 1,
    threshold: float = 0.9,
) -> QualityGate:
    """Build a gate that fails on empty or too-short output.

    The output is converted with ``str()`` and stripped; falsy outputs (e.g.
    ``None``, ``""``, ``0``, ``[]``) count as empty. A too-short output scores
    ``length / max(min_length, 1)`` capped at 1.0. Failure action: RETRY.
    """

    def check(output: Any, ctx: dict) -> tuple[bool, str, float]:
        text = "" if not output else str(output).strip()
        size = len(text)
        partial = min(1.0, size / max(min_length, 1))
        if not text:
            return False, "Output is empty", 0.0
        if not size < min_length:
            return True, f"Output length {size} OK", 1.0
        return False, f"Output too short ({size} < {min_length})", partial

    return QualityGate("output_not_empty", check, threshold, on_fail=GateAction.RETRY)

361 

362 

def output_length_range(
    min_len: int = 10,
    max_len: int = 10000,
    threshold: float = 0.8,
) -> QualityGate:
    """Build a gate requiring the stripped output length to be in [min_len, max_len].

    Falsy outputs count as length 0. A short output scores
    ``length / max(min_len, 1)``; a long one scores ``max_len / length``.
    Failure action: WARN.
    """

    def check(output: Any, ctx: dict) -> tuple[bool, str, float]:
        n = len(str(output).strip()) if output else 0
        too_short = n < min_len
        if too_short:
            return False, f"Output too short: {n} < {min_len}", n / max(min_len, 1)
        too_long = n > max_len
        if too_long:
            return False, f"Output too long: {n} > {max_len}", max_len / n
        return True, f"Output length {n} OK", 1.0

    return QualityGate("output_length", check, threshold, on_fail=GateAction.WARN)

378 

379 

def no_error_output(threshold: float = 0.95) -> QualityGate:
    """Build a gate that fails when the output contains common error markers.

    Matching is a case-insensitive substring search over ``str(output)``. The
    score is the fraction of markers *not* found, and the detail lists up to
    three matched markers. Failure action: RETRY.

    Note: generic markers such as "invalid" and "cannot be" can also match
    legitimate prose.
    """
    ERROR_PATTERNS = [
        "Traceback (most recent call last)",
        "Error:",
        "Exception:",
        "failed to",
        "cannot be",
        "invalid",
        "permission denied",
    ]
    # Lower-case each marker once at build time instead of on every check.
    needles = [(pattern, pattern.lower()) for pattern in ERROR_PATTERNS]
    total = len(ERROR_PATTERNS)

    def check(output: Any, ctx: dict) -> tuple[bool, str, float]:
        haystack = str(output).lower()
        found = [pattern for pattern, needle in needles if needle in haystack]
        if not found:
            return True, "No error patterns", 1.0
        clean_ratio = 1.0 - (len(found) / total)
        return False, f"Error patterns found: {found[:3]}", max(0, clean_ratio)

    return QualityGate("no_error", check, threshold, on_fail=GateAction.RETRY)

400 

401 

def contains_keywords(
    keywords: list[str],
    min_hits: int = 1,
    threshold: float = 0.7,
) -> QualityGate:
    """Build a gate requiring the output to mention at least *min_hits* keywords.

    Matching is a case-insensitive substring search over ``str(output)``.
    The keywords are copied when the gate is built. As a result, any iterable
    of strings works, including a one-shot generator. Later changes to the
    caller's list also do not affect the gate.

    On failure, the score is ``hits / max(min_hits, 1)``, capped at 1.0. The
    detail lists up to five missing keywords, and the action is WARN.
    """
    # Snapshot (original, lowered) pairs once; the check runs on every attempt.
    terms = [(kw, kw.lower()) for kw in keywords]
    required = max(min_hits, 1)

    def check(output: Any, ctx: dict) -> tuple[bool, str, float]:
        text = str(output).lower()
        hits = [kw for kw, low in terms if low in text]
        score = min(1.0, len(hits) / required)
        if len(hits) < min_hits:
            missing = [kw for kw, low in terms if low not in text]
            return False, f"Missing keywords: {missing[:5]}", score
        return True, f"Found {len(hits)}/{len(terms)} keywords", 1.0

    return QualityGate("keywords", check, threshold, on_fail=GateAction.WARN)

417 

418 

def latency_max(max_ms: float, threshold: float = 0.9) -> QualityGate:
    """Build a gate that fails when latency exceeds *max_ms* milliseconds.

    Latency is read from the context key ``"_latency_ms"``; a missing key
    counts as 0, so the gate passes. An over-budget latency scores 0.0. For a
    positive budget the former ``1 - elapsed / max_ms`` formula was always
    negative and clamped to 0. Avoiding the division also means a zero or
    negative budget no longer raises ZeroDivisionError or yields scores above
    1. With ``threshold > 0`` an over-budget result is therefore a FAIL.
    Failure action: WARN.
    """

    def check(output: Any, ctx: dict) -> tuple[bool, str, float]:
        elapsed = ctx.get("_latency_ms", 0)
        if elapsed > max_ms:
            return False, f"Latency {elapsed:.0f}ms > {max_ms}ms", 0.0
        return True, f"Latency {elapsed:.0f}ms OK", 1.0

    return QualityGate("latency", check, threshold, on_fail=GateAction.WARN)

428 

429 

def confidence_min(min_confidence: float = 0.5, threshold: float = 0.8) -> QualityGate:
    """Build a gate requiring ``ctx["_confidence"]`` to be at least *min_confidence*.

    A missing key counts as confidence 0.0. On failure the score is
    ``confidence / max(min_confidence, 0.01)`` capped at 1.0, and the action
    is RETRY.
    """

    def check(output: Any, ctx: dict) -> tuple[bool, str, float]:
        value = ctx.get("_confidence", 0.0)
        ratio = min(1.0, value / max(min_confidence, 0.01))
        if not value < min_confidence:
            return True, f"Confidence {value:.2f} OK", 1.0
        return False, f"Confidence {value:.2f} < {min_confidence}", ratio

    return QualityGate("confidence", check, threshold, on_fail=GateAction.RETRY)
438 return QualityGate("confidence", check, threshold, on_fail=GateAction.RETRY)