Coverage for src / tracekit / plugins / isolation.py: 90%
137 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
1"""Plugin isolation and sandboxing.
3This module provides resource limits, permission models, and sandboxing
4for plugins to prevent interference and ensure security.
5"""
7from __future__ import annotations
9import logging
10import resource
11import threading
12from contextlib import contextmanager, suppress
13from dataclasses import dataclass, field
14from enum import Enum, auto
15from typing import TYPE_CHECKING
17if TYPE_CHECKING:
18 from collections.abc import Generator
20logger = logging.getLogger(__name__)
class Permission(Enum):
    """Capabilities that can be granted to, or withheld from, a plugin.

    Member values are assigned by ``auto()`` in declaration order, so the
    order of the members below determines their integer values.

    References:
        PLUG-004: Plugin Isolation - permission model
    """

    # Read configuration files
    READ_CONFIG = auto()
    # Write configuration files
    WRITE_CONFIG = auto()
    # Read data files
    READ_DATA = auto()
    # Write data files
    WRITE_DATA = auto()
    # Access network
    NETWORK_ACCESS = auto()
    # Spawn subprocesses
    SUBPROCESS = auto()
    # Execute native code
    NATIVE_CODE = auto()
    # Access system information
    SYSTEM_INFO = auto()
@dataclass
class ResourceLimits:
    """Resource limits for plugin execution.

    Every field defaults to ``None``, meaning "no limit requested" for that
    resource.

    Attributes:
        max_memory_mb: Maximum memory in MB (None for unlimited)
        max_cpu_time_sec: Maximum CPU time in seconds (None for unlimited)
        max_wall_time_sec: Maximum wall time in seconds (None for unlimited).
            Not translated by ``to_rlimit_dict``.
        max_file_size_mb: Maximum file size in MB (None for unlimited)
        max_open_files: Maximum open file descriptors (None for unlimited)

    References:
        PLUG-004: Plugin Isolation - resource limits
    """

    max_memory_mb: int | None = None
    max_cpu_time_sec: float | None = None
    max_wall_time_sec: float | None = None
    max_file_size_mb: int | None = None
    max_open_files: int | None = None

    def to_rlimit_dict(self) -> dict[int, tuple[int, int]]:
        """Convert to a ``resource.setrlimit`` compatible dict.

        Only fields that are not ``None`` produce an entry. Each entry maps an
        ``RLIMIT_*`` constant to a ``(soft, hard)`` pair with both values equal.

        Returns:
            Dictionary of resource limits keyed by ``resource.RLIMIT_*``.

        References:
            PLUG-004: Plugin Isolation - resource limits
        """
        # Function-scope import: the module's import block does not provide math.
        import math

        bytes_per_mb = 1024 * 1024
        limits: dict[int, tuple[int, int]] = {}

        if self.max_memory_mb is not None:
            # RLIMIT_AS - address space limit. int() keeps the value integral
            # even if a float number of megabytes was supplied.
            limit_bytes = int(self.max_memory_mb * bytes_per_mb)
            limits[resource.RLIMIT_AS] = (limit_bytes, limit_bytes)

        if self.max_cpu_time_sec is not None:
            # RLIMIT_CPU - CPU time limit, in whole seconds. Round up so a
            # fractional request (e.g. 0.5) never collapses to 0 and the plugin
            # always gets at least the time that was asked for.
            limit_sec = math.ceil(self.max_cpu_time_sec)
            limits[resource.RLIMIT_CPU] = (limit_sec, limit_sec)

        if self.max_file_size_mb is not None:
            # RLIMIT_FSIZE - file size limit
            limit_bytes = int(self.max_file_size_mb * bytes_per_mb)
            limits[resource.RLIMIT_FSIZE] = (limit_bytes, limit_bytes)

        if self.max_open_files is not None:
            # RLIMIT_NOFILE - number of open files
            max_files = int(self.max_open_files)
            limits[resource.RLIMIT_NOFILE] = (max_files, max_files)

        return limits
@dataclass
class PermissionSet:
    """Allow/deny bookkeeping for the permissions of one plugin.

    A permission counts as granted only when it is in ``allowed`` and not in
    ``denied``; an entry in ``denied`` always wins.

    Attributes:
        allowed: Set of allowed permissions
        denied: Set of explicitly denied permissions

    References:
        PLUG-004: Plugin Isolation - permission model
    """

    allowed: set[Permission] = field(default_factory=set)
    denied: set[Permission] = field(default_factory=set)

    def grant(self, permission: Permission) -> None:
        """Mark ``permission`` as allowed and lift any explicit denial of it.

        Args:
            permission: Permission to grant

        References:
            PLUG-004: Plugin Isolation - permission model
        """
        self.allowed.add(permission)
        # discard() is a no-op when the permission was never denied.
        self.denied.discard(permission)

    def deny(self, permission: Permission) -> None:
        """Mark ``permission`` as denied and withdraw any earlier grant of it.

        Args:
            permission: Permission to deny

        References:
            PLUG-004: Plugin Isolation - permission model
        """
        self.denied.add(permission)
        # discard() is a no-op when the permission was never allowed.
        self.allowed.discard(permission)

    def has_permission(self, permission: Permission) -> bool:
        """Report whether ``permission`` is currently granted.

        Args:
            permission: Permission to check

        Returns:
            True if the permission is allowed and not explicitly denied

        References:
            PLUG-004: Plugin Isolation - permission model
        """
        # The denial check comes first so that it takes precedence.
        return permission not in self.denied and permission in self.allowed

    def check(self, permission: Permission) -> None:
        """Require ``permission``, raising when it is not granted.

        Args:
            permission: Permission to check

        Raises:
            PermissionError: If permission not granted

        References:
            PLUG-004: Plugin Isolation - permission model
        """
        if self.has_permission(permission):
            return
        raise PermissionError(f"Plugin does not have {permission.name} permission")
class TimeoutError(Exception):
    """Signal that plugin execution ran past its allowed time.

    This is a direct ``Exception`` subclass; inside this module the name
    shadows Python's builtin ``TimeoutError``.
    """
class ResourceExceededError(Exception):
    """Signal that a plugin went over one of its resource limits."""
class PluginSandbox:
    """Sandbox for isolated plugin execution.

    Provides resource limits, timeout logging, and permission checking.

    Limits are applied with ``resource.setrlimit`` when ``execute()`` is
    entered and put back when it exits. The saved originals live in a single
    per-instance dict that is overwritten on entry and cleared on exit, so
    nested ``execute()`` calls on the same sandbox are not supported.

    References:
        PLUG-004: Plugin Isolation - resource limits, permission model
    """

    def __init__(
        self,
        permissions: PermissionSet | None = None,
        limits: ResourceLimits | None = None,
    ) -> None:
        """Initialize sandbox.

        Args:
            permissions: Permission set for plugin (an empty set when omitted)
            limits: Resource limits for plugin (no limits when omitted)

        References:
            PLUG-004: Plugin Isolation
        """
        self.permissions = permissions or PermissionSet()
        self.limits = limits or ResourceLimits()
        # (soft, hard) pairs as they were before _apply_limits() changed them.
        self._original_limits: dict[int, tuple[int, int]] = {}

    @contextmanager
    def execute(
        self,
        timeout: float | None = None,
    ) -> Generator[None, None, None]:
        """Execute code in sandbox with resource limits.

        Args:
            timeout: Execution timeout in seconds

        Yields:
            None

        Raises:
            ResourceExceededError: If the body raises ``MemoryError``

        Note:
            Timeout handling is logged but does not raise TimeoutError.
            Future versions will add proper timeout interruption.

        References:
            PLUG-004: Plugin Isolation - resource limits

        Example:
            >>> sandbox = PluginSandbox(limits=ResourceLimits(max_memory_mb=100))
            >>> with sandbox.execute(timeout=5.0):
            ...     # Plugin code runs here with limits
            ...     result = plugin.process_data(data)
        """
        timer: threading.Timer | None = None

        # Setup happens inside the try so that the finally clause restores the
        # limits even if applying them or starting the timer fails part-way.
        try:
            self._apply_limits()

            if timeout is not None:
                try:
                    timer = threading.Timer(timeout, self._timeout_handler)
                    timer.start()
                except RuntimeError:
                    # Thread limit reached, skip timeout
                    logger.warning("Could not create timeout timer (thread limit reached)")
                    timer = None

            yield

        except MemoryError as e:
            raise ResourceExceededError("Memory limit exceeded") from e

        finally:
            # Cancel timeout timer (best effort; must never mask the real error)
            if timer is not None:
                with suppress(Exception):
                    timer.cancel()

            # Restore original limits
            self._restore_limits()

    def _apply_limits(self) -> None:
        """Lower the soft resource limits to the configured values.

        Only the soft limit is changed. The hard limit is left at its current
        value because an unprivileged process may lower a hard limit but can
        never raise it again, which would make ``_restore_limits`` fail and
        leave the whole process permanently restricted.

        References:
            PLUG-004: Plugin Isolation - resource limits
        """
        rlimits = self.limits.to_rlimit_dict()

        for resource_type, (requested_soft, _requested_hard) in rlimits.items():
            try:
                current = resource.getrlimit(resource_type)
                current_hard = current[1]

                # A soft limit may not exceed the hard limit; clamp unless the
                # hard limit is "unlimited" (RLIM_INFINITY is not comparable
                # as an ordinary number on every platform).
                if current_hard == resource.RLIM_INFINITY:
                    new_soft = requested_soft
                else:
                    new_soft = min(requested_soft, current_hard)

                # Save original limit, then apply the new one
                self._original_limits[resource_type] = current
                applied = (new_soft, current_hard)
                resource.setrlimit(resource_type, applied)
                logger.debug("Applied resource limit: %s = %s", resource_type, applied)

            except (OSError, ValueError) as e:
                logger.warning("Failed to set resource limit %s: %s", resource_type, e)

    def _restore_limits(self) -> None:
        """Restore the resource limits saved by ``_apply_limits``.

        Failures are logged and do not stop the remaining limits from being
        restored. The saved state is cleared afterwards.

        References:
            PLUG-004: Plugin Isolation - resource limits
        """
        for resource_type, limit in self._original_limits.items():
            try:
                resource.setrlimit(resource_type, limit)
            except (OSError, ValueError) as e:
                logger.warning("Failed to restore resource limit %s: %s", resource_type, e)

        self._original_limits.clear()

    def _timeout_handler(self) -> None:
        """Handle execution timeout.

        Runs on the timer thread started by ``execute``. It only logs; the
        running plugin code is not interrupted.

        References:
            PLUG-004: Plugin Isolation - CPU time limit
        """
        logger.error("Plugin execution timeout exceeded")
        # In a real implementation, this would interrupt the thread
        # For now, we just log the error

    def check_permission(self, permission: Permission) -> None:
        """Check if plugin has permission.

        Delegates to ``self.permissions.check``.

        Args:
            permission: Permission to check

        References:
            PLUG-004: Plugin Isolation - permission model
        """
        self.permissions.check(permission)
class IsolationManager:
    """Registry of plugin sandboxes, keyed by plugin name.

    Creates sandboxes on request, falls back to manager-wide default limits
    when none are supplied, and allows lookup and removal by name.

    References:
        PLUG-004: Plugin Isolation
    """

    def __init__(self) -> None:
        """Set up the default limits and an empty sandbox registry."""
        # Defaults used by create_sandbox(): 512 MB of memory, 30 s of CPU time.
        self._default_limits = ResourceLimits(max_memory_mb=512, max_cpu_time_sec=30.0)
        self._sandboxes: dict[str, PluginSandbox] = {}

    def create_sandbox(
        self,
        plugin_name: str,
        permissions: PermissionSet | None = None,
        limits: ResourceLimits | None = None,
    ) -> PluginSandbox:
        """Build a sandbox for ``plugin_name`` and register it.

        A sandbox already registered under the same name is replaced.

        Args:
            plugin_name: Plugin name
            permissions: Permission set (None for default)
            limits: Resource limits (None for default)

        Returns:
            The newly created plugin sandbox

        References:
            PLUG-004: Plugin Isolation
        """
        effective_limits = self._default_limits if limits is None else limits
        sandbox = PluginSandbox(permissions=permissions, limits=effective_limits)
        self._sandboxes[plugin_name] = sandbox

        logger.info(f"Created sandbox for plugin '{plugin_name}'")
        return sandbox

    def get_sandbox(self, plugin_name: str) -> PluginSandbox | None:
        """Look up the sandbox registered for ``plugin_name``.

        Args:
            plugin_name: Plugin name

        Returns:
            The plugin's sandbox, or None when none is registered
        """
        return self._sandboxes.get(plugin_name)

    def remove_sandbox(self, plugin_name: str) -> None:
        """Drop the sandbox registered for ``plugin_name``, if there is one.

        Unknown names are ignored silently.

        Args:
            plugin_name: Plugin name
        """
        try:
            del self._sandboxes[plugin_name]
        except KeyError:
            # Nothing registered under this name: nothing to remove or log.
            return
        logger.info(f"Removed sandbox for plugin '{plugin_name}'")
397# Global isolation manager
398_isolation_manager: IsolationManager | None = None
def get_isolation_manager() -> IsolationManager:
    """Return the module-wide isolation manager, creating it on first use.

    Returns:
        Global IsolationManager instance
    """
    global _isolation_manager
    manager = _isolation_manager
    if manager is None:
        # First call: build the manager and cache it for later calls.
        manager = IsolationManager()
        _isolation_manager = manager
    return manager
413__all__ = [
414 "IsolationManager",
415 "Permission",
416 "PermissionSet",
417 "PluginSandbox",
418 "ResourceExceededError",
419 "ResourceLimits",
420 "TimeoutError",
421 "get_isolation_manager",
422]