Coverage for agentos/system/file_ops.py: 20%
193 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2文件操作模块 — 带权限检查的文件系统读写。
4设计原则:
5- 所有操作前检查权限
6- 读操作可穿透任意路径
7- 写操作区分沙箱/全盘
8- 操作结果统一为 FileOpResult
9"""
11from __future__ import annotations
13import os
14import shutil
15import mimetypes
16from dataclasses import dataclass, field
17from datetime import datetime
18from typing import Optional
20from agentos.system.permissions import (
21 SystemPermissionManager,
22 PermissionTier,
23 PermissionDenied,
24)
@dataclass
class FileListing:
    """One file or directory entry, as produced by listing/search results."""

    name: str  # entry name
    path: str  # path of the entry
    is_dir: bool  # True when the entry is a directory
    size_bytes: int = field(default=0)  # size in bytes
    modified_at: str = field(default="")  # modification time as text
    mime_type: str = field(default="")  # MIME type, empty when not set
@dataclass
class FileOpResult:
    """Outcome of a single file operation."""

    success: bool  # whether the operation succeeded
    action: str  # read/write/delete/move/copy/mkdir/list
    path: str  # path the result refers to
    content: str = field(default="")  # content that was read
    listing: list[FileListing] = field(default_factory=list)  # entries found
    error: str = field(default="")  # error message, empty when not set
    bytes_written: int = field(default=0)  # byte count of the operation
class FileOperator:
    """Permission-checked interface to the file system.

    Every public method normalises its path argument (``~`` expansion followed
    by ``os.path.abspath``), asks the permission manager for the tier the
    operation needs, and reports the outcome as a ``FileOpResult``.  A
    ``PermissionDenied`` from the manager, or any exception raised while the
    operation itself runs, is turned into ``success=False`` with the message
    in ``error`` rather than propagated to the caller.
    """

    # Directory trees in which write(), write_bytes() and mkdir() only need
    # the WRITE_SANDBOX tier; everywhere else they need WRITE_ALL.
    SANDBOX_ROOTS = ("/tmp/agentos/", "/home/marvis/Marvis/")

    # Suffixes read as text even when ``mimetypes`` does not report ``text/*``.
    _TEXT_SUFFIXES = (
        ".py", ".md", ".txt", ".json", ".yaml", ".yml", ".toml", ".cfg",
        ".ini", ".log", ".csv", ".xml", ".html", ".css", ".js", ".ts",
        ".sh", ".bash", ".env", ".gitignore",
    )

    def __init__(self, perm_manager: SystemPermissionManager, session_id: str):
        """Bind the operator to a permission manager and a session id."""
        self._pm = perm_manager
        self._sid = session_id

    # ── Read operations ──

    def read(self, file_path: str) -> FileOpResult:
        """Read a file's content (requires the READ tier).

        Files that look like text are decoded as UTF-8 with undecodable bytes
        replaced; for anything else ``content`` is a one-line placeholder
        carrying the formatted file size.
        """
        path = self._normalize(file_path)
        denied = self._check(PermissionTier.READ, path, "read")
        if denied is not None:
            return denied

        try:
            if self._looks_like_text(path):
                with open(path, "r", encoding="utf-8", errors="replace") as f:
                    content = f.read()
                return FileOpResult(success=True, action="read", path=path, content=content)
            # Not recognised as text: return a short preview instead of bytes.
            size = os.path.getsize(path)
            return FileOpResult(
                success=True, action="read", path=path,
                content=f"[Binary file, {self._format_size(size)}]",
            )
        except Exception as e:  # broad on purpose: results, not exceptions
            return self._failure("read", path, e)

    def read_bytes(self, file_path: str, max_bytes: int = 1024 * 1024) -> FileOpResult:
        """Read up to ``max_bytes`` raw bytes (requires the READ tier).

        ``content`` holds the data Base64-encoded as ASCII text, and
        ``bytes_written`` is reused to carry the number of bytes read.
        """
        import base64

        path = self._normalize(file_path)
        denied = self._check(PermissionTier.READ, path, "read")
        if denied is not None:
            return denied

        try:
            with open(path, "rb") as f:
                data = f.read(max_bytes)
            encoded = base64.b64encode(data).decode("ascii")
            return FileOpResult(
                success=True, action="read", path=path,
                content=encoded, bytes_written=len(data),
            )
        except Exception as e:  # broad on purpose: results, not exceptions
            return self._failure("read", path, e)

    # ── Listing operations ──

    def list_dir(self, dir_path: str, show_hidden: bool = False) -> FileOpResult:
        """List a directory's entries sorted by name (requires the READ tier).

        Names starting with ``.`` are skipped unless ``show_hidden`` is true.
        An entry that cannot be stat-ed through its link target (for example a
        dangling symlink) is described by the link itself; an entry that
        cannot be stat-ed at all is left out instead of failing the listing.
        """
        path = self._normalize(dir_path)
        denied = self._check(PermissionTier.READ, path, "list")
        if denied is not None:
            return denied

        if not os.path.isdir(path):
            return FileOpResult(success=False, action="list", path=path, error=f"不是目录: {path}")

        try:
            entries = []
            for name in sorted(os.listdir(path)):
                if not show_hidden and name.startswith("."):
                    continue
                full = os.path.join(path, name)
                info = self._stat_entry(full)
                if info is None:
                    continue
                is_dir = os.path.isdir(full)
                mime, _ = mimetypes.guess_type(full)
                fallback_mime = "inode/directory" if is_dir else "application/octet-stream"
                entries.append(FileListing(
                    name=name,
                    path=full,
                    is_dir=is_dir,
                    size_bytes=info.st_size,
                    modified_at=datetime.fromtimestamp(info.st_mtime).isoformat(),
                    mime_type=mime or fallback_mime,
                ))
            return FileOpResult(success=True, action="list", path=path, listing=entries)
        except Exception as e:  # broad on purpose: results, not exceptions
            return self._failure("list", path, e)

    def search(self, root_dir: str, pattern: str, max_depth: int = 5) -> FileOpResult:
        """Recursively search for files, similar to ``find`` plus a glob.

        ``pattern`` is matched against file names with ``fnmatch``.  The root
        directory is depth 0; a directory whose depth is ``max_depth`` or more
        is neither searched nor descended into.  Sub-directories whose name
        starts with ``.`` are not descended into.  Requires the READ tier and
        reports ``action="list"``.
        """
        import fnmatch

        path = self._normalize(root_dir)
        denied = self._check(PermissionTier.READ, path, "list")
        if denied is not None:
            return denied

        results: list[FileListing] = []
        try:
            for dirpath, dirnames, filenames in os.walk(path):
                # relpath keeps the depth correct even when the root is "/",
                # where slicing off the root prefix would drop a separator.
                rel = os.path.relpath(dirpath, path)
                depth = 0 if rel == os.curdir else rel.count(os.sep) + 1
                if depth >= max_depth:
                    dirnames.clear()
                    continue
                # Skip hidden directories (in-place so os.walk honours it).
                dirnames[:] = [d for d in dirnames if not d.startswith(".")]
                for fname in filenames:
                    if not fnmatch.fnmatch(fname, pattern):
                        continue
                    full = os.path.join(dirpath, fname)
                    info = self._stat_entry(full)
                    if info is None:
                        # Vanished or unreadable entry: skip, keep searching.
                        continue
                    results.append(FileListing(
                        name=fname,
                        path=full,
                        is_dir=False,
                        size_bytes=info.st_size,
                        modified_at=datetime.fromtimestamp(info.st_mtime).isoformat(),
                    ))
            return FileOpResult(success=True, action="list", path=path, listing=results)
        except Exception as e:  # broad on purpose: results, not exceptions
            return self._failure("list", path, e)

    # ── Write operations ──

    def write(self, file_path: str, content: str) -> FileOpResult:
        """Write ``content`` to a text file as UTF-8, creating parent dirs.

        Needs WRITE_SANDBOX for paths inside ``SANDBOX_ROOTS`` and WRITE_ALL
        otherwise.  ``bytes_written`` is the UTF-8 encoded length of
        ``content``.
        """
        path = self._normalize(file_path)
        denied = self._check(self._write_tier(path), path, "write")
        if denied is not None:
            return denied

        try:
            os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
            with open(path, "w", encoding="utf-8") as f:
                f.write(content)
            return FileOpResult(
                success=True, action="write", path=path,
                bytes_written=len(content.encode("utf-8")),
            )
        except Exception as e:  # broad on purpose: results, not exceptions
            return self._failure("write", path, e)

    def write_bytes(self, file_path: str, data: bytes) -> FileOpResult:
        """Write ``data`` to a binary file, creating parent directories.

        Needs WRITE_SANDBOX for paths inside ``SANDBOX_ROOTS`` and WRITE_ALL
        otherwise.
        """
        path = self._normalize(file_path)
        denied = self._check(self._write_tier(path), path, "write")
        if denied is not None:
            return denied

        try:
            os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
            with open(path, "wb") as f:
                f.write(data)
            return FileOpResult(success=True, action="write", path=path, bytes_written=len(data))
        except Exception as e:  # broad on purpose: results, not exceptions
            return self._failure("write", path, e)

    # ── Delete / move ──

    def delete(self, target_path: str) -> FileOpResult:
        """Delete a file or a directory tree (high risk, needs WRITE_ALL).

        A symbolic link is removed as a link, even when it points at a
        directory: ``shutil.rmtree`` refuses symlinks, and the link's target
        must not be deleted through it.
        """
        path = self._normalize(target_path)
        denied = self._check(PermissionTier.WRITE_ALL, path, "delete")
        if denied is not None:
            return denied

        try:
            if os.path.isdir(path) and not os.path.islink(path):
                shutil.rmtree(path)
            else:
                os.remove(path)
            return FileOpResult(success=True, action="delete", path=path)
        except Exception as e:  # broad on purpose: results, not exceptions
            return self._failure("delete", path, e)

    def move(self, src: str, dst: str) -> FileOpResult:
        """Move or rename a file (needs WRITE_ALL on both paths).

        The result's ``path`` is the destination on success and the source
        when permission is denied or the move fails.
        """
        src_path = self._normalize(src)
        dst_path = self._normalize(dst)
        for checked in (src_path, dst_path):
            denied = self._check(PermissionTier.WRITE_ALL, checked, "move", report_path=src_path)
            if denied is not None:
                return denied

        try:
            shutil.move(src_path, dst_path)
            return FileOpResult(success=True, action="move", path=dst_path)
        except Exception as e:  # broad on purpose: results, not exceptions
            return self._failure("move", src_path, e)

    def mkdir(self, dir_path: str) -> FileOpResult:
        """Create a directory, including missing parents.

        Needs WRITE_SANDBOX for paths inside ``SANDBOX_ROOTS`` (the roots
        themselves included) and WRITE_ALL otherwise.  An already existing
        directory is not an error.
        """
        path = self._normalize(dir_path)
        denied = self._check(self._write_tier(path), path, "mkdir")
        if denied is not None:
            return denied

        try:
            os.makedirs(path, exist_ok=True)
            return FileOpResult(success=True, action="mkdir", path=path)
        except Exception as e:  # broad on purpose: results, not exceptions
            return self._failure("mkdir", path, e)

    # ── File information ──

    def stat(self, file_path: str) -> FileOpResult:
        """Return details of a file or directory (requires the READ tier).

        ``content`` is a newline-joined report: path, type, size, the last
        three octal digits of the mode, modification time, ``st_ctime`` and
        the inode number.  Reports ``action="read"``.
        """
        path = self._normalize(file_path)
        denied = self._check(PermissionTier.READ, path, "read")
        if denied is not None:
            return denied

        try:
            st = os.stat(path)
            # NOTE(review): the last timestamp is labelled as creation time
            # but is st_ctime, which on Unix is the metadata-change time.
            info_lines = [
                f"路径: {path}",
                f"类型: {'目录' if os.path.isdir(path) else '文件'}",
                f"大小: {self._format_size(st.st_size)}",
                f"权限: {oct(st.st_mode)[-3:]}",
                f"修改时间: {datetime.fromtimestamp(st.st_mtime).isoformat()}",
                f"创建时间: {datetime.fromtimestamp(st.st_ctime).isoformat()}",
                f"inode: {st.st_ino}",
            ]
            return FileOpResult(success=True, action="read", path=path, content="\n".join(info_lines))
        except Exception as e:  # broad on purpose: results, not exceptions
            return self._failure("read", path, e)

    # ── Utilities ──

    @staticmethod
    def _normalize(raw_path: str) -> str:
        """Expand ``~`` and return the absolute, normalised form of a path."""
        return os.path.abspath(os.path.expanduser(raw_path))

    def _check(self, tier, path: str, action: str,
               report_path: Optional[str] = None) -> Optional[FileOpResult]:
        """Require ``tier`` on ``path``; return a failure result if denied.

        Returns ``None`` when the permission manager accepts the request.
        ``report_path`` overrides the path stored in the failure result.
        """
        try:
            self._pm.require(self._sid, tier, path)
        except PermissionDenied as e:
            shown = path if report_path is None else report_path
            return FileOpResult(success=False, action=action, path=shown, error=str(e))
        return None

    @staticmethod
    def _failure(action: str, path: str, exc: BaseException) -> FileOpResult:
        """Build the ``success=False`` result for an exception."""
        return FileOpResult(success=False, action=action, path=path, error=str(exc))

    @classmethod
    def _looks_like_text(cls, path: str) -> bool:
        """Tell whether ``path`` should be read as text.

        True when ``mimetypes`` guesses a ``text/*`` type or the name ends
        with one of ``_TEXT_SUFFIXES`` (compared case-insensitively).
        """
        mime, _ = mimetypes.guess_type(path)
        if mime and mime.startswith("text/"):
            return True
        return path.lower().endswith(cls._TEXT_SUFFIXES)

    @staticmethod
    def _stat_entry(full_path: str):
        """Stat an entry, falling back to the link itself; ``None`` on failure."""
        try:
            return os.stat(full_path)
        except OSError:
            pass
        try:
            # Dangling symlink: describe the link rather than its target.
            return os.lstat(full_path)
        except OSError:
            return None

    @staticmethod
    def _is_within(path: str, root: str) -> bool:
        """Tell whether ``path`` is ``root`` itself or lies below it."""
        return path == root or path.startswith(root.rstrip(os.sep) + os.sep)

    @classmethod
    def _in_sandbox(cls, path: str) -> bool:
        """Tell whether an absolute path belongs to one of ``SANDBOX_ROOTS``.

        The path must be inside a root both as written and after symlinks are
        resolved, so a link placed in the sandbox cannot lower the tier needed
        to write outside of it.  A path that cannot be resolved counts as
        outside the sandbox.
        """
        try:
            resolved = os.path.realpath(path)
        except (OSError, ValueError):
            return False
        for root in cls.SANDBOX_ROOTS:
            base = os.path.normpath(root)
            if not cls._is_within(path, base):
                continue
            try:
                real_base = os.path.realpath(base)
            except (OSError, ValueError):
                continue
            if cls._is_within(resolved, real_base):
                return True
        return False

    @classmethod
    def _write_tier(cls, path: str):
        """Pick the permission tier a write to ``path`` requires."""
        if cls._in_sandbox(path):
            return PermissionTier.WRITE_SANDBOX
        return PermissionTier.WRITE_ALL

    @staticmethod
    def _format_size(size: int) -> str:
        """Format a byte count with one decimal and a 1024-based unit."""
        value = size
        for unit in ["B", "KB", "MB", "GB", "TB"]:
            if value < 1024:
                return f"{value:.1f} {unit}"
            value /= 1024
        return f"{value:.1f} PB"