Coverage for agentos/hitl/gradio_ui.py: 34%
267 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2AgentOS v1.14.2 — Dynamic HITL UI (Gradio-based Approval Dashboard).
4受 LangGraph Studio / AutoGen UI 启发,在已有 HITL 审批引擎之上
5增加 Gradio 驱动的响应式审批面板。Agent 遇到高风险操作时,
6自动弹出 Web UI 而非阻塞终端。
8Core features:
9- GradioApp: 一键启动的审批 Dashboard
10- ApprovalQueue: 实时审批队列,WebSocket 推送
11- AgentStatusPanel: Agent 状态监控面板
12- ApprovalCard: 可定制的审批卡片组件
13- HistoryView: 审批历史追溯
14- PolicyEditor: 可视化策略编辑器
16与 hitl/approver.py 的关系:
17- approver.py: 审批引擎(决策逻辑、风险评级、策略执行)
18- gradio_ui.py: 交互层(Web UI、实时推送、可视化配置)
19"""
from __future__ import annotations

import asyncio
import html
import json
import logging
import queue
import threading
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import (
    Any, Callable, Dict, List, Optional, Set, Tuple, Union,
)
38# ── UI Data Models ──────────────────────────
class ApprovalStatus(str, Enum):
    """Lifecycle states of an approval request.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    # Awaiting a decision.
    PENDING = "pending"
    # Terminal outcomes.
    APPROVED = "approved"
    DENIED = "denied"
    EXPIRED = "expired"
    CANCELLED = "cancelled"
class RiskLevelUI(str, Enum):
    """Risk levels shown in the approval UI, listed from least to most severe.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    SAFE = "safe"
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
@dataclass
class ApprovalRequestUI:
    """UI-level approval request, decoupled from the underlying HITL engine."""

    # Identifier of the form "apr-" followed by 8 hex characters.
    request_id: str = field(default_factory=lambda: "apr-" + uuid.uuid4().hex[:8])
    agent_name: str = ""
    # Human-readable description of the operation.
    action: str = ""
    # Longer explanation of the operation.
    details: str = ""
    risk_level: RiskLevelUI = RiskLevelUI.MEDIUM
    status: ApprovalStatus = ApprovalStatus.PENDING
    # Creation time as a ``time.time()`` timestamp.
    created_at: float = field(default_factory=time.time)
    # Expiry timestamp; a value <= 0 means the request never expires.
    expires_at: float = 0.0
    # File / code location that triggered the operation.
    source_file: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Optional callback invoked once the request is approved or denied.
    on_decision: Optional[Callable] = None

    @property
    def elapsed_seconds(self) -> float:
        """Return the number of seconds since the request was created."""
        now = time.time()
        return now - self.created_at

    @property
    def is_expired(self) -> bool:
        """Return True once the current time is past a positive ``expires_at``."""
        return self.expires_at > 0 and time.time() > self.expires_at
@dataclass
class ApprovalHistory:
    """Record of one decided approval request."""

    request: ApprovalRequestUI
    decision: ApprovalStatus
    # Who made the decision: "user" | "auto" | "timeout".
    decided_by: str = "user"
    reason: str = ""
    # Decision time as a ``time.time()`` timestamp.
    decided_at: float = field(default_factory=time.time)

    def to_dict(self) -> dict:
        """Return a flat dict summary of the record.

        ``elapsed_ms`` is the time between request creation and decision,
        truncated to whole milliseconds.
        """
        req = self.request
        latency_ms = int((self.decided_at - req.created_at) * 1000)
        summary = {
            "request_id": req.request_id,
            "action": req.action,
            "risk_level": req.risk_level.value,
            "decision": self.decision.value,
            "decided_by": self.decided_by,
            "reason": self.reason,
            "created_at": req.created_at,
            "decided_at": self.decided_at,
            "elapsed_ms": latency_ms,
        }
        return summary
@dataclass
class AgentStatusSnapshot:
    """Point-in-time snapshot of an agent's runtime state."""

    agent_id: str = ""
    agent_name: str = ""
    # One of: idle | running | waiting_approval | paused | error.
    status: str = "idle"
    current_task: str = ""
    elapsed_seconds: float = 0.0
    pending_approvals: int = 0
    memory_fragments: int = 0
    last_error: str = ""
123# ── Approval Queue ─────────────────────────
class ApprovalQueue:
    """Registry of pending approval requests with subscriber notifications.

    Pending requests, decision history and the subscriber list are guarded
    by a single lock.  Subscriber callbacks (intended for e.g. WebSocket
    push) and per-request ``on_decision`` callbacks are always invoked
    *outside* that lock so they may safely call back into the queue.
    """

    # Number of records returned by the ``recent_history`` property.
    _DEFAULT_HISTORY_LIMIT = 50

    def __init__(self, max_size: int = 100, default_timeout: float = 300.0):
        """Create an empty queue.

        Args:
            max_size: Capacity of the internal ``queue.Queue``.
            default_timeout: Seconds until expiry for requests submitted
                without a positive ``expires_at``.
        """
        # NOTE(review): nothing in this class puts to or gets from this
        # bounded queue, so ``max_size`` is currently not enforced.
        self._queue: queue.Queue = queue.Queue(maxsize=max_size)
        self._pending: Dict[str, ApprovalRequestUI] = {}
        self._history: List[ApprovalHistory] = []
        self._subscribers: List[Callable] = []
        self._default_timeout = default_timeout
        self._lock = threading.Lock()

    def submit(self, request: ApprovalRequestUI) -> str:
        """Register *request* as pending and notify subscribers.

        A request without a positive ``expires_at`` gets one assigned from
        the queue's default timeout.

        Returns:
            The request's ``request_id``.
        """
        if request.expires_at <= 0:
            request.expires_at = time.time() + self._default_timeout

        with self._lock:
            self._pending[request.request_id] = request
            # Capture the count under the lock so the event is consistent.
            pending_count = len(self._pending)

        self._notify_subscribers({
            "event": "new_request",
            "request_id": request.request_id,
            "action": request.action,
            "risk_level": request.risk_level.value,
            "pending_count": pending_count,
        })
        return request.request_id

    def decide(
        self,
        request_id: str,
        approved: bool,
        reason: str = "",
        decided_by: str = "user",
    ) -> Optional[ApprovalHistory]:
        """Resolve a pending request.

        The request is removed from the pending set, its ``status`` is
        updated, a history record is stored, the request's ``on_decision``
        callback (if any) is invoked with ``(approved, reason)`` and
        subscribers are notified.

        Args:
            request_id: Identifier of the request to resolve.
            approved: True to approve, False to deny.
            reason: Free-text justification stored with the record.
            decided_by: Origin of the decision ("user" | "auto" | "timeout").

        Returns:
            The stored history record, or None when *request_id* is not
            pending (unknown or already decided).
        """
        with self._lock:
            request = self._pending.pop(request_id, None)
        if request is None:
            return None

        outcome = ApprovalStatus.APPROVED if approved else ApprovalStatus.DENIED
        request.status = outcome
        record = ApprovalHistory(
            request=request,
            decision=outcome,
            decided_by=decided_by,
            reason=reason,
        )

        # Store the audit record before running user code so it exists even
        # if the callback misbehaves.
        with self._lock:
            self._history.append(record)
            pending_count = len(self._pending)

        if request.on_decision:
            try:
                request.on_decision(approved, reason)
            except Exception:
                # Best effort: a faulty callback must not undo the decision.
                logging.getLogger(__name__).exception(
                    "on_decision callback failed for request %s", request_id
                )

        self._notify_subscribers({
            "event": "decision",
            "request_id": request_id,
            "approved": approved,
            "reason": reason,
            "pending_count": pending_count,
        })
        return record

    def _decide_all(self, approved: bool, reason: str) -> int:
        """Decide every currently pending request; return how many were decided."""
        with self._lock:
            request_ids = list(self._pending)
        # Count only successful decisions: another thread may have resolved
        # a request between the snapshot and our call.
        return sum(
            1 for rid in request_ids
            if self.decide(rid, approved, reason) is not None
        )

    def approve_all(self) -> int:
        """Approve all pending requests; return the number approved."""
        return self._decide_all(True, "batch_approve")

    def deny_all(self) -> int:
        """Deny all pending requests; return the number denied."""
        return self._decide_all(False, "batch_deny")

    def check_timeouts(self) -> int:
        """Deny every expired pending request; return the number denied.

        Records created here carry ``decided_by="timeout"`` and the reason
        ``"timeout"``.
        """
        with self._lock:
            expired_ids = [
                rid for rid, req in self._pending.items() if req.is_expired
            ]
        return sum(
            1 for rid in expired_ids
            if self.decide(rid, False, "timeout", decided_by="timeout") is not None
        )

    @property
    def pending_requests(self) -> List[ApprovalRequestUI]:
        """Snapshot list of the currently pending requests."""
        with self._lock:
            return list(self._pending.values())

    @property
    def pending_count(self) -> int:
        """Number of currently pending requests."""
        with self._lock:
            return len(self._pending)

    def get_history(self, limit: int = 50) -> List[ApprovalHistory]:
        """Return a copy of the newest *limit* history records, oldest first.

        A non-positive *limit* yields an empty list.
        """
        if limit <= 0:
            # ``seq[-0:]`` would return everything, so handle this explicitly.
            return []
        with self._lock:
            return self._history[-limit:]

    @property
    def recent_history(self) -> List[ApprovalHistory]:
        """Copy of the newest 50 history records, oldest first.

        This is a property and therefore cannot take a limit; use
        ``get_history(limit)`` for a different window size.
        """
        return self.get_history(self._DEFAULT_HISTORY_LIMIT)

    def subscribe(self, callback: Callable) -> None:
        """Register *callback* to receive every event dict emitted by the queue."""
        with self._lock:
            self._subscribers.append(callback)

    def _notify_subscribers(self, data: dict) -> None:
        """Deliver *data* to each subscriber, isolating their failures."""
        # Iterate over a snapshot so subscribing during delivery is safe.
        with self._lock:
            subscribers = list(self._subscribers)
        for callback in subscribers:
            try:
                callback(data)
            except Exception:
                # One broken subscriber must not block the others.
                logging.getLogger(__name__).exception(
                    "approval subscriber failed for event %r", data.get("event")
                )
250# ── Gradio Approval Dashboard ──────────────
class ApprovalDashboard:
    """Gradio-driven approval dashboard.

    Layout:
    - top: agent status bar
    - left: pending approval queue with bulk approve / deny buttons
    - right: tabs for approval detail, decision history and policy

    A request is chosen for a single decision by entering its ID (shown on
    each queue card) in the "Request ID" box, or programmatically through
    ``select_request``.

    Usage:
        dashboard = ApprovalDashboard(queue, port=7860)
        dashboard.launch()
    """

    _DETAIL_PLACEHOLDER = "<p>Select a request from the queue...</p>"
    # Number of newest history rows rendered in the History tab.
    _HISTORY_LIMIT = 30
    _FALLBACK_STATUS_COLOR = "#9E9E9E"
    _STATUS_COLORS = {
        "idle": "#4CAF50",
        "running": "#2196F3",
        "waiting_approval": "#FF9800",
        "paused": "#9E9E9E",
        "error": "#F44336",
    }
    _FALLBACK_RISK_COLOR = "#999"
    _RISK_COLORS = {
        "safe": "#4CAF50",
        "low": "#8BC34A",
        "medium": "#FF9800",
        "high": "#F44336",
        "critical": "#B71C1C",
    }

    def __init__(
        self,
        approval_queue: ApprovalQueue,
        port: int = 7860,
        title: str = "AgentOS — HITL Approval Dashboard",
        theme: str = "soft",
        auto_launch: bool = True,
    ):
        """Store configuration; no UI is built until ``launch`` is called.

        Args:
            approval_queue: Queue whose requests are displayed and decided.
            port: Port passed to Gradio as ``server_port``.
            title: Page title and header text.
            theme: Gradio theme passed to ``gr.Blocks``.
            auto_launch: When True, ``launch`` also starts the server.
        """
        self._queue = approval_queue
        self._port = port
        self._title = title
        self._theme = theme
        self._auto_launch = auto_launch
        self._app = None
        self._agent_statuses: Dict[str, AgentStatusSnapshot] = {}
        self._selected_request_id: str = ""

    def launch(self, share: bool = False) -> Any:
        """Build the Gradio app and, if ``auto_launch`` is set, start serving.

        Args:
            share: Forwarded to ``app.launch(share=...)``.

        Returns:
            The ``gr.Blocks`` instance.

        Raises:
            ImportError: If Gradio is not installed.
        """
        try:
            import gradio as gr
        except ImportError as exc:
            raise ImportError(
                "Gradio is required for ApprovalDashboard. "
                "Install with: pip install gradio>=4.0"
            ) from exc

        with gr.Blocks(title=self._title, theme=self._theme) as app:
            self._app = app
            self._build_ui(app)

        if self._auto_launch:
            app.launch(
                server_port=self._port,
                share=share,
                prevent_thread_lock=False,
            )

        return app

    def _build_ui(self, app: Any) -> None:
        """Create all components and wire their event handlers."""
        import gradio as gr

        # ── Header ──
        gr.Markdown(
            f"# {self._title}\n"
            f"### Real-time Human-in-the-Loop Approval Dashboard"
        )

        # ── Agent Status Bar ──
        # Auto-refreshing panels receive the render *function*: ``every``
        # only re-runs ``value`` when it is callable, a pre-rendered string
        # would never refresh.
        with gr.Row():
            self._agent_status_display = gr.HTML(
                value=self._render_agent_status_bar,
                every=3.0,
            )

        # ── Main Layout ──
        with gr.Row():
            # Left: pending queue
            with gr.Column(scale=1):
                gr.Markdown("### Pending Approvals")
                self._queue_list = gr.HTML(
                    value=self._render_pending_queue,
                    every=2.0,
                )
                with gr.Row():
                    self._btn_approve_all = gr.Button(
                        "Approve All", variant="primary", size="sm"
                    )
                    self._btn_deny_all = gr.Button(
                        "Deny All", variant="stop", size="sm"
                    )

            # Right: detail + history
            with gr.Column(scale=2):
                with gr.Tabs():
                    with gr.TabItem("Approval Detail"):
                        self._detail_view = gr.HTML(
                            value=self._DETAIL_PLACEHOLDER
                        )
                        self._request_id_input = gr.Textbox(
                            label="Request ID",
                            placeholder="ID shown on the queue card, e.g. apr-1a2b3c4d",
                        )
                        with gr.Row():
                            self._btn_approve = gr.Button(
                                "Approve", variant="primary"
                            )
                            self._btn_deny = gr.Button(
                                "Deny", variant="stop"
                            )
                        self._reason_input = gr.Textbox(
                            label="Reason (optional)",
                            placeholder="Why this decision...",
                        )

                    with gr.TabItem("History"):
                        self._history_view = gr.HTML(
                            value=self._render_history,
                            every=3.0,
                        )

                    with gr.TabItem("Policy"):
                        self._policy_view = gr.HTML(
                            value=self._render_policy_editor()
                        )

        # ── Event Handlers ──
        decision_inputs = [self._reason_input, self._request_id_input]
        decision_outputs = [self._detail_view, self._queue_list, self._history_view]
        bulk_outputs = [self._queue_list, self._history_view]

        # Pressing Enter in the ID box shows that request's details.
        self._request_id_input.submit(
            fn=self.select_request,
            inputs=[self._request_id_input],
            outputs=[self._detail_view],
        )
        self._btn_approve.click(
            fn=self._handle_approve,
            inputs=decision_inputs,
            outputs=decision_outputs,
        )
        self._btn_deny.click(
            fn=self._handle_deny,
            inputs=decision_inputs,
            outputs=decision_outputs,
        )
        self._btn_approve_all.click(
            fn=self._handle_approve_all,
            outputs=bulk_outputs,
        )
        self._btn_deny_all.click(
            fn=self._handle_deny_all,
            outputs=bulk_outputs,
        )

    @staticmethod
    def _esc(value: Any) -> str:
        """HTML-escape *value* so agent-supplied text cannot inject markup."""
        return html.escape(str(value), quote=True)

    def select_request(self, request_id: str) -> str:
        """Select the pending request with *request_id* for the next decision.

        Returns:
            Detail HTML for the request, or a short notice when no pending
            request has that ID (in which case the selection is cleared).
        """
        wanted = (request_id or "").strip()
        for req in self._queue.pending_requests:
            if req.request_id == wanted:
                self._selected_request_id = wanted
                return self._render_detail(req)
        self._selected_request_id = ""
        if not wanted:
            return self._DETAIL_PLACEHOLDER
        return f"<p>No pending request with ID {self._esc(wanted)}.</p>"

    def _handle_decision(
        self, approved: bool, reason: str, request_id: str = ""
    ) -> Tuple[str, str, str]:
        """Apply one decision and return (detail, queue, history) HTML."""
        target = (request_id or "").strip() or self._selected_request_id
        detail = self._DETAIL_PLACEHOLDER
        if target:
            outcome = self._queue.decide(target, approved, reason)
            self._selected_request_id = ""
            if outcome is None:
                # Unknown ID, or the request was decided / expired meanwhile.
                detail = f"<p>Request {self._esc(target)} is no longer pending.</p>"
        return detail, self._render_pending_queue(), self._render_history()

    def _handle_approve(self, reason: str, request_id: str = "") -> Tuple[str, str, str]:
        """Approve the given (or previously selected) request."""
        return self._handle_decision(True, reason, request_id)

    def _handle_deny(self, reason: str, request_id: str = "") -> Tuple[str, str, str]:
        """Deny the given (or previously selected) request."""
        return self._handle_decision(False, reason, request_id)

    def _handle_approve_all(self) -> Tuple[str, str]:
        """Approve everything pending and return refreshed (queue, history) HTML."""
        self._queue.approve_all()
        return self._render_pending_queue(), self._render_history()

    def _handle_deny_all(self) -> Tuple[str, str]:
        """Deny everything pending and return refreshed (queue, history) HTML."""
        self._queue.deny_all()
        return self._render_pending_queue(), self._render_history()

    def update_agent_status(self, snapshot: AgentStatusSnapshot) -> None:
        """Store the latest snapshot for an agent (called by the agent loop)."""
        self._agent_statuses[snapshot.agent_id] = snapshot

    def _render_agent_status_bar(self) -> str:
        """Render the agent status bar as HTML."""
        # Copy first: the agent loop may add snapshots while we render.
        snapshots = list(self._agent_statuses.values())
        if not snapshots:
            return (
                '<div style="padding:12px;background:#f0f0f0;border-radius:8px;">'
                '<span style="color:#888;">No agents connected</span></div>'
            )

        rows = []
        for snap in snapshots:
            status_color = self._STATUS_COLORS.get(
                snap.status, self._FALLBACK_STATUS_COLOR
            )
            rows.append(
                f'<div style="display:inline-block;margin:4px 8px;padding:8px 12px;'
                f'background:#fff;border-radius:6px;border-left:4px solid {status_color};">'
                f'<b>{self._esc(snap.agent_name)}</b> '
                f'<span style="color:{status_color};">● {self._esc(snap.status)}</span> '
                f'| {self._esc(snap.current_task[:30])} '
                f'| {self._esc(snap.pending_approvals)} pending'
                f'</div>'
            )

        return (
            '<div style="padding:12px;background:#f0f0f0;border-radius:8px;">'
            + "".join(rows)
            + "</div>"
        )

    def _render_detail(self, req: ApprovalRequestUI) -> str:
        """Render the detail card for one request as HTML."""
        risk = req.risk_level.value
        color = self._RISK_COLORS.get(risk, self._FALLBACK_RISK_COLOR)
        return (
            f'<div style="padding:12px;border-left:4px solid {color};">'
            f'<h4>{self._esc(req.action)}</h4>'
            f'<p>{self._esc(req.details)}</p>'
            f'<p style="color:#555;font-size:0.85em;">'
            f'Risk: {self._esc(risk)} | Agent: {self._esc(req.agent_name)} | '
            f'Source: {self._esc(req.source_file)} | ID: {self._esc(req.request_id)}</p>'
            f'</div>'
        )

    def _render_pending_queue(self) -> str:
        """Render one card per pending request as HTML."""
        pending = self._queue.pending_requests
        if not pending:
            return '<p style="color:#888;">No pending approvals</p>'

        cards = []
        for req in pending:
            risk = req.risk_level.value
            color = self._RISK_COLORS.get(risk, self._FALLBACK_RISK_COLOR)
            elapsed = int(req.elapsed_seconds)
            cards.append(
                f'<div style="margin:6px 0;padding:10px;'
                f'background:#fff;border-radius:6px;'
                f'border-left:4px solid {color};">'
                f'<div style="font-weight:bold;">{self._esc(req.action[:60])}</div>'
                f'<div style="color:{color};font-size:0.85em;">'
                f'Risk: {self._esc(risk)} | Agent: {self._esc(req.agent_name)} | '
                f'{elapsed}s ago</div>'
                # The ID is what the reviewer enters in the "Request ID" box.
                f'<div style="color:#888;font-size:0.8em;">'
                f'ID: <code>{self._esc(req.request_id)}</code></div>'
                f'</div>'
            )

        return "".join(cards)

    def _render_history(self) -> str:
        """Render the newest decisions, most recent first, as an HTML table."""
        # ``recent_history`` is a property returning a list, so it is sliced
        # rather than called.
        history = self._queue.recent_history[-self._HISTORY_LIMIT:]
        if not history:
            return "<p>No history yet</p>"

        rows = ["<table style='width:100%;border-collapse:collapse;'>"]
        rows.append(
            "<tr style='background:#eee;'><th>Time</th><th>Action</th>"
            "<th>Decision</th><th>By</th><th>Latency</th></tr>"
        )
        for h in reversed(history):
            dt = datetime.fromtimestamp(h.decided_at).strftime("%H:%M:%S")
            decision_color = "#4CAF50" if h.decision == ApprovalStatus.APPROVED else "#F44336"
            rows.append(
                f"<tr><td>{dt}</td>"
                f"<td>{self._esc(h.request.action[:40])}</td>"
                f"<td style='color:{decision_color};font-weight:bold;'>"
                f"{self._esc(h.decision.value)}</td>"
                f"<td>{self._esc(h.decided_by)}</td>"
                f"<td>{h.decided_at - h.request.created_at:.1f}s</td></tr>"
            )
        rows.append("</table>")
        return "".join(rows)

    def _render_policy_editor(self) -> str:
        """Render the static policy summary (placeholder for an interactive form)."""
        return """
        <div style="padding:16px;">
        <h3>Approval Policy</h3>
        <p>Configure auto-approval thresholds by risk level:</p>
        <ul>
        <li><b>Safe/Low:</b> Auto-approve</li>
        <li><b>Medium:</b> Ask if confidence &lt; 90%</li>
        <li><b>High:</b> Always ask</li>
        <li><b>Critical:</b> Always ask + require 2FA</li>
        </ul>
        <p><i>Interactive policy editor coming in v1.14.3</i></p>
        </div>
        """

    @property
    def queue(self) -> ApprovalQueue:
        """The approval queue backing this dashboard."""
        return self._queue
532# ── Agent Integration Bridge ────────────────
class HITLUIBridge:
    """Bridge between an agent's HITL engine and the approval UI queue.

    Usage inside an agent loop:
        bridge = HITLUIBridge(queue, agent_id="a1", agent_name="FileAgent")
        # The request shows up in the UI; the coroutine waits for the result.
        approved, reason = await bridge.send_approval_request(
            action="Delete file X", risk_level="high", timeout=60,
        )
    """

    def __init__(
        self,
        approval_queue: ApprovalQueue,
        agent_id: str = "",
        agent_name: str = "",
    ):
        """Bind the bridge to *approval_queue* on behalf of one agent."""
        self._queue = approval_queue
        self.agent_id = agent_id
        self.agent_name = agent_name
        # Per in-flight request: the wake-up event and the recorded outcome.
        self._decision_events: Dict[str, asyncio.Event] = {}
        self._decision_results: Dict[str, Tuple[bool, str]] = {}

    async def send_approval_request(
        self,
        action: str,
        details: str = "",
        risk_level: str = "medium",
        source_file: str = "",
        timeout: float = 300.0,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Tuple[bool, str]:
        """Submit an approval request and wait for the decision.

        Args:
            action: Human-readable description of the operation.
            details: Longer explanation shown to the reviewer.
            risk_level: A ``RiskLevelUI`` value such as "high".
            source_file: File / code location that triggered the request.
            timeout: Seconds to wait before the request is denied.
            metadata: Extra data attached to the request.

        Returns:
            ``(approved, reason)``; ``(False, "timeout")`` when nobody
            decided in time.

        Raises:
            ValueError: If *risk_level* is not a valid ``RiskLevelUI`` value.
        """
        # Validate before registering anything so a bad value leaves no
        # stale bookkeeping behind.
        level = RiskLevelUI(risk_level)

        loop = asyncio.get_running_loop()
        event = asyncio.Event()
        request_id = f"apr-{uuid.uuid4().hex[:8]}"
        self._decision_events[request_id] = event

        def on_decision(approved: bool, reason: str) -> None:
            # Ignore decisions arriving after this call finished (e.g. the
            # awaiting task was cancelled) so nothing accumulates.
            if request_id not in self._decision_events:
                return
            self._decision_results[request_id] = (approved, reason)
            # The queue invokes this from whichever thread called decide()
            # (typically a UI worker thread).  asyncio.Event is not
            # thread-safe, so hand the wake-up to the waiting loop.
            loop.call_soon_threadsafe(event.set)

        request = ApprovalRequestUI(
            request_id=request_id,
            agent_name=self.agent_name,
            action=action,
            details=details,
            risk_level=level,
            expires_at=time.time() + timeout,
            source_file=source_file,
            metadata=metadata or {},
            on_decision=on_decision,
        )

        try:
            self._queue.submit(request)
            # Wait for the reviewer's decision or the timeout.
            await asyncio.wait_for(event.wait(), timeout=timeout)
            return self._decision_results.get(request_id, (False, "timeout"))
        except asyncio.TimeoutError:
            history = self._queue.decide(request_id, False, "timeout")
            if history is not None:
                history.decided_by = "timeout"
            # If a reviewer decided in the instant before the timeout fired,
            # decide() returned None and the recorded outcome is the real
            # one; report it so the caller agrees with the stored history.
            return self._decision_results.get(request_id, (False, "timeout"))
        finally:
            self._decision_events.pop(request_id, None)
            self._decision_results.pop(request_id, None)
605# ── Quick Launch ────────────────────────────
def create_hitl_dashboard(
    port: int = 7860,
    theme: str = "soft",
    share: bool = False,
) -> Tuple[ApprovalDashboard, ApprovalQueue]:
    """Create a HITL approval dashboard and launch it on a daemon thread.

    Usage:
        dashboard, queue = create_hitl_dashboard(port=7860)
        # In agent code:
        bridge = HITLUIBridge(queue, agent_name="FileAgent")
        approved, reason = await bridge.send_approval_request(
            action="Delete 50 files in /tmp/",
            risk_level="high",
        )

    Returns:
        The dashboard and the queue it displays.
    """
    approval_queue = ApprovalQueue()
    dashboard = ApprovalDashboard(
        approval_queue=approval_queue,
        port=port,
        theme=theme,
        auto_launch=True,
    )

    # Serve Gradio from a background daemon thread so this call returns.
    launcher = threading.Thread(
        target=dashboard.launch,
        kwargs=dict(share=share),
        daemon=True,
    )
    launcher.start()

    return dashboard, approval_queue