Coverage for agentos/tools/web_tools.py: 42%
24 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2网络工具 — 搜索与网页抓取。
3"""
5from __future__ import annotations
7import httpx
9from agentos.tools.base import BaseTool, PermissionLevel, ToolResult
class WebFetchTool(BaseTool):
    """Tool that downloads a URL over HTTP(S) and returns its body text.

    The response body is decoded to text and truncated to the first
    10,000 characters. Every failure (bad arguments, HTTP error status,
    timeout, or any other exception) is reported as a failed
    ``ToolResult`` rather than raised.
    """

    name = "web_fetch"
    description = "抓取指定URL的网页正文内容。用于读取网页、文档、API响应等。"
    permission_level = PermissionLevel.SAFE

    @property
    def parameters(self) -> dict:
        """Return the JSON-Schema style description of the tool arguments.

        The only argument is the required string ``url``.
        """
        return {
            "type": "object",
            "properties": {
                "url": {
                    "type": "string",
                    "description": "要抓取的URL，需以http://或https://开头",
                },
            },
            "required": ["url"],
        }

    async def execute(self, arguments: dict, sandbox=None) -> ToolResult:
        """Fetch ``arguments["url"]`` and return the (truncated) body text.

        Args:
            arguments: Tool arguments; must contain a non-empty string
                under the key ``"url"``.
            sandbox: Accepted for interface compatibility; not used here.

        Returns:
            A successful ``ToolResult`` whose ``output`` is the first
            10,000 characters of the response text, or a failed
            ``ToolResult`` describing what went wrong.
        """
        # NOTE(review): the first positional argument of ToolResult.ok/fail is
        # passed as "" throughout, as in the original code -- presumably an
        # identifier filled in by the caller; confirm against ToolResult.

        # Validate up front so that a malformed tool call is reported the same
        # way as every other failure instead of escaping as a KeyError.
        url = (arguments or {}).get("url")
        if not isinstance(url, str) or not url:
            return ToolResult.fail(
                "", error="Missing or invalid required argument: url"
            )

        try:
            async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
                response = await client.get(
                    url,
                    headers={"User-Agent": "AgentOS/0.1 (+https://agentos.dev)"},
                )
                # Turn 4xx/5xx responses into httpx.HTTPStatusError.
                response.raise_for_status()
                text = response.text[:10000]  # cap the output at 10K characters
                return ToolResult.ok("", output=text)
        except httpx.HTTPStatusError as e:
            return ToolResult.fail("", error=f"HTTP {e.response.status_code}: {url}")
        except httpx.TimeoutException:
            return ToolResult.fail("", error=f"Timeout fetching {url}")
        except Exception as e:
            # Tool boundary: never let an exception escape. Some exceptions
            # carry no message, so fall back to the class name to avoid
            # returning an empty error string.
            return ToolResult.fail("", error=str(e) or type(e).__name__)