Coverage for src / apcore_cli / security / sandbox.py: 95%
58 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-26 10:23 +0800
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-26 10:23 +0800
1"""Subprocess-based execution sandboxing (FE-05)."""
3from __future__ import annotations
5import json
6import os
7import subprocess
8import sys
9import tempfile
10from pathlib import Path
11from typing import TYPE_CHECKING, Any
13if TYPE_CHECKING:
14 from apcore import Executor
class ModuleExecutionError(Exception):
    """Signal that running a module inside the sandbox did not succeed.

    :class:`Sandbox` raises this when the child process times out, produces
    more output than allowed, exits with a non-zero status, or prints
    something that is not valid JSON.
    """
class CliModuleNotFoundError(Exception):
    """Signal that a module ID is missing from the registry (exit 44).

    Until v0.7.x this class was called ``ModuleNotFoundError``. Audit D2-001
    renamed it because that name shadows
    :class:`builtins.ModuleNotFoundError`, which the interpreter itself
    raises as part of the import-system contract; with the same-named class
    re-exported, ``from apcore_cli import *`` replaced the language-defined
    exception in the importing namespace.

    The TypeScript counterpart is ``ModuleNotFoundError`` and the Rust
    counterpart is ``DiscoveryError::ModuleNotFound``. The naming asymmetry
    across languages is deliberate: neither TS nor Rust has a built-in of
    that name to collide with, so they keep the short form.
    """
class SchemaValidationError(Exception):
    """Signal that JSON schema validation failed (exit 45).

    Counterparts in the other implementations: ``SchemaValidationError`` in
    TypeScript and ``RefResolverError::Unresolvable`` in Rust.
    """
# Environment forwarding policy for the sandbox child process (mirrors Rust
# spec §4.4 and apcore-cli/docs/features/security.md).
#
# Forwarded: PATH, LANG, LC_ALL, plus every variable starting with APCORE_.
# PYTHONPATH is deliberately NOT forwarded (D10-010) — it must not cross the
# sandbox boundary.
_SANDBOX_ALLOW_PREFIX = "APCORE_"
_SANDBOX_ALLOW_KEYS = ("PATH", "LANG", "LC_ALL")

# Never forwarded: anything under the APCORE_AUTH_ prefix — credentials must
# not cross the sandbox trust boundary. The explicit key set is checked in
# addition to the prefix; its current entry is already covered by the prefix.
_SANDBOX_DENY_PREFIX = "APCORE_AUTH_"
_SANDBOX_DENY_KEYS: frozenset[str] = frozenset(("APCORE_AUTH_API_KEY",))
class Sandbox:
    """Run a module either in-process or inside an isolated child interpreter.

    With ``enabled=False`` (the default) :meth:`execute` simply delegates to
    the apcore Executor handed to it. With ``enabled=True`` the module is run
    through ``python -m apcore_cli._sandbox_runner`` in a subprocess that
    receives a filtered environment and a throwaway working directory.

    Audit D1-005 parity (v0.6.x): ``timeout_seconds`` is the counterpart of
    the ``timeout_ms`` argument of the Rust ``Sandbox::new(enabled, timeout_ms)``.
    """

    def __init__(
        self,
        enabled: bool = False,
        timeout_seconds: int = 300,
        extensions_root: str | None = None,
        max_output_bytes: int = 64 * 1024 * 1024,
    ) -> None:
        """Store the sandbox configuration.

        Args:
            enabled: Whether :meth:`execute` should use the subprocess path.
            timeout_seconds: Timeout passed to ``subprocess.run``.
            extensions_root: Optional extensions directory; when given it is
                exported to the child as an absolute ``APCORE_EXTENSIONS_ROOT``.
            max_output_bytes: Upper bound on the combined encoded size of the
                child's stdout and stderr.
        """
        self._max_output_bytes = max_output_bytes
        self._extensions_root = extensions_root
        self._timeout_seconds = timeout_seconds
        self._enabled = enabled

    def execute(self, module_id: str, input_data: dict, executor: Executor) -> Any:
        """Run ``module_id`` with ``input_data`` and return its result.

        When sandboxing is enabled the call goes through the subprocess
        runner and ``executor`` is not used; otherwise the result of
        ``executor.call(module_id, input_data)`` is returned unchanged.
        """
        if self._enabled:
            return self._sandboxed_execute(module_id, input_data)
        return executor.call(module_id, input_data)

    def _child_environment(self) -> dict[str, str]:
        """Assemble the filtered environment handed to the sandbox child."""
        parent_env = os.environ

        # Explicitly allowed keys come first, in their declared order.
        child_env: dict[str, str] = {
            name: parent_env[name] for name in _SANDBOX_ALLOW_KEYS if name in parent_env
        }

        # Then every APCORE_* variable, minus the APCORE_AUTH_* deny prefix
        # and any explicitly deny-listed key (D11-002 — defense-in-depth
        # parity with the TS and Rust sandboxes, which both filter by prefix
        # and by explicit key). The explicit set exists for names that match
        # the allow prefix yet must never cross the sandbox trust boundary.
        child_env.update(
            (name, value)
            for name, value in parent_env.items()
            if name.startswith(_SANDBOX_ALLOW_PREFIX)
            and not (name.startswith(_SANDBOX_DENY_PREFIX) or name in _SANDBOX_DENY_KEYS)
        )

        # The child runs with cwd set to a temp directory, so the extensions
        # root has to be absolute for the runner to find modules. A
        # constructor-supplied root takes precedence over a forwarded one.
        root = self._extensions_root
        if root is None:
            root = child_env.get("APCORE_EXTENSIONS_ROOT")
        if root is not None:
            child_env["APCORE_EXTENSIONS_ROOT"] = str(Path(root).resolve())

        return child_env

    def _sandboxed_execute(self, module_id: str, input_data: dict) -> Any:
        """Run the module in a child interpreter and return its decoded JSON output.

        ``input_data`` is sent to the child as JSON on stdin; the child's
        stdout is parsed as JSON and returned.

        Raises:
            ModuleExecutionError: On timeout, when stdout plus stderr exceed
                ``max_output_bytes``, on a non-zero exit status, or when
                stdout is not valid JSON.
        """
        child_env = self._child_environment()
        command = [sys.executable, "-m", "apcore_cli._sandbox_runner", module_id]

        with tempfile.TemporaryDirectory(prefix="apcore_sandbox_") as scratch_dir:
            # Point HOME and TMPDIR at the scratch directory as well as cwd.
            child_env["HOME"] = scratch_dir
            child_env["TMPDIR"] = scratch_dir

            try:
                completed = subprocess.run(
                    command,
                    input=json.dumps(input_data),
                    capture_output=True,
                    text=True,
                    env=child_env,
                    cwd=scratch_dir,
                    timeout=self._timeout_seconds,
                )
            except subprocess.TimeoutExpired as err:
                raise ModuleExecutionError(f"Error: Module '{module_id}' timed out in sandbox.") from err

            # Output size cap. This is a soft limit checked after capture; a
            # hard cap that prevents memory accumulation would need
            # Popen-based streaming.
            captured_bytes = sum(len(stream.encode()) for stream in (completed.stdout, completed.stderr))
            if captured_bytes > self._max_output_bytes:
                limit_mb = self._max_output_bytes // (1024 * 1024)
                raise ModuleExecutionError(
                    f"Error: Module '{module_id}' output exceeded the {limit_mb}MB sandbox limit."
                )

            if completed.returncode != 0:
                raise ModuleExecutionError(f"Error: Module '{module_id}' execution failed: {completed.stderr}")

            try:
                return json.loads(completed.stdout)
            except json.JSONDecodeError as err:
                # Only the first 200 characters are echoed back in the error.
                preview = completed.stdout[:200]
                raise ModuleExecutionError(f"Error: Module '{module_id}' returned non-JSON output: {preview}") from err