Coverage for agentos/hitl/gradio_ui.py: 34%

267 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2AgentOS v1.14.2 — Dynamic HITL UI (Gradio-based Approval Dashboard). 

3 

4受 LangGraph Studio / AutoGen UI 启发,在已有 HITL 审批引擎之上 

5增加 Gradio 驱动的响应式审批面板。Agent 遇到高风险操作时, 

6自动弹出 Web UI 而非阻塞终端。 

7 

8Core features: 

9- GradioApp: 一键启动的审批 Dashboard 

10- ApprovalQueue: 实时审批队列,WebSocket 推送 

11- AgentStatusPanel: Agent 状态监控面板 

12- ApprovalCard: 可定制的审批卡片组件 

13- HistoryView: 审批历史追溯 

14- PolicyEditor: 可视化策略编辑器 

15 

16与 hitl/approver.py 的关系: 

17- approver.py: 审批引擎(决策逻辑、风险评级、策略执行) 

18- gradio_ui.py: 交互层(Web UI、实时推送、可视化配置) 

19""" 

20 

21from __future__ import annotations 

22 

23import asyncio 

24import json 

25import time 

26import uuid 

27import threading 

28import queue 

29from dataclasses import dataclass, field 

30from datetime import datetime 

31from enum import Enum 

32from pathlib import Path 

33from typing import ( 

34 Any, Callable, Dict, List, Optional, Set, Tuple, Union, 

35) 

36 

37 

38# ── UI Data Models ────────────────────────── 

39 

40 

41class ApprovalStatus(str, Enum): 

42 PENDING = "pending" 

43 APPROVED = "approved" 

44 DENIED = "denied" 

45 EXPIRED = "expired" 

46 CANCELLED = "cancelled" 

47 

48 

49class RiskLevelUI(str, Enum): 

50 SAFE = "safe" 

51 LOW = "low" 

52 MEDIUM = "medium" 

53 HIGH = "high" 

54 CRITICAL = "critical" 

55 

56 

57@dataclass 

58class ApprovalRequestUI: 

59 """UI 层的审批请求,与底层 HITL 解耦。""" 

60 

61 request_id: str = field(default_factory=lambda: f"apr-{uuid.uuid4().hex[:8]}") 

62 agent_name: str = "" 

63 action: str = "" # 人类可读的操作描述 

64 details: str = "" # 详细说明 

65 risk_level: RiskLevelUI = RiskLevelUI.MEDIUM 

66 status: ApprovalStatus = ApprovalStatus.PENDING 

67 created_at: float = field(default_factory=time.time) 

68 expires_at: float = 0.0 # 超时自动拒绝 

69 source_file: str = "" # 触发操作的文件/代码位置 

70 metadata: Dict[str, Any] = field(default_factory=dict) 

71 # Callback when approved/denied 

72 on_decision: Optional[Callable] = None 

73 

74 @property 

75 def elapsed_seconds(self) -> float: 

76 return time.time() - self.created_at 

77 

78 @property 

79 def is_expired(self) -> bool: 

80 if self.expires_at <= 0: 

81 return False 

82 return time.time() > self.expires_at 

83 

84 

85@dataclass 

86class ApprovalHistory: 

87 """审批历史记录。""" 

88 

89 request: ApprovalRequestUI 

90 decision: ApprovalStatus 

91 decided_by: str = "user" # "user" | "auto" | "timeout" 

92 reason: str = "" 

93 decided_at: float = field(default_factory=time.time) 

94 

95 def to_dict(self) -> dict: 

96 return { 

97 "request_id": self.request.request_id, 

98 "action": self.request.action, 

99 "risk_level": self.request.risk_level.value, 

100 "decision": self.decision.value, 

101 "decided_by": self.decided_by, 

102 "reason": self.reason, 

103 "created_at": self.request.created_at, 

104 "decided_at": self.decided_at, 

105 "elapsed_ms": int((self.decided_at - self.request.created_at) * 1000), 

106 } 

107 

108 

109@dataclass 

110class AgentStatusSnapshot: 

111 """Agent 运行状态快照。""" 

112 

113 agent_id: str = "" 

114 agent_name: str = "" 

115 status: str = "idle" # idle | running | waiting_approval | paused | error 

116 current_task: str = "" 

117 elapsed_seconds: float = 0.0 

118 pending_approvals: int = 0 

119 memory_fragments: int = 0 

120 last_error: str = "" 

121 

122 

123# ── Approval Queue ───────────────────────── 

124 

125 

126class ApprovalQueue: 

127 """线程安全的审批队列,支持 WebSocket 推送通知。 

128 

129 在 Gradio UI 与 Agent HITL 引擎之间架设实时通信桥梁。 

130 """ 

131 

132 def __init__(self, max_size: int = 100, default_timeout: float = 300.0): 

133 self._queue: queue.Queue = queue.Queue(maxsize=max_size) 

134 self._pending: Dict[str, ApprovalRequestUI] = {} 

135 self._history: List[ApprovalHistory] = [] 

136 self._subscribers: List[Callable] = [] # WebSocket callbacks 

137 self._default_timeout = default_timeout 

138 self._lock = threading.Lock() 

139 

140 def submit(self, request: ApprovalRequestUI) -> str: 

141 """提交审批请求。注册到队列并通知订阅者。""" 

142 if request.expires_at <= 0: 

143 request.expires_at = time.time() + self._default_timeout 

144 

145 with self._lock: 

146 self._pending[request.request_id] = request 

147 

148 self._notify_subscribers({ 

149 "event": "new_request", 

150 "request_id": request.request_id, 

151 "action": request.action, 

152 "risk_level": request.risk_level.value, 

153 "pending_count": len(self._pending), 

154 }) 

155 

156 return request.request_id 

157 

158 def decide(self, request_id: str, approved: bool, reason: str = "") -> Optional[ApprovalHistory]: 

159 """处理审批决定。""" 

160 with self._lock: 

161 request = self._pending.pop(request_id, None) 

162 

163 if request is None: 

164 return None 

165 

166 decision = ApprovalStatus.APPROVED if approved else ApprovalStatus.DENIED 

167 history = ApprovalHistory( 

168 request=request, 

169 decision=decision, 

170 reason=reason, 

171 ) 

172 

173 # Trigger callback 

174 if request.on_decision: 

175 try: 

176 request.on_decision(approved, reason) 

177 except Exception: 

178 pass 

179 

180 with self._lock: 

181 self._history.append(history) 

182 

183 self._notify_subscribers({ 

184 "event": "decision", 

185 "request_id": request_id, 

186 "approved": approved, 

187 "reason": reason, 

188 "pending_count": len(self._pending), 

189 }) 

190 

191 return history 

192 

193 def approve_all(self) -> int: 

194 """批量批准所有待处理请求。""" 

195 ids = list(self._pending.keys()) 

196 for rid in ids: 

197 self.decide(rid, True, "batch_approve") 

198 return len(ids) 

199 

200 def deny_all(self) -> int: 

201 """批量拒绝所有待处理请求。""" 

202 ids = list(self._pending.keys()) 

203 for rid in ids: 

204 self.decide(rid, False, "batch_deny") 

205 return len(ids) 

206 

207 def check_timeouts(self) -> int: 

208 """检查并自动拒绝超时请求。""" 

209 now = time.time() 

210 expired_ids = [] 

211 with self._lock: 

212 for rid, req in self._pending.items(): 

213 if req.is_expired: 

214 expired_ids.append(rid) 

215 

216 for rid in expired_ids: 

217 history = self.decide(rid, False, "timeout") 

218 if history: 

219 history.decided_by = "timeout" 

220 

221 return len(expired_ids) 

222 

223 @property 

224 def pending_requests(self) -> List[ApprovalRequestUI]: 

225 with self._lock: 

226 return list(self._pending.values()) 

227 

228 @property 

229 def pending_count(self) -> int: 

230 with self._lock: 

231 return len(self._pending) 

232 

233 @property 

234 def recent_history(self, limit: int = 50) -> List[ApprovalHistory]: 

235 with self._lock: 

236 return self._history[-limit:] 

237 

238 def subscribe(self, callback: Callable) -> None: 

239 """注册 WebSocket 通知回调。""" 

240 self._subscribers.append(callback) 

241 

242 def _notify_subscribers(self, data: dict) -> None: 

243 for cb in self._subscribers: 

244 try: 

245 cb(data) 

246 except Exception: 

247 pass 

248 

249 

250# ── Gradio Approval Dashboard ────────────── 

251 

252 

253class ApprovalDashboard: 

254 """Gradio 驱动的审批面板。 

255 

256 核心布局: 

257 - 顶部: Agent 状态栏 

258 - 左侧: 待审批队列 

259 - 右侧: 审批详情 + 历史 

260 - 底部: 批量操作按钮 

261 

262 Usage: 

263 dashboard = ApprovalDashboard(queue, port=7860) 

264 dashboard.launch() 

265 """ 

266 

267 def __init__( 

268 self, 

269 approval_queue: ApprovalQueue, 

270 port: int = 7860, 

271 title: str = "AgentOS — HITL Approval Dashboard", 

272 theme: str = "soft", 

273 auto_launch: bool = True, 

274 ): 

275 self._queue = approval_queue 

276 self._port = port 

277 self._title = title 

278 self._theme = theme 

279 self._auto_launch = auto_launch 

280 self._app = None 

281 self._agent_statuses: Dict[str, AgentStatusSnapshot] = {} 

282 self._selected_request_id: str = "" 

283 

284 def launch(self, share: bool = False) -> Any: 

285 """启动 Gradio 面板。 

286 

287 返回 Gradio Blocks 实例,可在 Jupyter 中内嵌或独立运行。 

288 """ 

289 try: 

290 import gradio as gr 

291 except ImportError: 

292 raise ImportError( 

293 "Gradio is required for ApprovalDashboard. " 

294 "Install with: pip install gradio>=4.0" 

295 ) 

296 

297 with gr.Blocks(title=self._title, theme=self._theme) as app: 

298 self._app = app 

299 self._build_ui(app) 

300 

301 if self._auto_launch: 

302 app.launch( 

303 server_port=self._port, 

304 share=share, 

305 prevent_thread_lock=False, 

306 ) 

307 

308 return app 

309 

310 def _build_ui(self, app: Any) -> None: 

311 """构建完整 UI 布局。""" 

312 import gradio as gr 

313 

314 # ── Header ── 

315 gr.Markdown( 

316 f"# {self._title}\n" 

317 f"### Real-time Human-in-the-Loop Approval Dashboard" 

318 ) 

319 

320 # ── Agent Status Bar ── 

321 with gr.Row(): 

322 self._agent_status_display = gr.HTML( 

323 value=self._render_agent_status_bar(), 

324 every=3.0, 

325 ) 

326 

327 # ── Main Layout ── 

328 with gr.Row(): 

329 # Left: Pending Queue 

330 with gr.Column(scale=1): 

331 gr.Markdown("### Pending Approvals") 

332 self._queue_list = gr.HTML( 

333 value=self._render_pending_queue(), 

334 every=2.0, 

335 ) 

336 with gr.Row(): 

337 self._btn_approve_all = gr.Button( 

338 "Approve All", variant="primary", size="sm" 

339 ) 

340 self._btn_deny_all = gr.Button( 

341 "Deny All", variant="stop", size="sm" 

342 ) 

343 

344 # Right: Detail + History 

345 with gr.Column(scale=2): 

346 with gr.Tabs(): 

347 with gr.TabItem("Approval Detail"): 

348 self._detail_view = gr.HTML( 

349 value="<p>Select a request from the queue...</p>" 

350 ) 

351 with gr.Row(): 

352 self._btn_approve = gr.Button( 

353 "Approve", variant="primary" 

354 ) 

355 self._btn_deny = gr.Button( 

356 "Deny", variant="stop" 

357 ) 

358 self._reason_input = gr.Textbox( 

359 label="Reason (optional)", 

360 placeholder="Why this decision...", 

361 ) 

362 

363 with gr.TabItem("History"): 

364 self._history_view = gr.HTML( 

365 value=self._render_history(), 

366 every=3.0, 

367 ) 

368 

369 with gr.TabItem("Policy"): 

370 self._policy_view = gr.HTML( 

371 value=self._render_policy_editor() 

372 ) 

373 

374 # ── Event Handlers ── 

375 self._btn_approve.click( 

376 fn=self._handle_approve, 

377 inputs=[self._reason_input], 

378 outputs=[self._detail_view, self._queue_list, self._history_view], 

379 ) 

380 self._btn_deny.click( 

381 fn=self._handle_deny, 

382 inputs=[self._reason_input], 

383 outputs=[self._detail_view, self._queue_list, self._history_view], 

384 ) 

385 self._btn_approve_all.click( 

386 fn=lambda: self._queue.approve_all(), 

387 outputs=[], 

388 ) 

389 self._btn_deny_all.click( 

390 fn=lambda: self._queue.deny_all(), 

391 outputs=[], 

392 ) 

393 

394 def _handle_approve(self, reason: str) -> Tuple[str, str, str]: 

395 if not self._selected_request_id: 

396 return self._detail_view, self._queue_list, self._history_view 

397 self._queue.decide(self._selected_request_id, True, reason) 

398 self._selected_request_id = "" 

399 return ( 

400 "<p>Select a request from the queue...</p>", 

401 self._render_pending_queue(), 

402 self._render_history(), 

403 ) 

404 

405 def _handle_deny(self, reason: str) -> Tuple[str, str, str]: 

406 if not self._selected_request_id: 

407 return self._detail_view, self._queue_list, self._history_view 

408 self._queue.decide(self._selected_request_id, False, reason) 

409 self._selected_request_id = "" 

410 return ( 

411 "<p>Select a request from the queue...</p>", 

412 self._render_pending_queue(), 

413 self._render_history(), 

414 ) 

415 

416 def update_agent_status(self, snapshot: AgentStatusSnapshot) -> None: 

417 """更新 Agent 状态(由外部 Agent loop 调用)。""" 

418 self._agent_statuses[snapshot.agent_id] = snapshot 

419 

420 def _render_agent_status_bar(self) -> str: 

421 """渲染 Agent 状态栏 HTML。""" 

422 if not self._agent_statuses: 

423 return ( 

424 '<div style="padding:12px;background:#f0f0f0;border-radius:8px;">' 

425 '<span style="color:#888;">No agents connected</span></div>' 

426 ) 

427 

428 rows = [] 

429 for agent_id, snap in self._agent_statuses.items(): 

430 status_color = { 

431 "idle": "#4CAF50", 

432 "running": "#2196F3", 

433 "waiting_approval": "#FF9800", 

434 "paused": "#9E9E9E", 

435 "error": "#F44336", 

436 }.get(snap.status, "#9E9E9E") 

437 

438 rows.append( 

439 f'<div style="display:inline-block;margin:4px 8px;padding:8px 12px;' 

440 f'background:#fff;border-radius:6px;border-left:4px solid {status_color};">' 

441 f'<b>{snap.agent_name}</b> ' 

442 f'<span style="color:{status_color};">● {snap.status}</span> ' 

443 f'| {snap.current_task[:30]} ' 

444 f'| {snap.pending_approvals} pending' 

445 f'</div>' 

446 ) 

447 

448 return ( 

449 '<div style="padding:12px;background:#f0f0f0;border-radius:8px;">' 

450 + "".join(rows) 

451 + "</div>" 

452 ) 

453 

454 def _render_pending_queue(self) -> str: 

455 """渲染待处理队列 HTML。""" 

456 pending = self._queue.pending_requests 

457 if not pending: 

458 return '<p style="color:#888;">No pending approvals</p>' 

459 

460 risk_colors = { 

461 "safe": "#4CAF50", 

462 "low": "#8BC34A", 

463 "medium": "#FF9800", 

464 "high": "#F44336", 

465 "critical": "#B71C1C", 

466 } 

467 

468 cards = [] 

469 for req in pending: 

470 color = risk_colors.get(req.risk_level.value, "#999") 

471 elapsed = int(req.elapsed_seconds) 

472 cards.append( 

473 f'<div onclick="selectRequest(\'{req.request_id}\')" ' 

474 f'style="cursor:pointer;margin:6px 0;padding:10px;' 

475 f'background:#fff;border-radius:6px;' 

476 f'border-left:4px solid {color};">' 

477 f'<div style="font-weight:bold;">{req.action[:60]}</div>' 

478 f'<div style="color:{color};font-size:0.85em;">' 

479 f'Risk: {req.risk_level.value} | Agent: {req.agent_name} | ' 

480 f'{elapsed}s ago</div>' 

481 f'</div>' 

482 ) 

483 

484 return "".join(cards) 

485 

486 def _render_history(self) -> str: 

487 """渲染审批历史。""" 

488 history = self._queue.recent_history(limit=30) 

489 if not history: 

490 return "<p>No history yet</p>" 

491 

492 rows = ["<table style='width:100%;border-collapse:collapse;'>"] 

493 rows.append( 

494 "<tr style='background:#eee;'><th>Time</th><th>Action</th>" 

495 "<th>Decision</th><th>By</th><th>Latency</th></tr>" 

496 ) 

497 for h in reversed(history): 

498 dt = datetime.fromtimestamp(h.decided_at).strftime("%H:%M:%S") 

499 decision_color = "#4CAF50" if h.decision == ApprovalStatus.APPROVED else "#F44336" 

500 rows.append( 

501 f"<tr><td>{dt}</td>" 

502 f"<td>{h.request.action[:40]}</td>" 

503 f"<td style='color:{decision_color};font-weight:bold;'>" 

504 f"{h.decision.value}</td>" 

505 f"<td>{h.decided_by}</td>" 

506 f"<td>{h.decided_at - h.request.created_at:.1f}s</td></tr>" 

507 ) 

508 rows.append("</table>") 

509 return "".join(rows) 

510 

511 def _render_policy_editor(self) -> str: 

512 """渲染策略编辑器(占位,可扩展为交互式表单)。""" 

513 return """ 

514 <div style="padding:16px;"> 

515 <h3>Approval Policy</h3> 

516 <p>Configure auto-approval thresholds by risk level:</p> 

517 <ul> 

518 <li><b>Safe/Low:</b> Auto-approve</li> 

519 <li><b>Medium:</b> Ask if confidence &lt; 90%</li> 

520 <li><b>High:</b> Always ask</li> 

521 <li><b>Critical:</b> Always ask + require 2FA</li> 

522 </ul> 

523 <p><i>Interactive policy editor coming in v1.14.3</i></p> 

524 </div> 

525 """ 

526 

527 @property 

528 def queue(self) -> ApprovalQueue: 

529 return self._queue 

530 

531 

532# ── Agent Integration Bridge ──────────────── 

533 

534 

535class HITLUIBridge: 

536 """连接 Agent HITL 引擎与 Gradio UI 的桥梁。 

537 

538 在 Agent loop 中使用: 

539 bridge = HITLUIBridge(queue, agent_id) 

540 bridge.send_approval_request(action="Delete file X", risk_level="high") 

541 # UI 弹出审批卡片,Agent 在此阻塞等待结果 

542 approved = await bridge.wait_for_decision(timeout=60) 

543 """ 

544 

545 def __init__( 

546 self, 

547 approval_queue: ApprovalQueue, 

548 agent_id: str = "", 

549 agent_name: str = "", 

550 ): 

551 self._queue = approval_queue 

552 self.agent_id = agent_id 

553 self.agent_name = agent_name 

554 self._decision_events: Dict[str, asyncio.Event] = {} 

555 self._decision_results: Dict[str, Tuple[bool, str]] = {} 

556 

557 async def send_approval_request( 

558 self, 

559 action: str, 

560 details: str = "", 

561 risk_level: str = "medium", 

562 source_file: str = "", 

563 timeout: float = 300.0, 

564 metadata: Optional[Dict[str, Any]] = None, 

565 ) -> Tuple[bool, str]: 

566 """发送审批请求到 UI,阻塞等待用户决定。 

567 

568 Returns: 

569 (approved: bool, reason: str) 

570 """ 

571 event = asyncio.Event() 

572 request_id = f"apr-{uuid.uuid4().hex[:8]}" 

573 self._decision_events[request_id] = event 

574 

575 def on_decision(approved: bool, reason: str) -> None: 

576 self._decision_results[request_id] = (approved, reason) 

577 event.set() 

578 

579 request = ApprovalRequestUI( 

580 request_id=request_id, 

581 agent_name=self.agent_name, 

582 action=action, 

583 details=details, 

584 risk_level=RiskLevelUI(risk_level), 

585 expires_at=time.time() + timeout, 

586 source_file=source_file, 

587 metadata=metadata or {}, 

588 on_decision=on_decision, 

589 ) 

590 

591 self._queue.submit(request) 

592 

593 # 等待用户决定或超时 

594 try: 

595 await asyncio.wait_for(event.wait(), timeout=timeout) 

596 result = self._decision_results.pop(request_id, (False, "timeout")) 

597 self._decision_events.pop(request_id, None) 

598 return result 

599 except asyncio.TimeoutError: 

600 self._queue.decide(request_id, False, "timeout") 

601 self._decision_events.pop(request_id, None) 

602 return (False, "timeout") 

603 

604 

605# ── Quick Launch ──────────────────────────── 

606 

607 

608def create_hitl_dashboard( 

609 port: int = 7860, 

610 theme: str = "soft", 

611 share: bool = False, 

612) -> Tuple[ApprovalDashboard, ApprovalQueue]: 

613 """一键创建并启动 HITL 审批面板。 

614 

615 Usage: 

616 dashboard, queue = create_hitl_dashboard(port=7860) 

617 # Agent 代码中: 

618 bridge = HITLUIBridge(queue, agent_name="FileAgent") 

619 approved, reason = await bridge.send_approval_request( 

620 action="Delete 50 files in /tmp/", 

621 risk_level="high", 

622 ) 

623 """ 

624 queue = ApprovalQueue() 

625 dashboard = ApprovalDashboard( 

626 approval_queue=queue, 

627 port=port, 

628 theme=theme, 

629 auto_launch=True, 

630 ) 

631 

632 # 在后台线程启动 Gradio 

633 thread = threading.Thread( 

634 target=dashboard.launch, 

635 kwargs={"share": share}, 

636 daemon=True, 

637 ) 

638 thread.start() 

639 

640 return dashboard, queue