Coverage for agentos/tools/data_tools.py: 19%

107 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1"""数据处理工具 — JSON/CSV 解析、格式化、查询。""" 

2 

3from __future__ import annotations 

4 

5import csv 

6import json 

7import os 

8from io import StringIO 

9from typing import Any 

10 

11from agentos.tools.base import BaseTool, ToolResult 

12 

13 

class JsonTool(BaseTool):
    """JSON tool: parse, pretty-print, query with a simple path, and validate.

    The ``input`` argument is either a JSON document or the path of an
    existing file that contains one.
    """

    name = "json_tool"
    description = "JSON 解析、格式化、JSONPath 查询、Schema 验证。输入 JSON 字符串或 .json 文件路径"

    @property
    def parameters(self) -> dict:
        """Return the JSON-Schema style description of the tool arguments."""
        return {
            "type": "object",
            "properties": {
                "action": {
                    "type": "string",
                    "description": "操作类型:parse/format/query/validate",
                    "enum": ["parse", "format", "query", "validate"],
                },
                "input": {"type": "string", "description": "JSON 字符串或 .json 文件路径"},
                "jsonpath": {
                    "type": "string",
                    "description": "JSONPath 查询表达式(仅 query),如 $.store.book[0].title",
                },
                "indent": {"type": "integer", "description": "缩进空格数,默认 2"},
            },
            "required": ["action", "input"],
        }

    async def execute(self, arguments: dict, sandbox=None) -> ToolResult:
        """Run one JSON action and report the outcome as a ``ToolResult``.

        Args:
            arguments: Mapping with the keys ``action`` (default ``"parse"``),
                ``input`` (default ``""``), ``jsonpath`` (default ``"$"``)
                and ``indent`` (default ``2``).
            sandbox: Not used by this tool.

        Returns:
            ``ToolResult.ok`` carrying the textual output, or
            ``ToolResult.fail`` when the input is not a string, the file
            cannot be read, the JSON is malformed, or the action is unknown.
        """
        action = arguments.get("action", "parse")
        input_data = arguments.get("input", "")
        jsonpath = arguments.get("jsonpath", "$")
        indent = arguments.get("indent", 2)

        # Reject non-strings up front: os.path.isfile() raises TypeError on
        # None, and an int would be interpreted as a file descriptor by
        # os.stat()/open(), letting the tool read from an arbitrary fd.
        if not isinstance(input_data, str):
            return ToolResult.fail(
                call_id="",
                error=f"Invalid input: expected a string, got {type(input_data).__name__}",
            )

        # Treat the input as a path when it names an existing file,
        # otherwise as the JSON document itself.
        if os.path.isfile(input_data):
            try:
                # utf-8-sig drops a leading BOM, which json.loads() rejects.
                with open(input_data, "r", encoding="utf-8-sig") as f:
                    data_str = f.read()
            except Exception as e:  # tool boundary: report, do not raise
                return ToolResult.fail(call_id="", error=f"File read error: {e}")
        else:
            data_str = input_data

        try:
            data = json.loads(data_str)
        except json.JSONDecodeError as e:
            return ToolResult.fail(call_id="", error=f"JSON parse error: {e}")

        if action == "parse":
            summary = [f"Type: {type(data).__name__}\n"]
            if isinstance(data, dict):
                # Only the first 20 keys are listed.
                summary.append(f"Keys: {list(data.keys())[:20]}\n")
            if isinstance(data, (list, dict)):
                summary.append(f"Length: {len(data)}\n")
            # The sample is capped at 1000 characters of the pretty-printed form.
            sample = json.dumps(data, indent=indent, ensure_ascii=False)[:1000]
            summary.append(f"Sample: {sample}")
            return ToolResult.ok(call_id="", output="".join(summary))

        if action == "format":
            formatted = json.dumps(data, indent=indent, ensure_ascii=False)
            return ToolResult.ok(call_id="", output=formatted)

        if action == "query":
            if jsonpath is None:
                jsonpath = "$"
            if not isinstance(jsonpath, str):
                return ToolResult.fail(
                    call_id="",
                    error=f"Invalid jsonpath: expected a string, got {type(jsonpath).__name__}",
                )
            result = self._jsonpath_query(data, jsonpath)
            # json.dumps(None) already yields "null", so no special case is needed.
            output = json.dumps(result, indent=indent, ensure_ascii=False)
            return ToolResult.ok(call_id="", output=output)

        if action == "validate":
            keys_note = (
                "Keys: " + str(list(data.keys())[:20]) if isinstance(data, dict) else ""
            )
            return ToolResult.ok(
                call_id="",
                output=f"Valid JSON. Type: {type(data).__name__}. "
                f"Size: {len(data_str)} chars. "
                f"{keys_note}",
            )

        return ToolResult.fail(call_id="", error=f"Unknown action: {action}")

    def _jsonpath_query(self, data: Any, path: str) -> Any:
        """Resolve a minimal dotted/bracketed path against parsed JSON.

        Only plain keys and integer list indexes are understood, e.g.
        ``$.store.book[0].title``. Brackets are rewritten to dots, so
        ``a[0]`` and ``a.0`` are equivalent.

        Args:
            data: The parsed JSON value to walk.
            path: The path expression; ``"$"`` selects ``data`` itself.

        Returns:
            The value found at ``path``, or ``None`` when a key is missing,
            a list index is not an integer or is out of range, or the walk
            reaches a value that is neither a dict nor a list.
        """
        if path == "$":
            return data
        parts = path.replace("[", ".").replace("]", "").split(".")
        current = data
        for part in parts:
            # Skip the root marker and the empty pieces produced by "..", "[" etc.
            if not part or part == "$":
                continue
            if isinstance(current, dict):
                current = current.get(part)
            elif isinstance(current, list):
                try:
                    current = current[int(part)]
                except (ValueError, IndexError):
                    return None
            else:
                return None
        return current

101 

102 

class CsvTool(BaseTool):
    """CSV tool: preview rows, summarise columns, and project selected columns.

    The ``input`` argument is either the path of an existing file or the
    CSV text itself. The first record is used as the header row.
    """

    name = "csv_tool"
    description = "CSV 文件读取、列提取、基本统计。输入 .csv 文件路径"

    @property
    def parameters(self) -> dict:
        """Return the JSON-Schema style description of the tool arguments."""
        return {
            "type": "object",
            "properties": {
                "action": {
                    "type": "string",
                    "description": "操作类型:read/stats/query",
                    "enum": ["read", "stats", "query"],
                },
                "input": {"type": "string", "description": ".csv 文件路径"},
                "columns": {"type": "string", "description": "要提取的列名,逗号分隔(仅 query)"},
                "limit": {"type": "integer", "description": "最大行数,默认 50"},
            },
            "required": ["action", "input"],
        }

    async def execute(self, arguments: dict, sandbox=None) -> ToolResult:
        """Run one CSV action and report the outcome as a ``ToolResult``.

        Args:
            arguments: Mapping with the keys ``action`` (default ``"read"``),
                ``input`` (default ``""``), ``columns`` (default ``""``; a
                comma-separated string, or a list/tuple of names) and
                ``limit`` (default ``50``; maximum number of data rows loaded).
            sandbox: Not used by this tool.

        Returns:
            ``ToolResult.ok`` carrying the textual output, or
            ``ToolResult.fail`` when the input is not a string, the limit is
            not an integer, the file cannot be read, the CSV cannot be
            parsed, or the action is unknown.
        """
        action = arguments.get("action", "read")
        input_data = arguments.get("input", "")
        columns = arguments.get("columns", "")
        limit = arguments.get("limit", 50)

        # Reject non-strings up front: os.path.isfile() raises TypeError on
        # None, and an int would be interpreted as a file descriptor by
        # os.stat()/open(), letting the tool read from an arbitrary fd.
        if not isinstance(input_data, str):
            return ToolResult.fail(
                call_id="",
                error=f"Invalid input: expected a string, got {type(input_data).__name__}",
            )

        # Validate the limit here so a bad value is not misreported as a
        # CSV parse error by the handler further down.
        try:
            limit = int(limit)
        except (TypeError, ValueError):
            return ToolResult.fail(call_id="", error=f"Invalid limit: {limit!r}")

        # Treat the input as a path when it names an existing file,
        # otherwise as the CSV text itself.
        if os.path.isfile(input_data):
            try:
                # utf-8-sig drops a leading BOM that would otherwise become
                # part of the first column name; newline="" leaves line
                # endings untouched, as the csv module requires.
                with open(
                    input_data, "r", encoding="utf-8-sig", errors="ignore", newline=""
                ) as f:
                    data_str = f.read()
            except Exception as e:  # tool boundary: report, do not raise
                return ToolResult.fail(call_id="", error=f"File read error: {e}")
        else:
            data_str = input_data

        rows = []
        try:
            reader = csv.DictReader(StringIO(data_str, newline=""))
            col_names = reader.fieldnames or []
            if limit > 0:
                for row in reader:
                    rows.append(row)
                    # Stop as soon as enough rows are loaded instead of
                    # parsing the remainder of the input.
                    if len(rows) >= limit:
                        break
        except Exception as e:  # tool boundary: report, do not raise
            return ToolResult.fail(call_id="", error=f"CSV parse error: {e}")

        if action == "read":
            pieces = [f"Columns: {col_names}\nRows: {len(rows)}\n\n"]
            # Only the first 20 loaded rows are printed.
            pieces.extend(str(row) + "\n" for row in rows[:20])
            return ToolResult.ok(call_id="", output="".join(pieces))

        if action == "stats":
            pieces = [f"Columns: {col_names}\nTotal rows loaded: {len(rows)}\n\n"]
            for col in col_names:
                # Empty and missing cells are left out of the statistics.
                values = [row[col] for row in rows if row.get(col)]
                unique = len(set(values))
                pieces.append(f" {col}: {unique} unique values, sample={values[:3]}\n")
            return ToolResult.ok(call_id="", output="".join(pieces))

        if action == "query":
            if isinstance(columns, str):
                target_cols = (
                    [c.strip() for c in columns.split(",")] if columns else col_names
                )
            elif isinstance(columns, (list, tuple)):
                # Accept an already-split list of names as well.
                target_cols = [str(c).strip() for c in columns] or col_names
            else:
                target_cols = col_names
            pieces = [f"Columns: {target_cols}\n\n"]
            for row in rows:
                # Requested names that are not present in the row are skipped.
                pieces.append(
                    ", ".join(f"{c}={row.get(c, '')}" for c in target_cols if c in row)
                    + "\n"
                )
            return ToolResult.ok(call_id="", output="".join(pieces))

        return ToolResult.fail(call_id="", error=f"Unknown action: {action}")