Coverage for src / tracekit / plugins / isolation.py: 90%

137 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Plugin isolation and sandboxing. 

2 

3This module provides resource limits, permission models, and sandboxing 

4for plugins to prevent interference and ensure security. 

5""" 

6 

7from __future__ import annotations 

8 

9import logging 

10import resource 

11import threading 

12from contextlib import contextmanager, suppress 

13from dataclasses import dataclass, field 

14from enum import Enum, auto 

15from typing import TYPE_CHECKING 

16 

17if TYPE_CHECKING: 

18 from collections.abc import Generator 

19 

20logger = logging.getLogger(__name__) 

21 

22 

class Permission(Enum):
    """Capabilities that can be granted to, or withheld from, a plugin.

    Member values are assigned automatically in declaration order.

    References:
        PLUG-004: Plugin Isolation - permission model
    """

    # Read configuration files
    READ_CONFIG = auto()
    # Write configuration files
    WRITE_CONFIG = auto()
    # Read data files
    READ_DATA = auto()
    # Write data files
    WRITE_DATA = auto()
    # Access network
    NETWORK_ACCESS = auto()
    # Spawn subprocesses
    SUBPROCESS = auto()
    # Execute native code
    NATIVE_CODE = auto()
    # Access system information
    SYSTEM_INFO = auto()

46 

47 

@dataclass
class ResourceLimits:
    """Upper bounds on the resources a plugin may consume.

    Every field defaults to ``None``, which means "no limit configured".

    Attributes:
        max_memory_mb: Maximum memory in MB (None for unlimited)
        max_cpu_time_sec: Maximum CPU time in seconds (None for unlimited)
        max_wall_time_sec: Maximum wall time in seconds (None for unlimited)
        max_file_size_mb: Maximum file size in MB (None for unlimited)
        max_open_files: Maximum open file descriptors (None for unlimited)

    References:
        PLUG-004: Plugin Isolation - resource limits
    """

    max_memory_mb: int | None = None
    max_cpu_time_sec: float | None = None
    max_wall_time_sec: float | None = None
    max_file_size_mb: int | None = None
    max_open_files: int | None = None

    def to_rlimit_dict(self) -> dict[int, tuple[int, int]]:
        """Translate the configured limits into ``resource.setrlimit`` arguments.

        Only fields that are not ``None`` produce an entry. Each entry maps a
        ``resource.RLIMIT_*`` constant to a pair holding the same number twice.
        ``max_wall_time_sec`` is not translated by this method.

        Returns:
            Dictionary of resource limits, keyed by ``RLIMIT_*`` constant

        References:
            PLUG-004: Plugin Isolation - resource limits
        """

        def megabytes_to_bytes(megabytes):
            return megabytes * 1024 * 1024

        def unchanged(count):
            return count

        # (name of the resource constant, configured value, unit conversion)
        # The constant is looked up by name only when its limit is configured.
        specs = (
            ("RLIMIT_AS", self.max_memory_mb, megabytes_to_bytes),  # address space
            ("RLIMIT_CPU", self.max_cpu_time_sec, int),  # whole CPU seconds
            ("RLIMIT_FSIZE", self.max_file_size_mb, megabytes_to_bytes),  # file size
            ("RLIMIT_NOFILE", self.max_open_files, unchanged),  # open files
        )

        rlimits = {}
        for constant_name, configured, convert in specs:
            if configured is None:
                continue
            ceiling = convert(configured)
            rlimits[getattr(resource, constant_name)] = (ceiling, ceiling)
        return rlimits

103 

104 

@dataclass
class PermissionSet:
    """Allow/deny bookkeeping for a single plugin.

    A permission is effective only when it is in ``allowed`` and not in
    ``denied``; an explicit denial always wins.

    Attributes:
        allowed: Set of allowed permissions
        denied: Set of explicitly denied permissions

    References:
        PLUG-004: Plugin Isolation - permission model
    """

    allowed: set[Permission] = field(default_factory=set)
    denied: set[Permission] = field(default_factory=set)

    def grant(self, permission: Permission) -> None:
        """Allow ``permission`` and drop any earlier denial of it.

        Args:
            permission: Permission to grant

        References:
            PLUG-004: Plugin Isolation - permission model
        """
        self.allowed.add(permission)
        self.denied.discard(permission)

    def deny(self, permission: Permission) -> None:
        """Deny ``permission`` and drop any earlier grant of it.

        Args:
            permission: Permission to deny

        References:
            PLUG-004: Plugin Isolation - permission model
        """
        self.denied.add(permission)
        self.allowed.discard(permission)

    def has_permission(self, permission: Permission) -> bool:
        """Report whether ``permission`` is currently effective.

        Args:
            permission: Permission to check

        Returns:
            True if the permission is allowed and not explicitly denied

        References:
            PLUG-004: Plugin Isolation - permission model
        """
        return permission not in self.denied and permission in self.allowed

    def check(self, permission: Permission) -> None:
        """Require ``permission``, raising when it is not effective.

        Args:
            permission: Permission to check

        Raises:
            PermissionError: If permission not granted

        References:
            PLUG-004: Plugin Isolation - permission model
        """
        if self.has_permission(permission):
            return
        raise PermissionError(f"Plugin does not have {permission.name} permission")

176 

177 

class TimeoutError(Exception):
    """Exception raised when execution timeout is exceeded.

    Note:
        Inside this module the name shadows the builtin ``TimeoutError``.
        This class derives directly from ``Exception``, not from the builtin.
    """

182 

183 

class ResourceExceededError(Exception):
    """Exception raised when a resource limit is exceeded."""

188 

189 

class PluginSandbox:
    """Sandbox for isolated plugin execution.

    Provides resource limits, timeout enforcement, and permission checking.

    Note:
        Limits are applied with ``resource.setrlimit`` and therefore apply to
        the whole calling process for as long as :meth:`execute` is active.

    References:
        PLUG-004: Plugin Isolation - resource limits, permission model
    """

    def __init__(
        self,
        permissions: PermissionSet | None = None,
        limits: ResourceLimits | None = None,
    ) -> None:
        """Initialize sandbox.

        Args:
            permissions: Permission set for plugin (empty set when omitted)
            limits: Resource limits for plugin (no limits when omitted)

        References:
            PLUG-004: Plugin Isolation
        """
        self.permissions = permissions or PermissionSet()
        self.limits = limits or ResourceLimits()
        # Limits in force before this sandbox changed them, keyed by RLIMIT_*.
        self._original_limits: dict[int, tuple[int, int]] = {}

    @contextmanager
    def execute(
        self,
        timeout: float | None = None,
    ) -> Generator[None, None, None]:
        """Execute code in sandbox with resource limits.

        Limits are applied on entry and the previous limits are restored on
        exit, whether the body finishes normally or raises.

        Args:
            timeout: Execution timeout in seconds

        Yields:
            None

        Raises:
            ResourceExceededError: If a ``MemoryError`` escapes the body

        Note:
            Timeout handling is logged but does not raise TimeoutError.
            Future versions will add proper timeout interruption.

        References:
            PLUG-004: Plugin Isolation - resource limits

        Example:
            >>> sandbox = PluginSandbox(limits=ResourceLimits(max_memory_mb=100))
            >>> with sandbox.execute(timeout=5.0):
            ...     # Plugin code runs here with limits
            ...     result = plugin.process_data(data)
        """
        timer = None
        try:
            # Setup happens inside the try so the finally clause undoes it
            # even when setup itself fails part-way through.
            self._apply_limits()

            if timeout is not None:
                try:
                    timer = threading.Timer(timeout, self._timeout_handler)
                    timer.start()
                except RuntimeError:
                    # Thread limit reached, skip timeout
                    logger.warning("Could not create timeout timer (thread limit reached)")
                    timer = None

            yield

        except MemoryError as e:
            raise ResourceExceededError("Memory limit exceeded") from e

        finally:
            try:
                if timer is not None:
                    timer.cancel()
            finally:
                # Must run even if cancelling the timer fails.
                self._restore_limits()

    def _apply_limits(self) -> None:
        """Lower the soft resource limits to the configured values.

        Only the soft limit is changed and the existing hard limit is kept.
        An unprivileged process may not raise a hard limit once it has been
        lowered, so lowering it here would make ``_restore_limits`` fail and
        leave the whole process throttled after the sandbox exits.

        Failures for one resource are logged and do not stop the others.

        References:
            PLUG-004: Plugin Isolation - resource limits
        """
        for resource_type, (requested, _) in self.limits.to_rlimit_dict().items():
            try:
                original = resource.getrlimit(resource_type)
                hard = original[1]

                # A soft limit may never exceed the hard limit.
                soft = requested
                if hard != resource.RLIM_INFINITY and soft > hard:
                    soft = hard

                resource.setrlimit(resource_type, (soft, hard))

            except (OSError, ValueError) as e:
                logger.warning("Failed to set resource limit %s: %s", resource_type, e)
                continue

            # Recorded only after a successful change. setdefault keeps the
            # outermost original if execute() is re-entered on this sandbox.
            self._original_limits.setdefault(resource_type, original)
            logger.debug("Applied resource limit: %s = %s", resource_type, (soft, hard))

    def _restore_limits(self) -> None:
        """Restore the resource limits saved by ``_apply_limits``.

        Failures are logged per resource; the saved state is always cleared.

        References:
            PLUG-004: Plugin Isolation - resource limits
        """
        for resource_type, limit in self._original_limits.items():
            try:
                resource.setrlimit(resource_type, limit)
            except (OSError, ValueError) as e:
                logger.warning("Failed to restore resource limit %s: %s", resource_type, e)

        self._original_limits.clear()

    def _timeout_handler(self) -> None:
        """Handle execution timeout.

        Runs on the timer thread. It only logs; the sandboxed code is not
        interrupted.

        References:
            PLUG-004: Plugin Isolation - CPU time limit
        """
        logger.error("Plugin execution timeout exceeded")
        # In a real implementation, this would interrupt the thread
        # For now, we just log the error

    def check_permission(self, permission: Permission) -> None:
        """Check if plugin has permission.

        Delegates to the ``check`` method of ``self.permissions``.

        Args:
            permission: Permission to check

        References:
            PLUG-004: Plugin Isolation - permission model
        """
        self.permissions.check(permission)

328 

329 

class IsolationManager:
    """Registry that maps plugin names to their sandboxes.

    Sandboxes created without explicit limits receive the manager's default
    resource limits.

    References:
        PLUG-004: Plugin Isolation
    """

    def __init__(self) -> None:
        """Start with no sandboxes and the built-in default limits."""
        # Defaults: 512 MB of memory and 30 seconds of CPU time.
        self._default_limits = ResourceLimits(max_memory_mb=512, max_cpu_time_sec=30.0)
        self._sandboxes: dict[str, PluginSandbox] = {}

    def create_sandbox(
        self,
        plugin_name: str,
        permissions: PermissionSet | None = None,
        limits: ResourceLimits | None = None,
    ) -> PluginSandbox:
        """Build a sandbox and register it under ``plugin_name``.

        A sandbox already registered under the same name is replaced.

        Args:
            plugin_name: Plugin name
            permissions: Permission set (None for default)
            limits: Resource limits (None for the manager's defaults)

        Returns:
            The newly created plugin sandbox

        References:
            PLUG-004: Plugin Isolation
        """
        effective_limits = self._default_limits if limits is None else limits
        created = PluginSandbox(permissions=permissions, limits=effective_limits)
        self._sandboxes[plugin_name] = created

        logger.info(f"Created sandbox for plugin '{plugin_name}'")
        return created

    def get_sandbox(self, plugin_name: str) -> PluginSandbox | None:
        """Look up the sandbox registered for a plugin.

        Args:
            plugin_name: Plugin name

        Returns:
            Plugin sandbox, or None when nothing is registered under the name
        """
        try:
            return self._sandboxes[plugin_name]
        except KeyError:
            return None

    def remove_sandbox(self, plugin_name: str) -> None:
        """Unregister a plugin's sandbox; unknown names are ignored silently.

        Args:
            plugin_name: Plugin name
        """
        try:
            del self._sandboxes[plugin_name]
        except KeyError:
            return
        logger.info(f"Removed sandbox for plugin '{plugin_name}'")

395 

396 

# Process-wide manager instance, created on first request.
_isolation_manager: IsolationManager | None = None


def get_isolation_manager() -> IsolationManager:
    """Return the shared isolation manager, creating it on first use.

    Returns:
        Global IsolationManager instance
    """
    global _isolation_manager
    manager = _isolation_manager
    if manager is None:
        manager = IsolationManager()
        _isolation_manager = manager
    return manager

411 

412 

413__all__ = [ 

414 "IsolationManager", 

415 "Permission", 

416 "PermissionSet", 

417 "PluginSandbox", 

418 "ResourceExceededError", 

419 "ResourceLimits", 

420 "TimeoutError", 

421 "get_isolation_manager", 

422]