Coverage for agentos/tools/data_tools.py: 19%
107 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""数据处理工具 — JSON/CSV 解析、格式化、查询。"""
3from __future__ import annotations
5import csv
6import json
7import os
8from io import StringIO
9from typing import Any
11from agentos.tools.base import BaseTool, ToolResult
class JsonTool(BaseTool):
    """JSON tool: parse, pretty-print, path-query and validate JSON.

    The ``input`` argument is either JSON text or the path of an existing
    file whose content is JSON.
    """

    name = "json_tool"
    description = "JSON 解析、格式化、JSONPath 查询、Schema 验证。输入 JSON 字符串或 .json 文件路径"

    @property
    def parameters(self) -> dict:
        """Return the JSON-Schema style description of the accepted arguments."""
        return {
            "type": "object",
            "properties": {
                "action": {"type": "string", "description": "操作类型:parse/format/query/validate", "enum": ["parse", "format", "query", "validate"]},
                "input": {"type": "string", "description": "JSON 字符串或 .json 文件路径"},
                "jsonpath": {"type": "string", "description": "JSONPath 查询表达式(仅 query),如 $.store.book[0].title"},
                "indent": {"type": "integer", "description": "缩进空格数,默认 2"},
            },
            "required": ["action", "input"],
        }

    async def execute(self, arguments: dict, sandbox=None) -> ToolResult:
        """Run one JSON action and wrap the outcome in a ``ToolResult``.

        Args:
            arguments: Mapping with ``action`` (defaults to ``"parse"``),
                ``input`` (JSON text or the path of an existing file) and the
                optional ``jsonpath`` (defaults to ``"$"``) and ``indent``
                (defaults to 2).
            sandbox: Not used by this method.

        Returns:
            A successful result carrying the textual output, or a failed
            result for a non-string input, a file-read error, a JSON parse
            error, an invalid ``jsonpath`` or an unknown action.
        """
        action = arguments.get("action", "parse")
        input_data = arguments.get("input", "")
        # An explicit null (or empty) jsonpath selects the whole document.
        jsonpath = arguments.get("jsonpath") or "$"
        indent = arguments.get("indent", 2)

        # os.path.isfile() and json.loads() raise TypeError on non-string
        # values, so reject them up front with a regular failure result.
        if not isinstance(input_data, str):
            return ToolResult.fail(
                call_id="",
                error=f"Invalid input: expected a string, got {type(input_data).__name__}",
            )

        # Treat the input as a path only when it names an existing file.
        if os.path.isfile(input_data):
            try:
                # utf-8-sig strips a leading BOM, which json.loads() rejects;
                # files without a BOM decode exactly as with plain utf-8.
                with open(input_data, "r", encoding="utf-8-sig") as f:
                    data_str = f.read()
            except (OSError, ValueError) as e:
                # ValueError covers UnicodeDecodeError for non-UTF-8 files.
                return ToolResult.fail(call_id="", error=f"File read error: {e}")
        else:
            data_str = input_data

        # Parse
        try:
            data = json.loads(data_str)
        except (json.JSONDecodeError, RecursionError) as e:
            # RecursionError is raised for extremely deeply nested documents.
            return ToolResult.fail(call_id="", error=f"JSON parse error: {e}")

        if action == "parse":
            info = f"Type: {type(data).__name__}\n"
            if isinstance(data, dict):
                info += f"Keys: {list(data.keys())[:20]}\n"
            if isinstance(data, (list, dict)):
                info += f"Length: {len(data)}\n"
            # The sample is capped at 1000 characters of the serialized form.
            info += f"Sample: {json.dumps(data, indent=indent, ensure_ascii=False)[:1000]}"
            return ToolResult.ok(call_id="", output=info)

        if action == "format":
            formatted = json.dumps(data, indent=indent, ensure_ascii=False)
            return ToolResult.ok(call_id="", output=formatted)

        if action == "query":
            if not isinstance(jsonpath, str):
                return ToolResult.fail(
                    call_id="",
                    error=f"Invalid jsonpath: expected a string, got {type(jsonpath).__name__}",
                )
            result = self._jsonpath_query(data, jsonpath)
            output = json.dumps(result, indent=indent, ensure_ascii=False) if result is not None else "null"
            return ToolResult.ok(call_id="", output=output)

        if action == "validate":
            # Reaching this point means json.loads() accepted the document.
            return ToolResult.ok(
                call_id="",
                output=f"Valid JSON. Type: {type(data).__name__}. "
                f"Size: {len(data_str)} chars. "
                f"{'Keys: ' + str(list(data.keys())[:20]) if isinstance(data, dict) else ''}",
            )

        return ToolResult.fail(call_id="", error=f"Unknown action: {action}")

    def _jsonpath_query(self, data: Any, path: str) -> Any:
        """Resolve a simplified dotted/bracketed path against parsed JSON.

        Only plain member names and list indices are handled, e.g.
        ``$.store.book[0].title``. Brackets are rewritten to dots, so
        ``book[0]`` and ``book.0`` are equivalent.

        Args:
            data: The parsed JSON value to walk.
            path: The path expression; ``"$"`` returns ``data`` unchanged.

        Returns:
            The value found at ``path``, or ``None`` when a key is missing,
            an index is invalid or out of range, or a scalar is reached
            before the path ends.
        """
        if path == "$":
            return data
        parts = path.replace("[", ".").replace("]", "").split(".")
        current = data
        for part in parts:
            # Skip the root marker and the empty pieces produced by "..",
            # a leading "." or a bracket directly following a dot.
            if not part or part == "$":
                continue
            if isinstance(current, dict):
                current = current.get(part)
            elif isinstance(current, list):
                try:
                    current = current[int(part)]
                except (ValueError, IndexError):
                    return None
            else:
                return None
        return current
class CsvTool(BaseTool):
    """CSV tool: preview rows, summarise columns and extract columns.

    The ``input`` argument is either CSV text or the path of an existing
    file. The first record is used as the header row.
    """

    name = "csv_tool"
    description = "CSV 文件读取、列提取、基本统计。输入 .csv 文件路径"

    @property
    def parameters(self) -> dict:
        """Return the JSON-Schema style description of the accepted arguments."""
        return {
            "type": "object",
            "properties": {
                "action": {"type": "string", "description": "操作类型:read/stats/query", "enum": ["read", "stats", "query"]},
                "input": {"type": "string", "description": ".csv 文件路径"},
                "columns": {"type": "string", "description": "要提取的列名,逗号分隔(仅 query)"},
                "limit": {"type": "integer", "description": "最大行数,默认 50"},
            },
            "required": ["action", "input"],
        }

    async def execute(self, arguments: dict, sandbox=None) -> ToolResult:
        """Run one CSV action and wrap the outcome in a ``ToolResult``.

        Args:
            arguments: Mapping with ``action`` (defaults to ``"read"``),
                ``input`` (CSV text or the path of an existing file),
                ``columns`` (comma-separated names or a list of names, used
                by ``query``) and ``limit`` (maximum number of data rows to
                load, defaults to 50).
            sandbox: Not used by this method.

        Returns:
            A successful result carrying the textual output, or a failed
            result for invalid arguments, a file-read error, a CSV parse
            error or an unknown action.
        """
        action = arguments.get("action", "read")
        input_data = arguments.get("input", "")
        columns = arguments.get("columns", "")
        raw_limit = arguments.get("limit")

        # os.path.isfile() and StringIO() raise TypeError on non-string
        # values, so reject them up front with a regular failure result.
        if not isinstance(input_data, str):
            return ToolResult.fail(
                call_id="",
                error=f"Invalid input: expected a string, got {type(input_data).__name__}",
            )

        # Validate the limit separately so a bad value is not misreported
        # as a CSV parse error; an explicit null falls back to the default.
        if raw_limit is None:
            limit = 50
        else:
            try:
                limit = int(raw_limit)
            except (TypeError, ValueError):
                return ToolResult.fail(call_id="", error=f"Invalid limit: {raw_limit!r}")

        # Treat the input as a path only when it names an existing file.
        if os.path.isfile(input_data):
            try:
                # utf-8-sig strips a leading BOM so it does not end up in the
                # first column name; undecodable bytes are dropped as before.
                with open(input_data, "r", encoding="utf-8-sig", errors="ignore") as f:
                    data_str = f.read()
            except (OSError, ValueError) as e:
                return ToolResult.fail(call_id="", error=f"File read error: {e}")
        else:
            data_str = input_data

        try:
            reader = csv.DictReader(StringIO(data_str))
            col_names = reader.fieldnames or []
            # zip() stops as soon as range(limit) is exhausted, so parsing
            # ends after ``limit`` rows instead of consuming the whole input.
            rows = [row for _, row in zip(range(limit), reader)]
        except csv.Error as e:
            return ToolResult.fail(call_id="", error=f"CSV parse error: {e}")

        if action == "read":
            header = f"Columns: {col_names}\nRows: {len(rows)}\n\n"
            # The preview echoes at most 20 rows even when more were loaded.
            body = "".join(f"{row}\n" for row in rows[:20])
            return ToolResult.ok(call_id="", output=header + body)

        if action == "stats":
            lines = [f"Columns: {col_names}\nTotal rows loaded: {len(rows)}\n\n"]
            for col in col_names:
                # Empty and missing cells are left out of the statistics.
                values = [row[col] for row in rows if row.get(col)]
                unique = len(set(values))
                lines.append(f" {col}: {unique} unique values, sample={values[:3]}\n")
            return ToolResult.ok(call_id="", output="".join(lines))

        if action == "query":
            if not columns:
                target_cols = col_names
            elif isinstance(columns, (list, tuple)):
                target_cols = [str(c).strip() for c in columns]
            else:
                target_cols = [c.strip() for c in str(columns).split(",")]
            lines = [f"Columns: {target_cols}\n\n"]
            for row in rows:
                # Requested names that are not columns of the file are skipped.
                lines.append(", ".join(f"{c}={row.get(c, '')}" for c in target_cols if c in row) + "\n")
            return ToolResult.ok(call_id="", output="".join(lines))

        return ToolResult.fail(call_id="", error=f"Unknown action: {action}")