Coverage for agentos/tools/http_tools.py: 21%
92 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""HTTP 工具 — HTTP 请求、文件下载。"""
3from __future__ import annotations
5import json
6import os
7import tempfile
8import time
9from urllib.parse import urlparse
11from agentos.tools.base import BaseTool, ToolResult
14class HttpRequestTool(BaseTool):
15 """HTTP 请求工具 — 发送 GET/POST/PUT/DELETE 请求。"""
17 name = "http_request"
18 description = "发送 HTTP 请求(GET/POST/PUT/DELETE),支持 JSON body、自定义 header"
20 @property
21 def parameters(self) -> dict:
22 return {
23 "type": "object",
24 "properties": {
25 "url": {"type": "string", "description": "请求 URL"},
26 "method": {"type": "string", "description": "HTTP 方法:GET/POST/PUT/DELETE,默认 GET", "enum": ["GET", "POST", "PUT", "DELETE", "PATCH"]},
27 "body": {"type": "string", "description": "请求体(JSON 字符串)"},
28 "headers": {"type": "string", "description": "自定义 Header,JSON 格式串"},
29 "timeout": {"type": "integer", "description": "超时秒数,默认 30"},
30 },
31 "required": ["url"],
32 }
34 async def execute(self, arguments: dict, sandbox=None) -> ToolResult:
35 import urllib.request
36 import urllib.error
38 url = arguments.get("url", "")
39 method = arguments.get("method", "GET").upper()
40 body = arguments.get("body", "")
41 headers_str = arguments.get("headers", "{}")
42 timeout = arguments.get("timeout", 30)
44 try:
45 parsed_headers = json.loads(headers_str) if headers_str else {}
46 except json.JSONDecodeError:
47 return ToolResult.fail(call_id="", error=f"Invalid headers JSON: {headers_str}")
49 data = body.encode("utf-8") if body else None
50 req = urllib.request.Request(url, data=data, method=method)
51 req.add_header("User-Agent", "AgentOS-HttpTool/1.0")
52 req.add_header("Accept", "application/json, text/plain, */*")
53 if body:
54 req.add_header("Content-Type", "application/json")
55 for k, v in parsed_headers.items():
56 req.add_header(k, str(v))
58 t0 = time.time()
59 try:
60 with urllib.request.urlopen(req, timeout=timeout) as resp:
61 elapsed_ms = (time.time() - t0) * 1000
62 raw_body = resp.read()
63 text_body = raw_body.decode("utf-8", errors="replace")
64 content_type = resp.headers.get("Content-Type", "")
66 output = (
67 f"Status: {resp.status}\n"
68 f"Content-Type: {content_type}\n"
69 f"Body length: {len(raw_body)} bytes\n"
70 f"Elapsed: {elapsed_ms:.0f}ms\n\n"
71 f"{text_body[:3000]}"
72 )
73 return ToolResult.ok(call_id="", output=output)
75 except urllib.error.HTTPError as e:
76 elapsed_ms = (time.time() - t0) * 1000
77 error_body = ""
78 try:
79 error_body = e.read().decode("utf-8", errors="replace")[:1000]
80 except Exception:
81 pass
82 return ToolResult.ok(
83 call_id="",
84 output=f"HTTP {e.code} {e.reason}\nElapsed: {elapsed_ms:.0f}ms\n\n{error_body}",
85 )
86 except Exception as e:
87 return ToolResult.fail(call_id="", error=f"Request failed: {e}")
90class DownloadTool(BaseTool):
91 """文件下载工具 — 下载 URL 内容到本地文件。"""
93 name = "download_file"
94 description = "从 URL 下载文件到本地,返回本地路径和文件大小"
96 @property
97 def parameters(self) -> dict:
98 return {
99 "type": "object",
100 "properties": {
101 "url": {"type": "string", "description": "下载 URL"},
102 "output_path": {"type": "string", "description": "输出目录或文件路径,默认临时目录"},
103 },
104 "required": ["url"],
105 }
107 async def execute(self, arguments: dict, sandbox=None) -> ToolResult:
108 import urllib.request
110 url = arguments.get("url", "")
111 output_path = arguments.get("output_path", "")
113 parsed = urlparse(url)
114 filename = os.path.basename(parsed.path) or "download"
115 if output_path:
116 if os.path.isdir(output_path) or output_path.endswith("/"):
117 filepath = os.path.join(output_path, filename)
118 else:
119 filepath = output_path
120 else:
121 filepath = os.path.join(tempfile.gettempdir(), filename)
123 # Avoid overwriting
124 if os.path.exists(filepath):
125 base, ext = os.path.splitext(filename)
126 counter = 1
127 while os.path.exists(filepath):
128 filepath = os.path.join(os.path.dirname(filepath), f"{base}_{counter}{ext}")
129 counter += 1
131 os.makedirs(os.path.dirname(filepath) or ".", exist_ok=True)
133 t0 = time.time()
134 try:
135 with urllib.request.urlopen(url, timeout=300) as resp:
136 total = 0
137 with open(filepath, "wb") as f:
138 while True:
139 chunk = resp.read(8192)
140 if not chunk:
141 break
142 f.write(chunk)
143 total += len(chunk)
145 elapsed_ms = (time.time() - t0) * 1000
146 size_mb = total / (1024 * 1024)
147 return ToolResult.ok(
148 call_id="",
149 output=f"Downloaded: {filepath}\nSize: {total} bytes ({size_mb:.2f} MB)\nTime: {elapsed_ms:.0f}ms",
150 )
151 except Exception as e:
152 return ToolResult.fail(call_id="", error=f"Download failed: {e}")