Coverage for src / apcore_cli / security / sandbox.py: 95%

58 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-26 10:23 +0800

1"""Subprocess-based execution sandboxing (FE-05).""" 

2 

3from __future__ import annotations 

4 

5import json 

6import os 

7import subprocess 

8import sys 

9import tempfile 

10from pathlib import Path 

11from typing import TYPE_CHECKING, Any 

12 

13if TYPE_CHECKING: 

14 from apcore import Executor 

15 

16 

17class ModuleExecutionError(Exception): 

18 """Raised when a sandboxed module execution fails.""" 

19 

20 pass 

21 

22 

23class CliModuleNotFoundError(Exception): 

24 """Raised when a module ID is not found in the registry (exit 44). 

25 

26 Renamed from ``ModuleNotFoundError`` in v0.7.x (audit D2-001) to avoid 

27 shadowing :class:`builtins.ModuleNotFoundError` — the Python interpreter 

28 raises that builtin as part of the import-system contract, and a 

29 re-exported same-named class made ``from apcore_cli import *`` clobber 

30 the language-defined exception in the calling namespace. 

31 

32 Equivalent to TypeScript's ``ModuleNotFoundError`` and Rust's 

33 ``DiscoveryError::ModuleNotFound``. Cross-language naming is asymmetric 

34 by design: TS and Rust have no built-in collision and keep the short 

35 name. 

36 """ 

37 

38 pass 

39 

40 

41class SchemaValidationError(Exception): 

42 """Raised when JSON schema validation fails (exit 45). 

43 

44 Equivalent to TypeScript's ``SchemaValidationError`` and Rust's 

45 ``RefResolverError::Unresolvable``. 

46 """ 

47 

48 pass 

49 

50 

51# Env forwarding strategy (mirrors Rust spec §4.4 and apcore-cli/docs/features/security.md): 

52# Allow: PATH, LANG, LC_ALL + all APCORE_* vars. 

53# PYTHONPATH is intentionally excluded (D10-010) — it must not cross the sandbox boundary. 

54# Deny prefix: APCORE_AUTH_ — credentials must not cross the sandbox trust boundary. 

55_SANDBOX_ALLOW_KEYS = ("PATH", "LANG", "LC_ALL") 

56_SANDBOX_ALLOW_PREFIX = "APCORE_" 

57_SANDBOX_DENY_PREFIX = "APCORE_AUTH_" 

58_SANDBOX_DENY_KEYS: frozenset[str] = frozenset({"APCORE_AUTH_API_KEY"}) 

59 

60 

61class Sandbox: 

62 """Subprocess-isolated module execution. 

63 

64 Audit D1-005 parity (v0.6.x): the `timeout_seconds` parameter mirrors 

65 the Rust `Sandbox::new(enabled, timeout_ms)` API. When `enabled=False`, 

66 `execute()` is a passthrough to the injected apcore Executor. 

67 """ 

68 

69 def __init__( 

70 self, 

71 enabled: bool = False, 

72 timeout_seconds: int = 300, 

73 extensions_root: str | None = None, 

74 max_output_bytes: int = 64 * 1024 * 1024, 

75 ) -> None: 

76 self._enabled = enabled 

77 self._timeout_seconds = timeout_seconds 

78 self._extensions_root = extensions_root 

79 self._max_output_bytes = max_output_bytes 

80 

81 def execute(self, module_id: str, input_data: dict, executor: Executor) -> Any: 

82 if not self._enabled: 

83 return executor.call(module_id, input_data) 

84 return self._sandboxed_execute(module_id, input_data) 

85 

86 def _sandboxed_execute(self, module_id: str, input_data: dict) -> Any: 

87 env: dict[str, str] = {} 

88 for key in _SANDBOX_ALLOW_KEYS: 

89 if key in os.environ: 

90 env[key] = os.environ[key] 

91 # Forward all APCORE_* vars except the APCORE_AUTH_* deny prefix and 

92 # any explicit deny-listed key (D11-002 — defense-in-depth parity with 

93 # the TS and Rust sandboxes, which both apply prefix + explicit-key 

94 # filtering). The explicit deny set covers entries that match the 

95 # allow prefix but should never cross the sandbox trust boundary. 

96 for key, val in os.environ.items(): 

97 if ( 

98 key.startswith(_SANDBOX_ALLOW_PREFIX) 

99 and not key.startswith(_SANDBOX_DENY_PREFIX) 

100 and key not in _SANDBOX_DENY_KEYS 

101 ): 

102 env[key] = val 

103 

104 # Inject extensions root as an absolute path so the runner locates 

105 # modules correctly even when cwd is changed to the sandbox tempdir. 

106 if self._extensions_root is not None: 

107 env["APCORE_EXTENSIONS_ROOT"] = str(Path(self._extensions_root).resolve()) 

108 elif "APCORE_EXTENSIONS_ROOT" in env: 

109 env["APCORE_EXTENSIONS_ROOT"] = str(Path(env["APCORE_EXTENSIONS_ROOT"]).resolve()) 

110 

111 with tempfile.TemporaryDirectory(prefix="apcore_sandbox_") as tmpdir: 

112 env["HOME"] = tmpdir 

113 env["TMPDIR"] = tmpdir 

114 

115 try: 

116 result = subprocess.run( 

117 [sys.executable, "-m", "apcore_cli._sandbox_runner", module_id], 

118 input=json.dumps(input_data), 

119 capture_output=True, 

120 text=True, 

121 env=env, 

122 cwd=tmpdir, 

123 timeout=self._timeout_seconds, 

124 ) 

125 except subprocess.TimeoutExpired as err: 

126 raise ModuleExecutionError(f"Error: Module '{module_id}' timed out in sandbox.") from err 

127 

128 # Enforce output size cap (post-capture soft limit; for a hard cap 

129 # that prevents memory accumulation, use Popen-based streaming). 

130 total_bytes = len(result.stdout.encode()) + len(result.stderr.encode()) 

131 if total_bytes > self._max_output_bytes: 

132 limit_mb = self._max_output_bytes // (1024 * 1024) 

133 raise ModuleExecutionError( 

134 f"Error: Module '{module_id}' output exceeded the {limit_mb}MB sandbox limit." 

135 ) 

136 

137 if result.returncode != 0: 

138 raise ModuleExecutionError(f"Error: Module '{module_id}' execution failed: {result.stderr}") 

139 

140 try: 

141 return json.loads(result.stdout) 

142 except json.JSONDecodeError as err: 

143 preview = result.stdout[:200] 

144 raise ModuleExecutionError(f"Error: Module '{module_id}' returned non-JSON output: {preview}") from err