Coverage for agentos/tools/web_tools.py: 42%

24 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2网络工具 — 搜索与网页抓取。 

3""" 

4 

5from __future__ import annotations 

6 

7import httpx 

8 

9from agentos.tools.base import BaseTool, PermissionLevel, ToolResult 

10 

11 

12class WebFetchTool(BaseTool): 

13 

14 """网页抓取工具。""" 

15 

16 name = "web_fetch" 

17 description = "抓取指定URL的网页正文内容。用于读取网页、文档、API响应等。" 

18 permission_level = PermissionLevel.SAFE 

19 

20 @property 

21 def parameters(self) -> dict: 

22 return { 

23 "type": "object", 

24 "properties": { 

25 "url": { 

26 "type": "string", 

27 "description": "要抓取的URL,需以http://或https://开头", 

28 }, 

29 }, 

30 "required": ["url"], 

31 } 

32 

33 async def execute(self, arguments: dict, sandbox=None) -> ToolResult: 

34 url = arguments["url"] 

35 try: 

36 async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client: 

37 response = await client.get( 

38 url, 

39 headers={"User-Agent": "AgentOS/0.1 (+https://agentos.dev)"}, 

40 ) 

41 response.raise_for_status() 

42 text = response.text[:10000] # 限制最大10K字符 

43 return ToolResult.ok("", output=text) 

44 except httpx.HTTPStatusError as e: 

45 return ToolResult.fail("", error=f"HTTP {e.response.status_code}: {url}") 

46 except httpx.TimeoutException: 

47 return ToolResult.fail("", error=f"Timeout fetching {url}") 

48 except Exception as e: 

49 return ToolResult.fail("", error=str(e))