Coverage for agentos/tools/http_tools.py: 21%

92 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1"""HTTP 工具 — HTTP 请求、文件下载。""" 

2 

3from __future__ import annotations 

4 

5import json 

6import os 

7import tempfile 

8import time 

9from urllib.parse import urlparse 

10 

11from agentos.tools.base import BaseTool, ToolResult 

12 

13 

class HttpRequestTool(BaseTool):
    """HTTP request tool: sends one GET/POST/PUT/DELETE/PATCH request.

    The response (status, content type, size, elapsed time and the first
    3000 characters of the body) is rendered as plain text in the
    ``ToolResult`` output.
    """

    name = "http_request"
    description = "发送 HTTP 请求(GET/POST/PUT/DELETE),支持 JSON body、自定义 header"

    @property
    def parameters(self) -> dict:
        """Return the JSON-Schema style description of the tool arguments."""
        return {
            "type": "object",
            "properties": {
                "url": {"type": "string", "description": "请求 URL"},
                "method": {
                    "type": "string",
                    "description": "HTTP 方法:GET/POST/PUT/DELETE,默认 GET",
                    "enum": ["GET", "POST", "PUT", "DELETE", "PATCH"],
                },
                "body": {"type": "string", "description": "请求体(JSON 字符串)"},
                "headers": {"type": "string", "description": "自定义 Header,JSON 格式串"},
                "timeout": {"type": "integer", "description": "超时秒数,默认 30"},
            },
            "required": ["url"],
        }

    async def execute(self, arguments: dict, sandbox=None) -> ToolResult:
        """Send the HTTP request described by ``arguments``.

        Args:
            arguments: Mapping with ``url`` (required) and optional ``method``
                (default ``GET``), ``body``, ``headers`` (a JSON object, given
                either as a JSON string or as an already-decoded dict) and
                ``timeout`` in seconds (default 30).
            sandbox: Accepted for interface compatibility; not used here.

        Returns:
            ``ToolResult.ok`` with a text summary for any response that was
            received, including HTTP error statuses (4xx/5xx), so the caller
            can inspect the status and error body. ``ToolResult.fail`` for
            invalid arguments or transport-level failures.
        """
        import urllib.error
        import urllib.request

        url = arguments.get("url") or ""
        method = str(arguments.get("method") or "GET").upper()
        body = arguments.get("body")
        headers_raw = arguments.get("headers") or "{}"
        timeout = arguments.get("timeout")
        if timeout is None:
            # An explicit null would otherwise disable the timeout entirely.
            timeout = 30

        # urlopen() also understands file:// and ftp://; this tool is meant
        # for HTTP only, and its arguments may be untrusted, so reject every
        # other scheme (this also covers an empty or malformed URL).
        try:
            scheme = urlparse(url).scheme.lower()
        except (AttributeError, TypeError, ValueError):
            scheme = ""
        if scheme not in ("http", "https"):
            return ToolResult.fail(
                call_id="",
                error=f"Invalid URL (only http/https are supported): {url}",
            )

        # Headers may arrive as a JSON string (per the schema) or as a dict.
        if isinstance(headers_raw, dict):
            parsed_headers = headers_raw
        else:
            try:
                parsed_headers = json.loads(headers_raw)
            except (TypeError, ValueError):
                return ToolResult.fail(call_id="", error=f"Invalid headers JSON: {headers_raw}")
            if not isinstance(parsed_headers, dict):
                return ToolResult.fail(
                    call_id="",
                    error=f"Headers must be a JSON object: {headers_raw}",
                )

        t0 = time.time()
        try:
            # A body that is not already a string is serialised as JSON.
            if body is not None and not isinstance(body, str):
                body = json.dumps(body, ensure_ascii=False)
            data = body.encode("utf-8") if body else None

            # Request() itself raises ValueError on a malformed URL, so it is
            # built inside the try to turn that into a ToolResult failure.
            req = urllib.request.Request(url, data=data, method=method)
            req.add_header("User-Agent", "AgentOS-HttpTool/1.0")
            req.add_header("Accept", "application/json, text/plain, */*")
            if body:
                req.add_header("Content-Type", "application/json")
            # Caller-supplied headers are added last so they override defaults.
            for key, value in parsed_headers.items():
                req.add_header(str(key), str(value))

            # NOTE(review): urlopen() is a blocking call inside a coroutine;
            # it holds the event loop until the response has been read.
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                raw_body = resp.read()
                # Measured after read() so the figure includes body transfer.
                elapsed_ms = (time.time() - t0) * 1000
                text_body = raw_body.decode("utf-8", errors="replace")
                content_type = resp.headers.get("Content-Type", "")

                output = (
                    f"Status: {resp.status}\n"
                    f"Content-Type: {content_type}\n"
                    f"Body length: {len(raw_body)} bytes\n"
                    f"Elapsed: {elapsed_ms:.0f}ms\n\n"
                    f"{text_body[:3000]}"
                )
                return ToolResult.ok(call_id="", output=output)

        except urllib.error.HTTPError as e:
            elapsed_ms = (time.time() - t0) * 1000
            error_body = ""
            try:
                error_body = e.read().decode("utf-8", errors="replace")[:1000]
            except Exception:
                # Best effort: the status line is still reported without a body.
                pass
            return ToolResult.ok(
                call_id="",
                output=f"HTTP {e.code} {e.reason}\nElapsed: {elapsed_ms:.0f}ms\n\n{error_body}",
            )
        except Exception as e:
            return ToolResult.fail(call_id="", error=f"Request failed: {e}")

88 

89 

class DownloadTool(BaseTool):
    """File download tool: saves the content of a URL to a local file."""

    name = "download_file"
    description = "从 URL 下载文件到本地,返回本地路径和文件大小"

    @property
    def parameters(self) -> dict:
        """Return the JSON-Schema style description of the tool arguments."""
        return {
            "type": "object",
            "properties": {
                "url": {"type": "string", "description": "下载 URL"},
                "output_path": {"type": "string", "description": "输出目录或文件路径,默认临时目录"},
            },
            "required": ["url"],
        }

    @staticmethod
    def _resolve_target(url: str, output_path: str) -> str:
        """Pick a destination path that does not exist yet.

        The file name is taken from the last segment of the URL path
        (``download`` when that is empty). ``output_path`` is treated as a
        directory when it already is one or ends with a path separator,
        otherwise as the destination file itself; when it is empty the
        system temporary directory is used. If the chosen path exists,
        ``_1``, ``_2``, ... is appended to the stem until a free name is found.
        """
        filename = os.path.basename(urlparse(url).path) or "download"
        if not output_path:
            filepath = os.path.join(tempfile.gettempdir(), filename)
        elif os.path.isdir(output_path) or output_path.endswith(("/", os.sep)):
            filepath = os.path.join(output_path, filename)
        else:
            filepath = output_path

        # Derive the suffixed name from the chosen path itself, not from the
        # URL, so an explicit "out.bin" becomes "out_1.bin" rather than
        # "<url-name>_1.<url-ext>".
        directory, target_name = os.path.split(filepath)
        stem, ext = os.path.splitext(target_name)
        counter = 1
        while os.path.exists(filepath):
            filepath = os.path.join(directory, f"{stem}_{counter}{ext}")
            counter += 1
        return filepath

    async def execute(self, arguments: dict, sandbox=None) -> ToolResult:
        """Download ``arguments["url"]`` to disk.

        Args:
            arguments: Mapping with ``url`` (required) and optional
                ``output_path`` (a directory or a file path; defaults to the
                system temporary directory).
            sandbox: Accepted for interface compatibility; not used here.

        Returns:
            ``ToolResult.ok`` with the saved path, byte count and elapsed
            time, or ``ToolResult.fail`` with the error message. No exception
            is propagated for bad URLs, filesystem errors or network errors.
        """
        import urllib.request

        url = arguments.get("url") or ""
        output_path = arguments.get("output_path") or ""

        # Path resolution and directory creation can fail too (malformed URL,
        # unwritable location); report those through ToolResult as well.
        try:
            filepath = self._resolve_target(url, output_path)
            os.makedirs(os.path.dirname(filepath) or ".", exist_ok=True)
        except Exception as e:
            return ToolResult.fail(call_id="", error=f"Download failed: {e}")

        t0 = time.time()
        total = 0
        file_created = False
        try:
            # NOTE(review): urlopen() also accepts non-HTTP schemes such as
            # file:// and blocks the event loop while the transfer runs;
            # confirm both are acceptable for this tool.
            with urllib.request.urlopen(url, timeout=300) as resp:
                with open(filepath, "wb") as f:
                    file_created = True
                    # Stream in fixed-size chunks until read() returns b"".
                    for chunk in iter(lambda: resp.read(8192), b""):
                        f.write(chunk)
                        total += len(chunk)

            elapsed_ms = (time.time() - t0) * 1000
            size_mb = total / (1024 * 1024)
            return ToolResult.ok(
                call_id="",
                output=f"Downloaded: {filepath}\nSize: {total} bytes ({size_mb:.2f} MB)\nTime: {elapsed_ms:.0f}ms",
            )
        except Exception as e:
            if file_created:
                # The path did not exist before this call, so whatever is
                # there now is a truncated download; remove it (best effort).
                try:
                    os.remove(filepath)
                except OSError:
                    pass
            return ToolResult.fail(call_id="", error=f"Download failed: {e}")