Coverage for agentos/system/browser.py: 25%
193 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2浏览器自动化模块 — 通过 CDP (Chrome DevTools Protocol) 控制浏览器。
4设计:
5- 底层使用 Playwright 连接 Chromium 浏览器
6- 操作统一为 BrowserAction 结构
7- 支持导航、点击、填表、截图、提取文本、执行JS
8- 权限级别: BROWSER (需用户授权)
9"""
11from __future__ import annotations
13import asyncio
14from dataclasses import dataclass, field
15from typing import Optional, Any
16from datetime import datetime
18from agentos.system.permissions import (
19 SystemPermissionManager,
20 PermissionTier,
21 PermissionDenied,
22)
25# ── 浏览器动作定义 ─────────────────────────────────────────────
@dataclass
class BrowserAction:
    """Declarative description of one browser operation."""

    # Operation kind: navigate / click / type / screenshot / extract / js / wait / scroll
    action_type: str
    # Target URL for a navigation
    url: str = field(default="")
    # CSS/XPath selector of the element to act on
    selector: str = field(default="")
    # Text to type / JS code / wait duration, depending on the action
    value: str = field(default="")
    # Where a screenshot should be saved
    screenshot_path: str = field(default="")
    # Navigation wait condition: load / networkidle / domcontentloaded
    wait_until: str = field(default="load")
@dataclass
class BrowserResult:
    """Outcome of a single browser operation.

    ``selector`` is declared last so that existing positional construction of
    the earlier fields keeps its meaning.
    """
    success: bool
    action: str
    url: str = ""
    text: str = ""             # extracted text, typed text, JS result or status note
    html: str = ""             # page HTML
    screenshot_path: str = ""  # path of the saved screenshot
    title: str = ""            # page title
    error: str = ""
    duration_ms: float = 0
    selector: str = ""         # selector the operation targeted, if any


# ── CDP browser session ────────────────────────────────────────


class BrowserSession:
    """Playwright-backed browser session driving a Chromium instance.

    Every page operation returns a ``BrowserResult``; exceptions raised while
    performing the operation are captured in ``BrowserResult.error`` instead
    of being propagated.

    Usage:
        async with BrowserSession() as browser:
            await browser.navigate("https://example.com")
            text = await browser.extract_text("body")
            await browser.screenshot("page.png")
    """

    _NAV_TIMEOUT_MS = 30000
    _ACTION_TIMEOUT_MS = 10000
    # Function expression: the scroll offset is passed as an argument instead
    # of being formatted into the script text.
    _SCROLL_BY_JS = "(dy) => window.scrollBy(0, dy)"

    def __init__(self, headless: bool = True, slow_mo: int = 0,
                 viewport_width: int = 1280, viewport_height: int = 720):
        self._headless = headless
        self._slow_mo = slow_mo
        self._viewport = {"width": viewport_width, "height": viewport_height}
        self._playwright = None
        self._browser = None
        self._page = None
        self._current_url = ""

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, *args):
        await self.close()

    async def start(self) -> None:
        """Launch Chromium and open a page.

        Raises:
            ImportError: if playwright is not installed.
        """
        try:
            from playwright.async_api import async_playwright
        except ImportError as exc:
            raise ImportError(
                "浏览器自动化需要 playwright。安装: pip install playwright && playwright install chromium"
            ) from exc

        self._playwright = await async_playwright().start()
        try:
            self._browser = await self._playwright.chromium.launch(
                headless=self._headless,
                slow_mo=self._slow_mo,
                args=[
                    "--no-sandbox",
                    "--disable-setuid-sandbox",
                    "--disable-dev-shm-usage",
                    "--disable-gpu",
                ],
            )
            self._page = await self._browser.new_page(viewport=self._viewport)
        except BaseException:
            # __aexit__ is not invoked when __aenter__ fails, so release what
            # was already started here before re-raising.
            await self.close()
            raise

    async def close(self) -> None:
        """Close the browser and stop the Playwright driver.

        Safe to call more than once; the driver is stopped even when closing
        the browser raises.
        """
        browser, playwright = self._browser, self._playwright
        self._page = self._browser = self._playwright = None
        try:
            if browser:
                await browser.close()
        finally:
            if playwright:
                await playwright.stop()

    # ── internal helpers ──

    @staticmethod
    def _now() -> float:
        """Return a monotonic timestamp in seconds for duration measurement."""
        import time
        return time.perf_counter()

    def _require_page(self):
        """Return the active page, or raise if the session is not started."""
        if self._page is None:
            raise RuntimeError("浏览器会话尚未启动,请先调用 start() 或使用 async with")
        return self._page

    def _success(self, action: str, started: float, **fields) -> BrowserResult:
        """Build a successful result stamped with the current page URL."""
        return BrowserResult(
            success=True, action=action, url=self._page.url,
            duration_ms=(self._now() - started) * 1000, **fields,
        )

    def _failure(self, action: str, started: float, exc: Exception, **fields) -> BrowserResult:
        """Build a failed result carrying the exception text."""
        return BrowserResult(
            success=False, action=action, error=str(exc),
            duration_ms=(self._now() - started) * 1000, **fields,
        )

    # ── core operations ──

    async def navigate(self, url: str, wait_until: str = "load") -> BrowserResult:
        """Navigate to ``url`` and report the final URL and page title."""
        started = self._now()
        try:
            page = self._require_page()
            resp = await page.goto(url, wait_until=wait_until, timeout=self._NAV_TIMEOUT_MS)
            self._current_url = page.url
            title = await page.title()
            return BrowserResult(
                # goto() yields no response object for navigations such as
                # about:blank; reaching this line means it did not raise, so
                # that case counts as success.
                success=resp is None or bool(resp.ok),
                action="navigate",
                url=self._current_url,
                title=title,
                duration_ms=(self._now() - started) * 1000,
            )
        except Exception as exc:
            return self._failure("navigate", started, exc, url=url)

    async def click(self, selector: str) -> BrowserResult:
        """Click the element matching ``selector``."""
        started = self._now()
        try:
            await self._require_page().click(selector, timeout=self._ACTION_TIMEOUT_MS)
            return self._success("click", started, selector=selector)
        except Exception as exc:
            return self._failure("click", started, exc, selector=selector)

    async def type_text(self, selector: str, text: str) -> BrowserResult:
        """Fill the input matching ``selector`` with ``text``."""
        started = self._now()
        try:
            await self._require_page().fill(selector, text, timeout=self._ACTION_TIMEOUT_MS)
            return self._success("type", started, selector=selector, text=text)
        except Exception as exc:
            return self._failure("type", started, exc, selector=selector)

    async def extract_text(self, selector: str = "body") -> BrowserResult:
        """Return the inner text of the first element matching ``selector``.

        A selector that matches nothing yields a successful result with empty
        text.
        """
        started = self._now()
        try:
            element = await self._require_page().query_selector(selector)
            text = await element.inner_text() if element else ""
            return self._success("extract", started, text=text)
        except Exception as exc:
            return self._failure("extract", started, exc)

    async def extract_html(self) -> BrowserResult:
        """Return the full HTML of the current page."""
        started = self._now()
        try:
            html = await self._require_page().content()
            return self._success("extract", started, html=html)
        except Exception as exc:
            return self._failure("extract", started, exc)

    async def screenshot(self, path: str = "", full_page: bool = True) -> BrowserResult:
        """Save a screenshot to ``path``.

        When ``path`` is empty a timestamped file in the system temporary
        directory is used.
        """
        import os
        import tempfile
        import time

        started = self._now()
        save_path = path or os.path.join(
            tempfile.gettempdir(), f"agentos_screenshot_{int(time.time())}.png"
        )
        try:
            await self._require_page().screenshot(path=save_path, full_page=full_page)
            return self._success("screenshot", started, screenshot_path=save_path)
        except Exception as exc:
            return self._failure("screenshot", started, exc)

    async def execute_js(self, code: str) -> BrowserResult:
        """Evaluate ``code`` in the page and return its result as text.

        NOTE: ``code`` runs unmodified in the page context.
        """
        started = self._now()
        try:
            result = await self._require_page().evaluate(code)
            return self._success("js", started, text=str(result))
        except Exception as exc:
            return self._failure("js", started, exc)

    async def wait(self, selector: str = "", milliseconds: int = 1000) -> BrowserResult:
        """Wait for ``selector`` to appear, or sleep ``milliseconds`` if none is given."""
        started = self._now()
        try:
            page = self._require_page()
            if selector:
                await page.wait_for_selector(selector, timeout=self._ACTION_TIMEOUT_MS)
            else:
                await asyncio.sleep(milliseconds / 1000)
            return self._success("wait", started, selector=selector)
        except Exception as exc:
            return self._failure("wait", started, exc, selector=selector)

    async def scroll(self, direction: str = "down", amount: int = 500) -> BrowserResult:
        """Scroll the page.

        ``direction`` is one of ``down`` / ``up`` (by ``amount`` pixels) or
        ``bottom`` / ``top``. Any other value yields a failed result.
        """
        started = self._now()
        try:
            page = self._require_page()
            if direction == "down":
                await page.evaluate(self._SCROLL_BY_JS, amount)
            elif direction == "up":
                await page.evaluate(self._SCROLL_BY_JS, -amount)
            elif direction == "bottom":
                await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            elif direction == "top":
                await page.evaluate("window.scrollTo(0, 0)")
            else:
                return self._failure(
                    "scroll", started, ValueError(f"未知滚动方向: {direction}")
                )
            return self._success("scroll", started, text=f"已滚动 {direction}")
        except Exception as exc:
            return self._failure("scroll", started, exc)

    @property
    def current_url(self) -> str:
        """URL of the active page, or an empty string when there is no page."""
        return self._page.url if self._page else ""
292# ── CDP 浏览器管理器 ───────────────────────────────────────────
class CDPBrowser:
    """Permission-checked entry point for browser automation.

    Usage:
        pm = SystemPermissionManager()
        browser = CDPBrowser(pm, "session-123")

        async with browser.session() as sess:
            await sess.navigate("https://example.com")
            text = await sess.extract_text()
    """

    def __init__(self, perm_manager: SystemPermissionManager, session_id: str,
                 headless: bool = True):
        self._pm = perm_manager
        self._sid = session_id
        self._headless = headless
        self._current_session: BrowserSession | None = None

    def session(self, headless: bool | None = None) -> BrowserSession:
        """Create a browser session (an async context manager).

        The permission check runs when this method is called, before any
        browser is launched. The created session is also remembered as the
        target of ``execute_action``.

        Args:
            headless: overrides the manager-level headless setting when not None.

        Raises:
            PermissionDenied: if the BROWSER permission check fails.
        """
        try:
            self._pm.require(self._sid, PermissionTier.BROWSER, "browser:*")
        except PermissionDenied as e:
            # Chain the original denial so its traceback is kept.
            raise PermissionDenied(
                PermissionTier.BROWSER, "browser:*",
                f"浏览器自动化需要 BROWSER 权限: {e}",
            ) from e

        hl = headless if headless is not None else self._headless
        self._current_session = BrowserSession(headless=hl)
        return self._current_session

    async def quick_fetch(self, url: str, extract_text: bool = True) -> BrowserResult:
        """Open a browser, load ``url`` and return its text or HTML, then close.

        Returns the navigation result unchanged when navigation fails.
        """
        async with self.session() as sess:
            nav = await sess.navigate(url)
            if not nav.success:
                return nav
            if extract_text:
                return await sess.extract_text()
            return await sess.extract_html()

    async def quick_screenshot(self, url: str, save_path: str) -> BrowserResult:
        """Open a browser, load ``url`` and save a screenshot to ``save_path``.

        Returns the navigation result unchanged when navigation fails.
        """
        async with self.session() as sess:
            nav = await sess.navigate(url)
            if not nav.success:
                return nav
            return await sess.screenshot(save_path)

    async def execute_action(self, action: BrowserAction) -> BrowserResult:
        """Run one ``BrowserAction`` against the most recently created session.

        Raises:
            RuntimeError: if ``session()`` has not been called yet.
        """
        if not self._current_session:
            raise RuntimeError("没有活跃的浏览器会话,请使用 async with browser.session()")

        sess = self._current_session
        kind = action.action_type

        if kind == "navigate":
            return await sess.navigate(action.url, action.wait_until)
        if kind == "click":
            return await sess.click(action.selector)
        if kind == "type":
            return await sess.type_text(action.selector, action.value)
        if kind == "screenshot":
            return await sess.screenshot(action.screenshot_path)
        if kind == "extract":
            return await sess.extract_text(action.selector or "body")
        if kind == "js":
            return await sess.execute_js(action.value)
        if kind == "wait":
            # isdecimal() only accepts characters int() can parse; isdigit()
            # also accepts e.g. superscript digits, which make int() raise.
            ms = int(action.value) if action.value.isdecimal() else 1000
            return await sess.wait(action.selector, ms)
        if kind == "scroll":
            return await sess.scroll(action.value or "down")
        return BrowserResult(success=False, action=kind, error=f"未知动作: {kind}")