Coverage for agentos/system/browser.py: 25%

193 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2浏览器自动化模块 — 通过 CDP (Chrome DevTools Protocol) 控制浏览器。 

3 

4设计: 

5- 底层使用 Playwright 连接 Chromium 浏览器 

6- 操作统一为 BrowserAction 结构 

7- 支持导航、点击、填表、截图、提取文本、执行JS 

8- 权限级别: BROWSER (需用户授权) 

9""" 

10 

11from __future__ import annotations 

12 

13import asyncio 

14from dataclasses import dataclass, field 

15from typing import Optional, Any 

16from datetime import datetime 

17 

18from agentos.system.permissions import ( 

19 SystemPermissionManager, 

20 PermissionTier, 

21 PermissionDenied, 

22) 

23 

24 

25# ── 浏览器动作定义 ───────────────────────────────────────────── 

26 

27 

@dataclass
class BrowserAction:
    """Declarative description of a single browser operation.

    Attributes:
        action_type: Operation name; one of ``navigate``, ``click``, ``type``,
            ``screenshot``, ``extract``, ``js``, ``wait`` or ``scroll``.
        url: Target URL for a navigation.
        selector: CSS/XPath selector of the element to act on.
        value: Text to type, JavaScript source to run, or a wait time,
            depending on ``action_type``.
        screenshot_path: Destination file for a screenshot.
        wait_until: Navigation wait condition: ``load``, ``networkidle`` or
            ``domcontentloaded``.
    """

    action_type: str
    url: str = ""
    selector: str = ""
    value: str = ""
    screenshot_path: str = ""
    wait_until: str = "load"

37 

38 

@dataclass
class BrowserResult:
    """Outcome of a single browser operation.

    Attributes:
        success: Whether the operation completed successfully.
        action: Name of the operation that produced this result.
        url: Page URL associated with the operation.
        text: Extracted text (or other textual payload of the operation).
        html: Page HTML.
        screenshot_path: File the screenshot was written to.
        title: Page title.
        error: Error description when the operation failed.
        duration_ms: Time the operation took, in milliseconds.
        selector: Selector the operation targeted, when one was supplied.
    """

    success: bool
    action: str
    url: str = ""
    text: str = ""
    html: str = ""
    screenshot_path: str = ""
    title: str = ""
    error: str = ""
    duration_ms: float = 0
    # Declared last so existing positional construction keeps its meaning.
    # Callers that build results with ``selector=...`` previously failed with
    # TypeError because the field did not exist.
    selector: str = ""

51 

52 

53# ── CDP 浏览器会话 ───────────────────────────────────────────── 

54 

55 

class BrowserSession:
    """Playwright-backed browser session driving a single Chromium page.

    Page operations never raise for page-level failures: each returns a
    ``BrowserResult`` whose ``success`` flag and ``error`` text describe the
    outcome.

    Usage:
        async with BrowserSession() as browser:
            await browser.navigate("https://example.com")
            text = await browser.extract_text("body")
            await browser.screenshot("page.png")
    """

    def __init__(self, headless: bool = True, slow_mo: int = 0,
                 viewport_width: int = 1280, viewport_height: int = 720):
        """Store launch options; the browser itself is launched by ``start()``.

        Args:
            headless: Passed to Chromium's launch as ``headless``.
            slow_mo: Passed to Chromium's launch as ``slow_mo``.
            viewport_width: Width of the page viewport.
            viewport_height: Height of the page viewport.
        """
        self._headless = headless
        self._slow_mo = slow_mo
        self._viewport = {"width": viewport_width, "height": viewport_height}
        self._playwright = None
        self._browser = None
        self._page = None
        self._current_url = ""

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, *args):
        await self.close()

    async def start(self) -> None:
        """Launch Chromium and open the page used by all operations.

        Raises:
            ImportError: If the ``playwright`` package is not installed.
        """
        try:
            from playwright.async_api import async_playwright
        except ImportError as exc:
            raise ImportError(
                "浏览器自动化需要 playwright。安装: pip install playwright && playwright install chromium"
            ) from exc

        self._playwright = await async_playwright().start()
        try:
            self._browser = await self._playwright.chromium.launch(
                headless=self._headless,
                slow_mo=self._slow_mo,
                args=[
                    "--no-sandbox",
                    "--disable-setuid-sandbox",
                    "--disable-dev-shm-usage",
                    "--disable-gpu",
                ],
            )
            self._page = await self._browser.new_page(viewport=self._viewport)
        except BaseException:
            # __aexit__ is not invoked when __aenter__ fails, so release
            # whatever was already started before propagating the error.
            await self.close()
            raise

    async def close(self) -> None:
        """Close the browser and stop Playwright; safe to call repeatedly."""
        browser, playwright = self._browser, self._playwright
        # Drop the handles first so a second close() is a no-op.
        self._page = None
        self._browser = None
        self._playwright = None
        try:
            if browser:
                await browser.close()
        finally:
            # Stop the driver even when closing the browser failed.
            if playwright:
                await playwright.stop()

    # ── Internal helpers ──

    @staticmethod
    def _clock() -> float:
        """Return a monotonic timestamp in seconds for duration measurement."""
        import time
        return time.perf_counter()

    def _elapsed_ms(self, started: float) -> float:
        """Return the milliseconds elapsed since ``started``."""
        return (self._clock() - started) * 1000

    def _ok(self, action: str, started: float, **fields: Any) -> BrowserResult:
        """Build a success result carrying the page's current URL."""
        return BrowserResult(
            success=True, action=action, url=self._page.url,
            duration_ms=self._elapsed_ms(started), **fields,
        )

    def _failed(self, action: str, started: float, exc: Exception,
                **fields: Any) -> BrowserResult:
        """Build a failure result whose error text is ``str(exc)``."""
        return BrowserResult(
            success=False, action=action, error=str(exc),
            duration_ms=self._elapsed_ms(started), **fields,
        )

    # ── Core operations ──

    async def navigate(self, url: str, wait_until: str = "load") -> BrowserResult:
        """Navigate the page to ``url`` (30 s timeout).

        Args:
            url: Address to open.
            wait_until: Wait condition forwarded to Playwright's ``goto``.

        Returns:
            A result with the final URL and page title. ``success`` is False
            when the response status is not OK (``error`` then holds the HTTP
            status) or when navigation raised.
        """
        started = self._clock()
        try:
            resp = await self._page.goto(url, wait_until=wait_until, timeout=30000)
            self._current_url = self._page.url
            title = await self._page.title()
            # goto() resolves to None for about:blank and same-document hash
            # navigations, which Playwright documents as successful.
            succeeded = resp is None or bool(resp.ok)
            return BrowserResult(
                success=succeeded,
                action="navigate",
                url=self._current_url,
                title=title,
                error="" if succeeded else f"HTTP {resp.status}",
                duration_ms=self._elapsed_ms(started),
            )
        except Exception as exc:
            return self._failed("navigate", started, exc, url=url)

    async def click(self, selector: str) -> BrowserResult:
        """Click the element matching ``selector`` (10 s timeout)."""
        started = self._clock()
        try:
            await self._page.click(selector, timeout=10000)
            return self._ok("click", started)
        except Exception as exc:
            return self._failed("click", started, exc)

    async def type_text(self, selector: str, text: str) -> BrowserResult:
        """Fill the input matching ``selector`` with ``text`` (10 s timeout).

        On success the typed text is echoed in the result's ``text`` field.
        """
        started = self._clock()
        try:
            await self._page.fill(selector, text, timeout=10000)
            return self._ok("type", started, text=text)
        except Exception as exc:
            return self._failed("type", started, exc)

    async def extract_text(self, selector: str = "body") -> BrowserResult:
        """Return the inner text of the first element matching ``selector``.

        A selector that matches nothing yields a successful result with
        empty text.
        """
        started = self._clock()
        try:
            element = await self._page.query_selector(selector)
            text = await element.inner_text() if element else ""
            return self._ok("extract", started, text=text)
        except Exception as exc:
            return self._failed("extract", started, exc)

    async def extract_html(self) -> BrowserResult:
        """Return the full HTML content of the page."""
        started = self._clock()
        try:
            html = await self._page.content()
            return self._ok("extract", started, html=html)
        except Exception as exc:
            return self._failed("extract", started, exc)

    async def screenshot(self, path: str = "", full_page: bool = True) -> BrowserResult:
        """Save a screenshot of the page.

        Args:
            path: Destination file; when empty, a timestamped file under
                ``/tmp`` is used.
            full_page: Forwarded to Playwright's ``screenshot``.
        """
        import time
        started = self._clock()
        # The default file name embeds wall-clock seconds, so it uses
        # time.time() rather than the monotonic clock used for durations.
        save_path = path or f"/tmp/agentos_screenshot_{int(time.time())}.png"
        try:
            await self._page.screenshot(path=save_path, full_page=full_page)
            return self._ok("screenshot", started, screenshot_path=save_path)
        except Exception as exc:
            return self._failed("screenshot", started, exc)

    async def execute_js(self, code: str) -> BrowserResult:
        """Evaluate JavaScript in the page; the result is returned as ``str``.

        NOTE(review): ``code`` runs as-is in the page context; callers are
        responsible for not passing untrusted script.
        """
        started = self._clock()
        try:
            outcome = await self._page.evaluate(code)
            return self._ok("js", started, text=str(outcome))
        except Exception as exc:
            return self._failed("js", started, exc)

    async def wait(self, selector: str = "", milliseconds: int = 1000) -> BrowserResult:
        """Wait for ``selector`` to appear (10 s timeout) or sleep.

        When ``selector`` is empty, sleeps for ``milliseconds`` instead.
        """
        started = self._clock()
        try:
            if selector:
                await self._page.wait_for_selector(selector, timeout=10000)
            else:
                await asyncio.sleep(milliseconds / 1000)
            return self._ok("wait", started)
        except Exception as exc:
            return self._failed("wait", started, exc)

    async def scroll(self, direction: str = "down", amount: int = 500) -> BrowserResult:
        """Scroll the page.

        Args:
            direction: ``down`` / ``up`` scroll by ``amount`` pixels;
                ``bottom`` / ``top`` jump to the end or start of the page.
            amount: Pixel distance for ``down`` and ``up``.

        Returns:
            A failure result for an unrecognised ``direction``.
        """
        started = self._clock()
        try:
            # ``amount`` is passed as an evaluate() argument instead of being
            # interpolated into the script, so negative values stay valid JS.
            if direction == "down":
                await self._page.evaluate("(dy) => window.scrollBy(0, dy)", amount)
            elif direction == "up":
                await self._page.evaluate("(dy) => window.scrollBy(0, -dy)", amount)
            elif direction == "bottom":
                await self._page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            elif direction == "top":
                await self._page.evaluate("window.scrollTo(0, 0)")
            else:
                # Nothing was scrolled, so do not report success.
                return BrowserResult(
                    success=False, action="scroll", url=self._page.url,
                    error=f"未知滚动方向: {direction}",
                    duration_ms=self._elapsed_ms(started),
                )
            return self._ok("scroll", started, text=f"已滚动 {direction}")
        except Exception as exc:
            return self._failed("scroll", started, exc)

    @property
    def current_url(self) -> str:
        """URL of the open page, or an empty string when no page is open."""
        return self._page.url if self._page else ""

290 

291 

292# ── CDP 浏览器管理器 ─────────────────────────────────────────── 

293 

294 

class CDPBrowser:
    """Permission-checked entry point for browser automation.

    Usage:
        pm = SystemPermissionManager()
        browser = CDPBrowser(pm, "session-123")

        async with browser.session() as sess:
            await sess.navigate("https://example.com")
            text = await sess.extract_text()
    """

    def __init__(self, perm_manager: SystemPermissionManager, session_id: str,
                 headless: bool = True):
        """Bind the manager to a permission manager and a session id.

        Args:
            perm_manager: Used to check the BROWSER permission tier.
            session_id: Identifier passed to the permission check.
            headless: Default headless mode for sessions created here.
        """
        self._pm = perm_manager
        self._sid = session_id
        self._headless = headless
        self._current_session: BrowserSession | None = None

    def session(self, headless: bool | None = None) -> BrowserSession:
        """Create a browser session (an async context manager).

        The new session becomes the target of ``execute_action``.

        Args:
            headless: Overrides the manager's default when not None.

        Raises:
            PermissionDenied: If the BROWSER permission check fails.
        """
        try:
            self._pm.require(self._sid, PermissionTier.BROWSER, "browser:*")
        except PermissionDenied as e:
            raise PermissionDenied(
                PermissionTier.BROWSER, "browser:*",
                f"浏览器自动化需要 BROWSER 权限: {e}",
            ) from e

        hl = headless if headless is not None else self._headless
        self._current_session = BrowserSession(headless=hl)
        return self._current_session

    async def quick_fetch(self, url: str, extract_text: bool = True) -> BrowserResult:
        """Open a temporary browser, load ``url`` and return its content.

        Args:
            url: Page to load.
            extract_text: Return the body text when True, else the full HTML.

        Returns:
            The navigation result if navigation failed, otherwise the
            extraction result.
        """
        previous = self._current_session
        try:
            async with self.session() as sess:
                nav = await sess.navigate(url)
                if not nav.success:
                    return nav
                if extract_text:
                    return await sess.extract_text()
                return await sess.extract_html()
        finally:
            # The temporary session is closed by now; do not leave it (or
            # lose the caller's own session) as the execute_action target.
            self._current_session = previous

    async def quick_screenshot(self, url: str, save_path: str) -> BrowserResult:
        """Open a temporary browser, load ``url`` and save a screenshot.

        Returns:
            The navigation result if navigation failed, otherwise the
            screenshot result.
        """
        previous = self._current_session
        try:
            async with self.session() as sess:
                nav = await sess.navigate(url)
                if not nav.success:
                    return nav
                return await sess.screenshot(save_path)
        finally:
            # Same reasoning as quick_fetch: restore the prior session.
            self._current_session = previous

    async def execute_action(self, action: BrowserAction) -> BrowserResult:
        """Run one ``BrowserAction`` against the current session.

        Returns:
            The operation's result; an unknown ``action_type`` yields a
            failure result rather than an exception.

        Raises:
            RuntimeError: If no session has been created via ``session()``.
        """
        if not self._current_session:
            raise RuntimeError("没有活跃的浏览器会话,请使用 async with browser.session()")

        sess = self._current_session
        kind = action.action_type

        if kind == "navigate":
            return await sess.navigate(action.url, action.wait_until)
        if kind == "click":
            return await sess.click(action.selector)
        if kind == "type":
            return await sess.type_text(action.selector, action.value)
        if kind == "screenshot":
            return await sess.screenshot(action.screenshot_path)
        if kind == "extract":
            return await sess.extract_text(action.selector or "body")
        if kind == "js":
            return await sess.execute_js(action.value)
        if kind == "wait":
            # isdecimal() matches exactly what int() can parse; isdigit()
            # also accepts characters such as "²" that make int() raise.
            ms = int(action.value) if action.value.isdecimal() else 1000
            return await sess.wait(action.selector, ms)
        if kind == "scroll":
            return await sess.scroll(action.value or "down")
        return BrowserResult(success=False, action=kind, error=f"未知动作: {kind}")