Coverage for agentos/system/file_ops.py: 20%

193 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2文件操作模块 — 带权限检查的文件系统读写。 

3 

4设计原则: 

5- 所有操作前检查权限 

6- 读操作可穿透任意路径 

7- 写操作区分沙箱/全盘 

8- 操作结果统一为 FileOpResult 

9""" 

10 

11from __future__ import annotations 

12 

13import os 

14import shutil 

15import mimetypes 

16from dataclasses import dataclass, field 

17from datetime import datetime 

18from typing import Optional 

19 

20from agentos.system.permissions import ( 

21 SystemPermissionManager, 

22 PermissionTier, 

23 PermissionDenied, 

24) 

25 

26 

27@dataclass 

28class FileListing: 

29 """文件/目录条目。""" 

30 name: str 

31 path: str 

32 is_dir: bool 

33 size_bytes: int = 0 

34 modified_at: str = "" 

35 mime_type: str = "" 

36 

37 

38@dataclass 

39class FileOpResult: 

40 """文件操作结果。""" 

41 success: bool 

42 action: str # read/write/delete/move/copy/mkdir/list 

43 path: str 

44 content: str = "" # 读取的内容 

45 listing: list[FileListing] = field(default_factory=list) 

46 error: str = "" 

47 bytes_written: int = 0 

48 

49 

50class FileOperator: 

51 """文件操作器 — 带权限检查的文件系统接口。""" 

52 

53 def __init__(self, perm_manager: SystemPermissionManager, session_id: str): 

54 self._pm = perm_manager 

55 self._sid = session_id 

56 

57 # ── 读取操作 ── 

58 

59 def read(self, file_path: str) -> FileOpResult: 

60 """读取文件内容。""" 

61 path = os.path.abspath(os.path.expanduser(file_path)) 

62 try: 

63 self._pm.require(self._sid, PermissionTier.READ, path) 

64 except PermissionDenied as e: 

65 return FileOpResult(success=False, action="read", path=path, error=str(e)) 

66 

67 try: 

68 # 自动检测是否为文本文件 

69 mime, _ = mimetypes.guess_type(path) 

70 if mime and mime.startswith("text/") or path.endswith((".py", ".md", ".txt", ".json", ".yaml", ".yml", ".toml", ".cfg", ".ini", ".log", ".csv", ".xml", ".html", ".css", ".js", ".ts", ".sh", ".bash", ".env", ".gitignore")): 

71 with open(path, "r", encoding="utf-8", errors="replace") as f: 

72 content = f.read() 

73 return FileOpResult(success=True, action="read", path=path, content=content) 

74 else: 

75 # 二进制文件返回预览 

76 size = os.path.getsize(path) 

77 return FileOpResult( 

78 success=True, action="read", path=path, 

79 content=f"[Binary file, {self._format_size(size)}]", 

80 ) 

81 except Exception as e: 

82 return FileOpResult(success=False, action="read", path=path, error=str(e)) 

83 

84 def read_bytes(self, file_path: str, max_bytes: int = 1024 * 1024) -> FileOpResult: 

85 """读取二进制文件(限制大小)。""" 

86 path = os.path.abspath(os.path.expanduser(file_path)) 

87 try: 

88 self._pm.require(self._sid, PermissionTier.READ, path) 

89 except PermissionDenied as e: 

90 return FileOpResult(success=False, action="read", path=path, error=str(e)) 

91 

92 try: 

93 with open(path, "rb") as f: 

94 data = f.read(max_bytes) 

95 # Base64 编码返回 

96 import base64 

97 encoded = base64.b64encode(data).decode("ascii") 

98 return FileOpResult( 

99 success=True, action="read", path=path, 

100 content=encoded, bytes_written=len(data), 

101 ) 

102 except Exception as e: 

103 return FileOpResult(success=False, action="read", path=path, error=str(e)) 

104 

105 # ── 列表操作 ── 

106 

107 def list_dir(self, dir_path: str, show_hidden: bool = False) -> FileOpResult: 

108 """列出目录内容。""" 

109 path = os.path.abspath(os.path.expanduser(dir_path)) 

110 try: 

111 self._pm.require(self._sid, PermissionTier.READ, path) 

112 except PermissionDenied as e: 

113 return FileOpResult(success=False, action="list", path=path, error=str(e)) 

114 

115 if not os.path.isdir(path): 

116 return FileOpResult(success=False, action="list", path=path, error=f"不是目录: {path}") 

117 

118 try: 

119 entries = [] 

120 for name in sorted(os.listdir(path)): 

121 if not show_hidden and name.startswith("."): 

122 continue 

123 full = os.path.join(path, name) 

124 stat = os.stat(full) 

125 mime, _ = mimetypes.guess_type(full) 

126 entries.append(FileListing( 

127 name=name, 

128 path=full, 

129 is_dir=os.path.isdir(full), 

130 size_bytes=stat.st_size, 

131 modified_at=datetime.fromtimestamp(stat.st_mtime).isoformat(), 

132 mime_type=mime or ("inode/directory" if os.path.isdir(full) else "application/octet-stream"), 

133 )) 

134 return FileOpResult(success=True, action="list", path=path, listing=entries) 

135 except Exception as e: 

136 return FileOpResult(success=False, action="list", path=path, error=str(e)) 

137 

138 def search(self, root_dir: str, pattern: str, max_depth: int = 5) -> FileOpResult: 

139 """递归搜索文件(类似 find + glob)。""" 

140 import fnmatch 

141 path = os.path.abspath(os.path.expanduser(root_dir)) 

142 try: 

143 self._pm.require(self._sid, PermissionTier.READ, path) 

144 except PermissionDenied as e: 

145 return FileOpResult(success=False, action="list", path=path, error=str(e)) 

146 

147 results: list[FileListing] = [] 

148 try: 

149 for dirpath, dirnames, filenames in os.walk(path): 

150 depth = dirpath[len(path):].count(os.sep) 

151 if depth >= max_depth: 

152 dirnames.clear() 

153 continue 

154 # 跳过隐藏目录 

155 dirnames[:] = [d for d in dirnames if not d.startswith(".")] 

156 for fname in filenames: 

157 if fnmatch.fnmatch(fname, pattern): 

158 full = os.path.join(dirpath, fname) 

159 stat = os.stat(full) 

160 results.append(FileListing( 

161 name=fname, 

162 path=full, 

163 is_dir=False, 

164 size_bytes=stat.st_size, 

165 modified_at=datetime.fromtimestamp(stat.st_mtime).isoformat(), 

166 )) 

167 return FileOpResult(success=True, action="list", path=path, listing=results) 

168 except Exception as e: 

169 return FileOpResult(success=False, action="list", path=path, error=str(e)) 

170 

171 # ── 写入操作 ── 

172 

173 def write(self, file_path: str, content: str) -> FileOpResult: 

174 """写入文本文件。""" 

175 path = os.path.abspath(os.path.expanduser(file_path)) 

176 # 判断需要沙箱还是全盘权限 

177 sandbox_paths = ["/tmp/agentos/", "/home/marvis/Marvis/"] 

178 needs_full = not any(path.startswith(sp) for sp in sandbox_paths) 

179 tier = PermissionTier.WRITE_ALL if needs_full else PermissionTier.WRITE_SANDBOX 

180 

181 try: 

182 self._pm.require(self._sid, tier, path) 

183 except PermissionDenied as e: 

184 return FileOpResult(success=False, action="write", path=path, error=str(e)) 

185 

186 try: 

187 os.makedirs(os.path.dirname(path) or ".", exist_ok=True) 

188 with open(path, "w", encoding="utf-8") as f: 

189 f.write(content) 

190 return FileOpResult( 

191 success=True, action="write", path=path, 

192 bytes_written=len(content.encode("utf-8")), 

193 ) 

194 except Exception as e: 

195 return FileOpResult(success=False, action="write", path=path, error=str(e)) 

196 

197 def write_bytes(self, file_path: str, data: bytes) -> FileOpResult: 

198 """写入二进制文件。""" 

199 path = os.path.abspath(os.path.expanduser(file_path)) 

200 sandbox_paths = ["/tmp/agentos/", "/home/marvis/Marvis/"] 

201 needs_full = not any(path.startswith(sp) for sp in sandbox_paths) 

202 tier = PermissionTier.WRITE_ALL if needs_full else PermissionTier.WRITE_SANDBOX 

203 

204 try: 

205 self._pm.require(self._sid, tier, path) 

206 except PermissionDenied as e: 

207 return FileOpResult(success=False, action="write", path=path, error=str(e)) 

208 

209 try: 

210 os.makedirs(os.path.dirname(path) or ".", exist_ok=True) 

211 with open(path, "wb") as f: 

212 f.write(data) 

213 return FileOpResult(success=True, action="write", path=path, bytes_written=len(data)) 

214 except Exception as e: 

215 return FileOpResult(success=False, action="write", path=path, error=str(e)) 

216 

217 # ── 删除/移动 ── 

218 

219 def delete(self, target_path: str) -> FileOpResult: 

220 """删除文件或目录(高风险,需 WRITE_ALL 权限)。""" 

221 path = os.path.abspath(os.path.expanduser(target_path)) 

222 try: 

223 self._pm.require(self._sid, PermissionTier.WRITE_ALL, path) 

224 except PermissionDenied as e: 

225 return FileOpResult(success=False, action="delete", path=path, error=str(e)) 

226 

227 try: 

228 if os.path.isdir(path): 

229 shutil.rmtree(path) 

230 else: 

231 os.remove(path) 

232 return FileOpResult(success=True, action="delete", path=path) 

233 except Exception as e: 

234 return FileOpResult(success=False, action="delete", path=path, error=str(e)) 

235 

236 def move(self, src: str, dst: str) -> FileOpResult: 

237 """移动/重命名文件。""" 

238 src_path = os.path.abspath(os.path.expanduser(src)) 

239 dst_path = os.path.abspath(os.path.expanduser(dst)) 

240 try: 

241 self._pm.require(self._sid, PermissionTier.WRITE_ALL, src_path) 

242 self._pm.require(self._sid, PermissionTier.WRITE_ALL, dst_path) 

243 except PermissionDenied as e: 

244 return FileOpResult(success=False, action="move", path=src_path, error=str(e)) 

245 

246 try: 

247 shutil.move(src_path, dst_path) 

248 return FileOpResult(success=True, action="move", path=dst_path) 

249 except Exception as e: 

250 return FileOpResult(success=False, action="move", path=src_path, error=str(e)) 

251 

252 def mkdir(self, dir_path: str) -> FileOpResult: 

253 """创建目录。""" 

254 path = os.path.abspath(os.path.expanduser(dir_path)) 

255 sandbox_paths = ["/tmp/agentos/", "/home/marvis/Marvis/"] 

256 needs_full = not any(path.startswith(sp) for sp in sandbox_paths) 

257 tier = PermissionTier.WRITE_ALL if needs_full else PermissionTier.WRITE_SANDBOX 

258 

259 try: 

260 self._pm.require(self._sid, tier, path) 

261 except PermissionDenied as e: 

262 return FileOpResult(success=False, action="mkdir", path=path, error=str(e)) 

263 

264 try: 

265 os.makedirs(path, exist_ok=True) 

266 return FileOpResult(success=True, action="mkdir", path=path) 

267 except Exception as e: 

268 return FileOpResult(success=False, action="mkdir", path=path, error=str(e)) 

269 

270 # ── 文件信息 ── 

271 

272 def stat(self, file_path: str) -> FileOpResult: 

273 """获取文件/目录详细信息。""" 

274 path = os.path.abspath(os.path.expanduser(file_path)) 

275 try: 

276 self._pm.require(self._sid, PermissionTier.READ, path) 

277 except PermissionDenied as e: 

278 return FileOpResult(success=False, action="read", path=path, error=str(e)) 

279 

280 try: 

281 st = os.stat(path) 

282 info_lines = [ 

283 f"路径: {path}", 

284 f"类型: {'目录' if os.path.isdir(path) else '文件'}", 

285 f"大小: {self._format_size(st.st_size)}", 

286 f"权限: {oct(st.st_mode)[-3:]}", 

287 f"修改时间: {datetime.fromtimestamp(st.st_mtime).isoformat()}", 

288 f"创建时间: {datetime.fromtimestamp(st.st_ctime).isoformat()}", 

289 f"inode: {st.st_ino}", 

290 ] 

291 return FileOpResult(success=True, action="read", path=path, content="\n".join(info_lines)) 

292 except Exception as e: 

293 return FileOpResult(success=False, action="read", path=path, error=str(e)) 

294 

295 # ── 工具 ── 

296 

297 @staticmethod 

298 def _format_size(size: int) -> str: 

299 for unit in ["B", "KB", "MB", "GB", "TB"]: 

300 if size < 1024: 

301 return f"{size:.1f} {unit}" 

302 size /= 1024 

303 return f"{size:.1f} PB"