Metadata-Version: 2.4
Name: cjm-substrate
Version: 0.0.51
Summary: A dependency-isolated capability-composition runtime: heterogeneous tools run in their own environments behind a uniform HTTP/JSON boundary; the host composes them into workflows with resource-aware scheduling and threads provenance through a context graph.
Author-email: "Christian J. Mills" <9126128+cj-mills@users.noreply.github.com>
License: Apache-2.0
Project-URL: Repository, https://github.com/cj-mills/cjm-substrate
Project-URL: Documentation, https://cj-mills.github.io/cjm-substrate
Keywords: nbdev,jupyter,notebook,python
Classifier: Natural Language :: English
Classifier: Intended Audience :: Developers
Classifier: Development Status :: 3 - Alpha
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3 :: Only
Requires-Python: >=3.12
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: fastapi
Requires-Dist: fastcore
Requires-Dist: psutil
Requires-Dist: uvicorn
Requires-Dist: httpx
Requires-Dist: typer
Requires-Dist: pyyaml
Dynamic: license-file

# cjm-substrate


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

## Install

``` bash
pip install cjm_substrate
```

## Project Structure

    nbs/
    ├── core/ (21)
    │   ├── adapter.ipynb            # The typed-task half of the capability-unit fracture (pass-2 Thread 3)
    │   ├── adapter_manifest.ipynb   # The ADAPTER unit's registration manifest + the surface-based compatibility matcher (CR-17 pt 2, stage 4). Pass-2 Thread 3: registration/discovery = per-unit manifests generated in-env and found by `discover_manifests()`; compatibility is DERIVED, not declared — the capability records only its structural surface, the adapter declares its protocol (recorded here as member names + parameter lists), and the substrate matches manifest-vs-manifest. Works against UNLOADED capabilities with zero protocol imports host-side.
    │   ├── capability.ipynb         # The tool-capability interface — the manage-the-tool half of the capability-unit fracture (pass-2 Thread 3)
    │   ├── config.ipynb             # Project-level configuration for paths, runtime settings, and environment management
    │   ├── config_store.ipynb       # Persistent storage for per-capability configuration (with enabled flag)
    │   ├── diagnostics_store.ipynb  # CR-14 (stage 7): the disposable diagnostic-narrative class. Worker-written structured log records (substrate handler stamps contextvars identity — authors never supply attribution) + the host-pumped raw stream chunks (the zero-cooperation death-rattle floor). Retention is a QUERY, not file mechanics. Design ledger: `claude-docs/stage-7-evidence.md`.
    │   ├── empirical_store.ipynb    # Persistent store for empirically-observed resource usage per (instance_id, config_hash) pair. CR-7's data foundation — `record_sample` is called from `CapabilityManager.execute_capability*` finally blocks; aggregates feed eviction-candidate selection + future UI hints + cost-aware retry decisions.
    │   ├── errors.ipynb             # Typed exception hierarchy + JobError dataclass + default classification of bare Python exceptions. The substrate's CR-5 implementation per the 2026-05-19 substrate audit.
    │   ├── journal_store.ipynb      # CR-14 (stage 7): the durable account-of-action. One substrate-derived, host-written, never-auto-deleted SQLite store of typed observability events — the operational half of the attempted-vs-happened asymmetry (the graph records what HAPPENED; the journal records what was ATTEMPTED, including everything the graph by design refuses to contain: failures, refusals, retries, admission decisions, worker lifecycle). Design ledger: `claude-docs/stage-7-evidence.md`.
    │   ├── manager.ipynb            # Capability discovery, loading, and lifecycle management system
    │   ├── manifest_format.ipynb    # Typed parser + writer for the nested v2.0 manifest layout per the 2026-05-19 substrate audit's CR-8. Substrate manifests transitioned from a flat top-level JSON object to a four-section nested layout: `install` (deployment-specific facts populated at install time), `code` (code-derived facts refreshed by `cjm-ctl regenerate-manifest`), `drift_tracking` (a config_schema hash that records the witness shape so live-vs-stored comparisons can detect drift), and `overrides` (an operator-supplied overlay placeholder).
    │   ├── metadata.ipynb           # Data structures for capability metadata
    │   ├── platform.ipynb           # Cross-platform utilities for process management, path handling, and system detection
    │   ├── ports.ipynb              # Capability compositions as DAGs of invocation nodes with typed input/output
    │   ├── proxy.ipynb              # Bridge between Host application and isolated Worker processes
    │   ├── queue.ipynb              # Resource-aware job queue for sequential capability execution with cancellation support
    │   ├── scheduling.ipynb         # Resource scheduling policies for capability execution
    │   ├── secret_store.ipynb       # CR-12: project-local secret storage for API-based capabilities (file-backed, 0600)
    │   ├── telemetry.ipynb          # Shared GPU/CPU attribution helpers used by both `JobQueue._sample_resource_snapshot` (CR-6 Stage 3) and `CapabilityManager._record_sample_safe` (CR-7).
    │   ├── wire.ipynb               # Typed data transfer at the worker boundary — the zero-copy `FileBackedDTO`
    │   └── worker.ipynb             # FastAPI server that runs inside isolated capability environments
    ├── utils/ (3)
    │   ├── cache_paths.ipynb  # Per-(input-content, config) deterministic cache directories for capability outputs
    │   ├── hashing.ipynb      # Shared cryptographic hashing primitives for content integrity verification
    │   └── validation.ipynb   # Validation helpers for capability configuration dataclasses
    ├── bootstrap.ipynb  # One-call factory that assembles a CapabilityManager + JobQueue + capability bindings — closes the demo-app boilerplate duplication audited across 5 substrate consumers.
    └── cli.ipynb        # CLI tool for declarative capability management

Total: 26 notebooks across 2 directories

## Module Dependencies

``` mermaid
graph LR
    bootstrap["bootstrap<br/>Bootstrap"]
    cli["cli<br/>cli"]
    core_adapter["core.adapter<br/>Task Adapter"]
    core_adapter_manifest["core.adapter_manifest<br/>core.adapter_manifest"]
    core_capability["core.capability<br/>Tool Capability"]
    core_config["core.config<br/>Configuration"]
    core_config_store["core.config_store<br/>Capability Config Store"]
    core_diagnostics_store["core.diagnostics_store<br/>Diagnostics Store"]
    core_empirical_store["core.empirical_store<br/>Empirical Resource Tracking"]
    core_errors["core.errors<br/>Capability Error Taxonomy"]
    core_journal_store["core.journal_store<br/>Journal Store"]
    core_manager["core.manager<br/>Capability Manager"]
    core_manifest_format["core.manifest_format<br/>Manifest Format (v2.0)"]
    core_metadata["core.metadata<br/>Capability Metadata"]
    core_platform["core.platform<br/>Platform Utilities"]
    core_ports["core.ports<br/>Composition Ports"]
    core_proxy["core.proxy<br/>Remote Capability Proxy"]
    core_queue["core.queue<br/>Job Queue"]
    core_scheduling["core.scheduling<br/>Scheduling"]
    core_secret_store["core.secret_store<br/>Capability Secret Store"]
    core__telemetry["core._telemetry<br/>Substrate Telemetry Helpers"]
    core_wire["core.wire<br/>Typed Wire Layer"]
    core_worker["core.worker<br/>Universal Worker"]
    utils_cache_paths["utils.cache_paths<br/>Cache Paths"]
    utils_hashing["utils.hashing<br/>Content Hashing Utilities"]
    utils_validation["utils.validation<br/>Configuration Validation"]

    bootstrap --> core_manager
    bootstrap --> core_scheduling
    bootstrap --> core_queue
    cli --> core_config
    cli --> core_platform
    cli --> core_manifest_format
    cli --> core_adapter_manifest
    cli --> core_metadata
    core_capability --> core_errors
    core_diagnostics_store --> core_wire
    core_empirical_store --> utils_hashing
    core_manager --> core_diagnostics_store
    core_manager --> core_config
    core_manager --> core_errors
    core_manager --> core_metadata
    core_manager --> core_config_store
    core_manager --> core_capability
    core_manager --> core__telemetry
    core_manager --> core_empirical_store
    core_manager --> core_manifest_format
    core_manager --> core_adapter_manifest
    core_manager --> core_journal_store
    core_manager --> utils_validation
    core_manager --> core_scheduling
    core_manager --> core_secret_store
    core_manager --> core_proxy
    core_manifest_format --> core_metadata
    core_manifest_format --> utils_hashing
    core_platform --> core_config
    core_ports --> core_errors
    core_proxy --> core_diagnostics_store
    core_proxy --> core_config
    core_proxy --> core_errors
    core_proxy --> core_wire
    core_proxy --> core_capability
    core_proxy --> core_journal_store
    core_proxy --> core_platform
    core_queue --> core_diagnostics_store
    core_queue --> core_ports
    core_queue --> core_wire
    core_queue --> core__telemetry
    core_queue --> core_journal_store
    core_queue --> core_errors
    core_scheduling --> core_metadata
    core_worker --> core_wire
    core_worker --> core_platform
    core_worker --> core_errors
    core_worker --> core_diagnostics_store
    core_worker --> core_journal_store
    core_worker --> core_capability
    utils_cache_paths --> core_empirical_store
    utils_cache_paths --> utils_hashing
    utils_validation --> core_errors
```

*53 cross-module dependencies detected*

## CLI Reference

### `cjm-ctl` Command

                                                                                    
     Usage: cjm-ctl [OPTIONS] COMMAND [ARGS]...                                     
                                                                                    
     cjm-substrate CLI                                                              
                                                                                    
    ╭─ Options ────────────────────────────────────────────────────────────────────╮
    │ --cjm-config                PATH  Path to cjm.yaml configuration file        │
    │ --data-dir                  PATH  Override data directory (manifests, logs)  │
    │ --conda-prefix              PATH  Override conda/mamba prefix path           │
    │ --conda-type                TEXT  Conda implementation: micromamba,          │
    │                                   miniforge, or conda                        │
    │ --install-completion              Install completion for the current shell.  │
    │ --show-completion                 Show completion for the current shell, to  │
    │                                   copy it or customize the installation.     │
    │ --help                            Show this message and exit.                │
    ╰──────────────────────────────────────────────────────────────────────────────╯
    ╭─ Commands ───────────────────────────────────────────────────────────────────╮
    │ setup-runtime              Download and setup micromamba runtime for         │
    │                            project-local mode.                               │
    │ regenerate-manifest        Re-run introspection for an installed capability  │
    │                            and rewrite its manifest.                         │
    │ generate-adapter-manifest  CR-17 pt 2 (stage 4): introspect a task-adapter   │
    │                            impl in-env and write its adapter manifest.       │
    │ install-all                Install and register all capabilities defined in  │
    │                            capabilities.yaml.                                │
    │ setup-host                 Install interface libraries in the current Python │
    │                            environment.                                      │
    │ list                       List installed capabilities from manifest         │
    │                            directory.                                        │
    │ logs                       Tail / follow the observability stores (CR-14).   │
    │ retention                  Apply the diagnostics retention policy now        │
    │                            (CR-14).                                          │
    │ remove                     Remove a capability's manifest and conda          │
    │                            environment.                                      │
    │ validate                   SG-6 + T23: validate a manifest /                 │
    │                            capabilities.yaml / capability source.            │
    │ set-secret                 Store a capability secret in the project-local    │
    │                            SecretStore (CR-12).                              │
    │ list-secrets               List the secret KEY NAMES stored for a capability │
    │                            — never the values (CR-12).                       │
    ╰──────────────────────────────────────────────────────────────────────────────╯

For detailed help on any command, use `cjm-ctl <command> --help`.

## Module Overview

Detailed documentation for each module in the project:

### Task Adapter (`adapter.ipynb`)

> The typed-task half of the capability-unit fracture (pass-2 Thread 3)

#### Import

``` python
from cjm_substrate.core.adapter import (
    TaskAdapter
)
```

#### Classes

``` python
class TaskAdapter(ABC):
    """
    Base for task adapters — the typed-task half of the capability-unit
    fracture (pass-2 Thread 3).
    
    Subclasses (one ABC per task, in `cjm-<task>-adapter-interface` libraries)
    declare:
    
    - the TYPED task method (the contract `execute(*args, **kwargs)` never
      gave the task), abstract on the per-task ABC;
    - `task_name`: the task this adapter serves (e.g. "transcription");
    - `required_tool_protocol`: the structural contract required of a tool
      capability (a `typing.Protocol`; provisional `None` until the
      protocol is evidence-locked — Q5 posture: declare the slot, let
      stage-4/8 tool-splitting evidence finalize the protocol bodies);
    - the task's persistence helpers (storage classes), beside the task
      method rather than on it.
    
    Implementations run in-worker beside their tool capability. The base is
    deliberately mechanism-light: registry/routing is CR-17 pt 2 (stage 4).
    """
```

### core.adapter_manifest (`adapter_manifest.ipynb`)

> The ADAPTER unit’s registration manifest + the surface-based
> compatibility matcher (CR-17 pt 2, stage 4). Pass-2 Thread 3:
> registration/discovery = per-unit manifests generated in-env and found
> by `discover_manifests()`; compatibility is DERIVED, not declared —
> the capability records only its structural surface, the adapter
> declares its protocol (recorded here as member names + parameter
> lists), and the substrate matches manifest-vs-manifest. Works against
> UNLOADED capabilities with zero protocol imports host-side.

#### Import

``` python
from cjm_substrate.core.adapter_manifest import (
    AdapterManifest,
    adapter_manifest_from_dict,
    is_adapter_manifest,
    match_protocol_against_surface
)
```

#### Functions

``` python
def adapter_manifest_from_dict(
    d: Dict[str, Any],  # On-disk JSON dict (the "class" key maps to class_name)
) -> AdapterManifest:  # Typed adapter manifest
    "Reconstruct an `AdapterManifest` from its on-disk JSON shape."
```

``` python
def is_adapter_manifest(
    data: Any,  # Raw JSON-decoded manifest content
) -> bool:  # True when the payload declares the adapter unit kind
    """
    Route a manifest file by the `unit` discriminator (capability manifests
    carry no `unit` key; checked BEFORE `load_manifest` parsing).
    """
```
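The routing rule described above can be illustrated with a small stand-in. This is a minimal sketch, not the library's implementation: `looks_like_adapter_manifest` and both sample payloads are hypothetical, and the only facts taken from the docs are that adapter manifests default `unit` to `'adapter'` and that capability manifests carry no `unit` key.

``` python
from typing import Any


def looks_like_adapter_manifest(data: Any) -> bool:
    """Hypothetical stand-in for the `unit` discriminator check."""
    # Only a JSON object can carry a `unit` key; lists, strings, etc. are rejected.
    return isinstance(data, dict) and data.get("unit") == "adapter"


# Illustrative payloads (field values are made up for the example).
capability_payload = {"name": "demo-capability"}          # no `unit` key
adapter_payload = {"unit": "adapter", "name": "mod.Cls"}  # adapter unit

print(looks_like_adapter_manifest(capability_payload))  # False
print(looks_like_adapter_manifest(adapter_payload))     # True
```

Checking the discriminator on the raw decoded JSON is what lets a caller route the file before any typed parsing is attempted.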

``` python
def match_protocol_against_surface(
    protocol_members: Dict[str, Any],  # {"methods": [...], "properties": [...]} from the adapter manifest
    structural_surface: Optional[Dict[str, Any]],  # Capability manifest code.structural_surface (None = pre-fracture)
) -> Dict[str, Any]:  # {"compatible", "missing_methods", "missing_properties", "param_mismatches", "reason"}
    """
    Surface-based compatibility (pass-2 Thread 3) — host-side, manifest-vs-
    manifest, safe against UNLOADED capabilities.
    
    Method rule: same name present + (when both sides record params) the
    surface params must START WITH the protocol params (prefix rule).
    Property rule: name present in the surface's properties.
    No surface recorded -> NOT compatible, with the reason spelled out.
    """
```
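The method, property, and no-surface rules in the docstring above could be sketched as follows. This is an illustration under stated assumptions rather than the substrate's code: the surface is assumed to have the shape `{"methods": [{"name", "params"}], "properties": [names]}`, protocol properties are assumed to be plain name strings, and the `reason` wording is invented.

``` python
from typing import Any, Dict, Optional


def match_sketch(
    protocol_members: Dict[str, Any],
    structural_surface: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
    """Hypothetical re-creation of the surface-based compatibility rules."""
    result: Dict[str, Any] = {
        "compatible": False, "missing_methods": [], "missing_properties": [],
        "param_mismatches": [], "reason": "",
    }
    if structural_surface is None:
        # No surface recorded: not compatible, and say why.
        result["reason"] = "capability manifest records no structural surface"
        return result

    surface_methods = {m["name"]: m for m in structural_surface.get("methods", [])}
    surface_props = set(structural_surface.get("properties", []))

    for method in protocol_members.get("methods", []):
        name = method["name"]
        if name not in surface_methods:
            result["missing_methods"].append(name)
            continue
        want = method.get("params")
        have = surface_methods[name].get("params")
        # Prefix rule: applied only when BOTH sides record params.
        if want is not None and have is not None and have[: len(want)] != want:
            result["param_mismatches"].append(name)

    for prop in protocol_members.get("properties", []):
        if prop not in surface_props:
            result["missing_properties"].append(prop)

    problems = (result["missing_methods"] + result["missing_properties"]
                + result["param_mismatches"])
    result["compatible"] = not problems
    if problems:
        result["reason"] = "surface does not satisfy protocol: " + ", ".join(problems)
    return result


protocol = {"methods": [{"name": "save", "params": ["node"]}], "properties": ["ready"]}
surface = {"methods": [{"name": "save", "params": ["node", "overwrite"]}],
           "properties": ["ready"]}
ok = match_sketch(protocol, surface)
no_surface = match_sketch(protocol, None)
print(ok["compatible"], no_surface["compatible"])  # True False
```

In this sketch the surface's extra trailing `overwrite` parameter does not break compatibility, which is the point of a prefix rule: a tool may accept more than the protocol asks for, but not less.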

#### Classes

``` python
@dataclass
class AdapterManifest:
    """
    A discovered ADAPTER unit (CR-17 pt 2) — the registration record for one
    task-adapter implementation installed in some tool's worker env.
    
    Generated in-env by `cjm-ctl generate-adapter-manifest` (the protocol
    members are introspected where the protocol is importable); discovered
    host-side beside capability manifests via the `unit` discriminator.
    """
    
    name: str  # Unique unit name ("module.ClassName")
    version: str  # Interface-lib version at generation
    task_name: str  # The task this adapter serves (e.g. "graph-storage")
    module: str  # Impl module (importable in the tool's worker env)
    class_name: str  # Impl class name
    required_tool_protocol: str  # Protocol FQN (semantic contract; host never imports it)
    protocol_members: Dict[str, Any] = field(...)  # {"methods": [{"name","signature","params"}], "properties": [...]}
    conda_env: str = ''  # Env the manifest was generated from
    generated_at: str = ''  # ISO timestamp
    unit: str = 'adapter'  # Manifest-kind discriminator
    
    def to_dict(self) -> Dict[str, Any]:  # JSON-ready dict ("class" key on disk)
        "Serialize to the on-disk JSON shape."
```

### Bootstrap (`bootstrap.ipynb`)

> One-call factory that assembles a CapabilityManager + JobQueue +
> capability bindings — closes the demo-app boilerplate duplication
> audited across 5 substrate consumers.

#### Import

``` python
from cjm_substrate.bootstrap import (
    CapabilitySpec,
    Pipeline,
    create_pipeline
)
```

#### Functions

``` python
def _normalize_spec(
    spec: CapabilitySpec  # Raw spec from caller
) -> Tuple[str, Optional[Dict[str, Any]]]:  # (capability_name, optional config)
    """
    Normalize a capability spec into a `(name, config)` pair.
    
    Accepts a bare string, a `(name, config)` tuple, or a mapping with a 'name'
    key (config under 'config' or the mapping itself minus 'name').
    """
```
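The three accepted spec shapes can be sketched like this. It is a hedged illustration, not the private helper itself: `normalize_spec_sketch`, the `Spec` alias, and the capability names are hypothetical, and returning `None` for a mapping that holds only `name` is an assumption.

``` python
from typing import Any, Dict, Mapping, Optional, Tuple, Union

# Hypothetical alias standing in for `CapabilitySpec`.
Spec = Union[str, Tuple[str, Optional[Dict[str, Any]]], Mapping[str, Any]]


def normalize_spec_sketch(spec: Spec) -> Tuple[str, Optional[Dict[str, Any]]]:
    """Normalize a spec into a `(name, config)` pair."""
    if isinstance(spec, str):
        return spec, None                      # bare name, no config
    if isinstance(spec, tuple):
        name, config = spec                    # already a (name, config) pair
        return name, config
    if isinstance(spec, Mapping):
        name = spec["name"]
        if "config" in spec:
            return name, dict(spec["config"])  # explicit 'config' key wins
        rest = {k: v for k, v in spec.items() if k != "name"}
        return name, rest or None              # assumption: empty remainder -> None
    raise TypeError(f"unsupported capability spec: {spec!r}")


print(normalize_spec_sketch("demo-tool"))
print(normalize_spec_sketch(("demo-tool", {"device": "cpu"})))
print(normalize_spec_sketch({"name": "demo-tool", "device": "cpu"}))
```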

``` python
def create_pipeline(
    capabilities: Optional[Iterable[CapabilitySpec]] = None,  # Capabilities to discover + load
    scheduler: Optional[ResourceScheduler] = None,  # Resource policy (default: permissive)
    system_monitor: Optional[str] = None,  # Capability name to register as system monitor
    search_paths: Optional[List[Path]] = None,  # Custom manifest search paths
    queue_kwargs: Optional[Dict[str, Any]] = None,  # Extra kwargs forwarded to JobQueue
    strict: bool = True,  # SG-5 strict config validation on each load
) -> Pipeline:  # Assembled stack ready to start
    """
    Assemble a CapabilityManager + JobQueue + capability bindings in one call.
    
    Steps performed:
      1. Construct CapabilityManager with the given scheduler + search paths
      2. discover_manifests()
      3. For each spec in `capabilities`: load the capability and create a CapabilityBinding
      4. If `system_monitor` is set, register that capability as the sys-mon
      5. Construct JobQueue (NOT started — caller starts via context manager)
    
    Capabilities that fail to load are logged but do not raise; their entries are
    omitted from `Pipeline.bindings`. Use the returned `Pipeline.manager` to
    inspect which loads succeeded.
    """
```

#### Classes

``` python
@dataclass
class Pipeline:
    "Assembled substrate stack: manager + queue + capability bindings."
    
    manager: CapabilityManager  # Discovery + lifecycle
    queue: JobQueue  # Job submission + scheduling
    bindings: Dict[str, CapabilityBinding] = field(...)  # Capability name -> bound view
    
    async def start(self) -> None
        "Start the job queue's background processor."
    
    async def stop(self) -> None
        "Stop the job queue and unload all capabilities."
```
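The start/stop lifecycle can be pictured with stand-in objects. The sketch below is self-contained and does not import `cjm_substrate`: `StubQueue`, `StubManager`, and `PipelineSketch` are hypothetical, and having `__aexit__` call `stop()` is an assumption about how the context-manager form behaves.

``` python
import asyncio
from dataclasses import dataclass
from typing import List


@dataclass
class StubQueue:
    """Hypothetical stand-in for the job queue; records lifecycle calls."""
    events: List[str]

    async def start(self) -> None:
        self.events.append("queue.start")

    async def stop(self) -> None:
        self.events.append("queue.stop")


@dataclass
class StubManager:
    """Hypothetical stand-in for the capability manager."""
    events: List[str]

    def unload_all(self) -> None:
        self.events.append("manager.unload_all")


@dataclass
class PipelineSketch:
    manager: StubManager
    queue: StubQueue

    async def start(self) -> None:
        await self.queue.start()

    async def stop(self) -> None:
        await self.queue.stop()      # stop the queue first...
        self.manager.unload_all()    # ...then unload capabilities

    async def __aenter__(self) -> "PipelineSketch":
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.stop()            # assumption: exit mirrors stop()


async def demo() -> List[str]:
    events: List[str] = []
    async with PipelineSketch(StubManager(events), StubQueue(events)):
        events.append("work")
    return events


events = asyncio.run(demo())
print(events)  # ['queue.start', 'work', 'queue.stop', 'manager.unload_all']
```

Using `async with` keeps teardown on the exit path even when the body raises, which is the usual reason to prefer it over paired manual `start()` and `stop()` calls.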

### Cache Paths (`cache_paths.ipynb`)

> Per-(input-content, config) deterministic cache directories for
> capability outputs

#### Import

``` python
from cjm_substrate.utils.cache_paths import (
    cache_dir_for_config,
    list_cache_entries,
    prune_cache_for_input
)
```

#### Functions

``` python
def _sanitize_stem(
    input_path: Union[str, Path],  # Path whose stem to sanitize
) -> str:                            # Filesystem-portable stem string
    """
    Return a filesystem-portable, length-capped version of input_path's stem.
    
    Steps: take `Path(input_path).stem`, replace unsafe characters with `_`,
    strip leading/trailing dots + whitespace (Windows path-component rule),
    length-cap to `_MAX_STEM_LEN`. Empty stems (degenerate paths like `.`)
    return `_` so callers always get a usable path component.
    """
```
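The sanitizing steps listed above might look roughly like the following. This is a sketch under assumptions: the real `_UNSAFE_CHARS` pattern is not shown in this README, so the character class here is a guess, and `sanitize_stem_sketch` is a hypothetical name. The 100-character cap matches the `_MAX_STEM_LEN` value listed under Variables.

``` python
import re
from pathlib import Path
from typing import Union

MAX_STEM_LEN = 100                         # mirrors `_MAX_STEM_LEN = 100`
UNSAFE = re.compile(r"[^A-Za-z0-9._-]")    # assumption: real pattern not shown


def sanitize_stem_sketch(input_path: Union[str, Path]) -> str:
    """Return a portable, length-capped stem; never an empty string."""
    stem = Path(input_path).stem           # drop directory and final suffix
    stem = UNSAFE.sub("_", stem)           # replace unsafe characters
    stem = stem.strip(". \t\n")            # no leading/trailing dots or whitespace
    stem = stem[:MAX_STEM_LEN]             # cap the length
    return stem or "_"                     # degenerate input still yields a component


print(sanitize_stem_sketch("recordings/my talk (final).wav"))  # my_talk__final_
print(sanitize_stem_sketch("."))                               # _
```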

``` python
def _get_stat_cache_path() -> Optional[Path]
    """
    Locate the stat-cache SQLite path under the substrate's data dir.
    
    Returns the configured `cfg.data_dir / "input_hash_cache.db"` when the
    substrate config is available, falling back to `~/.cjm/input_hash_cache.db`
    when get_config() raises (e.g., during early-init tests). Returns None
    only when neither resolves — callers then skip caching entirely.
    """
```

``` python
def _ensure_stat_cache_schema(conn: sqlite3.Connection) -> None
    "Create the stat-cache table + indices if not present."
```

``` python
def _hash_input_with_stat_cache(
    input_path: Path,                      # File whose content hash we need
    cache_path: Optional[Path] = None,     # Explicit cache DB path (default: substrate-resolved)
    *,
    skip_cache: bool = False,              # If True, bypass cache entirely (compute always)
) -> str:                                   # Hash string in "algo:hexdigest" format
    """
    Return the SHA-256 content hash of `input_path`, with stat-cache memoization.
    
    Fast path: stat the file, look up `(absolute_path, mtime_ns, size)` in the
    SQLite cache; cache hit returns in microseconds. Cold path: `hash_file`
    streams the file content, then writes the result to the cache.
    
    `skip_cache=True` bypasses the cache entirely — useful for callers that
    KNOW the file content changed (e.g., a capability just wrote it and wants
    to record the canonical hash without polluting the cache with intermediate
    states).
    
    File not found / unreadable → propagates the underlying OSError. Cache
    DB errors are LOG-and-FALLBACK to direct hashing — the cache is an
    optimization, not a correctness invariant.
    """
```
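The fast-path/cold-path split described above can be sketched with the standard library alone. The table name, column layout, and the `sha256:` prefix are assumptions made for the illustration; `hash_file_sketch` and `hash_with_stat_cache` are hypothetical names, and the error-handling fallback from the docstring is omitted for brevity.

``` python
import hashlib
import sqlite3
import tempfile
from pathlib import Path


def hash_file_sketch(path: Path) -> str:
    """Stream the file through SHA-256 (the cold path)."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(1 << 20), b""):
            digest.update(chunk)
    return "sha256:" + digest.hexdigest()  # assumed "algo:hexdigest" spelling


def hash_with_stat_cache(path: Path, conn: sqlite3.Connection) -> str:
    """Memoize content hashes keyed by (absolute path, mtime_ns, size)."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS stat_cache ("
        "path TEXT, mtime_ns INTEGER, size INTEGER, hash TEXT, "
        "PRIMARY KEY (path, mtime_ns, size))"
    )
    stat = path.stat()
    key = (str(path.resolve()), stat.st_mtime_ns, stat.st_size)
    row = conn.execute(
        "SELECT hash FROM stat_cache WHERE path=? AND mtime_ns=? AND size=?", key
    ).fetchone()
    if row is not None:
        return row[0]                      # fast path: no file read
    value = hash_file_sketch(path)         # cold path: stream and record
    conn.execute("INSERT OR REPLACE INTO stat_cache VALUES (?, ?, ?, ?)", key + (value,))
    conn.commit()
    return value


conn = sqlite3.connect(":memory:")
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "input.bin"
    sample.write_bytes(b"hello substrate")
    first = hash_with_stat_cache(sample, conn)   # computes and stores
    second = hash_with_stat_cache(sample, conn)  # served from the cache
cached_rows = conn.execute("SELECT COUNT(*) FROM stat_cache").fetchone()[0]
print(first == second, cached_rows)
```

Keying on `(path, mtime_ns, size)` means an edited file normally misses the cache and is rehashed, at the cost of trusting the filesystem's timestamps.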

``` python
def cache_dir_for_config(
    capability_data_dir: Union[str, Path],     # The capability's own data subdirectory (typically <cfg.capability_data_dir>/<capability_name>)
    input_path: Union[str, Path],           # The input file the capability operates on
    action: str,                            # The capability action name (e.g., "segment_audio", "convert", "execute")
    config_dict: Dict[str, Any],            # The capability's effective config for this action
    *,
    input_hash_length: int = 6,             # Truncation length for the input content hash in the directory name
    config_hash_length: int = 12,           # Truncation length for the config hash in the directory name
    create: bool = True,                    # Auto-create the directory (parents=True, exist_ok=True)
    hash_input_content: bool = True,        # If False, hash str(input_path) instead (e.g., URL inputs)
    skip_input_cache: bool = False,         # If True, bypass the stat-cache (always recompute content hash)
) -> Path:                                   # The deterministic cache directory path
    """
    Return (and optionally create) a per-(input-content, config) cache directory.
    
    Path layout::
    
        <capability_data_dir>/<action>/<sanitized-stem>/<input_hash[:N]>_<config_hash[:M]>/
    
    The same `(input_content, action, config_dict)` always resolves to the same
    path; any change to input content OR config produces a different path. This
    means:
    
    1. Different configs go to different directories — no silent overwrite.
    2. Stale-artifact accumulation is impossible — each unique
       `(input_content, config)` tuple has its OWN directory.
    3. For chained capability sequences, upstream config changes propagate through
       content changes: if capability A's output content depends on A's config and
       capability B reads that output, B's cache key automatically reflects A's
       config indirectly.
    
    `hash_input_content=False` switches to hashing the string form of
    `input_path` instead of file content — for capabilities whose "input" is a URL,
    a database row ID, or another non-file identifier. Sequence chaining via
    content propagation only works for true file inputs.
    
    `skip_input_cache=True` recomputes the input content hash even if the
    stat-cache has a record. Useful for capabilities that just wrote the input file
    and want to record its canonical hash without stale-cache risk.
    
    Raises FileNotFoundError if `input_path` doesn't exist and
    `hash_input_content=True`. Raises OSError on directory-create failure
    when `create=True`.
    """
```
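To make the path layout concrete, here is a filesystem-free sketch of how such a directory name could be derived. The hashing scheme for `config_dict` (SHA-256 over key-sorted JSON) is an assumption, as are the function name and the sample values; only the layout and the default truncation lengths of 6 and 12 come from the signature above.

``` python
import hashlib
import json
from pathlib import Path
from typing import Any, Dict


def cache_dir_sketch(
    base: str, stem: str, action: str, input_bytes: bytes,
    config: Dict[str, Any], n: int = 6, m: int = 12,
) -> Path:
    """Build <base>/<action>/<stem>/<input_hash[:n]>_<config_hash[:m]>."""
    input_hash = hashlib.sha256(input_bytes).hexdigest()
    # Assumption: canonical JSON so key order never changes the config hash.
    canonical = json.dumps(config, sort_keys=True).encode("utf-8")
    config_hash = hashlib.sha256(canonical).hexdigest()
    return Path(base) / action / stem / f"{input_hash[:n]}_{config_hash[:m]}"


audio = b"fake audio bytes"
a = cache_dir_sketch("data/demo", "talk", "convert", audio, {"rate": 16000, "mono": True})
b = cache_dir_sketch("data/demo", "talk", "convert", audio, {"mono": True, "rate": 16000})
c = cache_dir_sketch("data/demo", "talk", "convert", audio, {"rate": 8000, "mono": True})
print(a == b)  # True: same content and config, regardless of key order
print(a == c)  # False: a config change lands in a different directory
```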

``` python
def list_cache_entries(
    """
    Enumerate all per-config cache directories for a given (input, action).
    
    Returns the paths of every `<input_hash>_<config_hash>` directory under
    `<capability_data_dir>/<action>/<sanitized-stem>/`. Each entry corresponds to
    a unique `(input_content, config)` tuple — operators can inspect their
    contents, diff them, or pass selected ones to `prune_cache_for_input` to
    keep them through a sweep.
    
    Returns an empty list if the parent directory doesn't exist (capability never
    ran this action for this input).
    """
```

``` python
def prune_cache_for_input(
    """
    Delete per-config cache directories for `(input, action)`, optionally
    preserving a `keep` set.
    
    Pairs with `list_cache_entries` for inspect-then-prune workflows: list
    candidates, choose which to keep, then call prune with the keep set.
    `keep=None` deletes ALL entries.
    
    `dry_run=True` returns the would-delete list without touching the
    filesystem — useful for operator confirmation before destructive ops.
    
    Returns the list of deleted (or would-delete) paths.
    """
```
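The inspect-then-prune workflow can be sketched on a temporary directory. Because the real signatures are truncated in this README, the functions below take a simplified, hypothetical parameter set (`parent`, `keep`, `dry_run`) and operate directly on the parent directory rather than deriving it from an input and action.

``` python
import shutil
import tempfile
from pathlib import Path
from typing import Iterable, List, Optional


def list_entries(parent: Path) -> List[Path]:
    """Every per-config directory under `parent`; empty if it never existed."""
    if not parent.is_dir():
        return []
    return sorted(p for p in parent.iterdir() if p.is_dir())


def prune_entries(
    parent: Path, keep: Optional[Iterable[Path]] = None, dry_run: bool = False
) -> List[Path]:
    """Delete entries not in `keep`; with dry_run, only report them."""
    keep_set = {Path(p) for p in (keep or [])}
    doomed = [p for p in list_entries(parent) if p not in keep_set]
    if not dry_run:
        for path in doomed:
            shutil.rmtree(path)
    return doomed


with tempfile.TemporaryDirectory() as tmp:
    parent = Path(tmp) / "convert" / "talk"
    for name in ("aaaaaa_111111111111", "aaaaaa_222222222222", "bbbbbb_111111111111"):
        (parent / name).mkdir(parents=True)

    entries = list_entries(parent)
    keep = [entries[0]]
    preview = [p.name for p in prune_entries(parent, keep=keep, dry_run=True)]
    still_there = len(list_entries(parent))          # dry run touched nothing
    deleted = [p.name for p in prune_entries(parent, keep=keep)]
    remaining = [p.name for p in list_entries(parent)]

print(preview == deleted, still_there, remaining)
```

Running the dry run first gives an operator the exact list that the destructive call would remove, so the two results can be compared before anything is deleted.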

#### Variables

``` python
_MAX_STEM_LEN = 100
_UNSAFE_CHARS
```

### Tool Capability (`capability.ipynb`)

> The tool-capability interface — the manage-the-tool half of the
> capability-unit fracture (pass-2 Thread 3)

#### Import

``` python
from cjm_substrate.core.capability import (
    RELOAD_TRIGGER,
    WORKER_ENV_TEMPLATE_PLACEHOLDERS,
    ConfigOption,
    FieldOptions,
    EnvVarSpec,
    expand_worker_env_template,
    template_check_placeholders,
    ToolCapability,
    capability_action,
    collect_capability_actions,
    derive_structural_surface
)
```

#### Functions

``` python
def expand_worker_env_template(
    template: str,                       # The raw EnvVarSpec.default value (may contain ${...} placeholders)
    placeholders: Mapping[str, Optional[str]],  # Resolved values keyed by placeholder name
    *,
    capability_name: str = "",               # For error context ("template X on capability Y references ...")
    var_name: str = "",                  # For error context ("on EnvVarSpec(name=Z)")
) -> str
    """
    Substitute `${VAR}` placeholders in `template` using `placeholders`.
    
    Strict mode (no `safe_substitute`): unknown placeholders raise
    `CapabilityConfigError` with descriptive context. Single-pass, non-recursive —
    substituted values are taken verbatim, never re-scanned for further
    placeholders. Templates without any `${...}` syntax pass through unchanged
    (so plain static defaults work as before).
    
    The allowed placeholder vocabulary is fixed via `WORKER_ENV_TEMPLATE_PLACEHOLDERS`.
    A `${FOO}` whose name is in the vocabulary but whose RESOLVED value is None
    (e.g. `CJM_MODELS_DIR` when the operator hasn't configured one) raises
    `CapabilityConfigError` with the same shape — operators get a clear signal that
    the capability needs a value they haven't provided, rather than a silent
    substitution of empty string into a load-bearing path.
    """
```
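Strict, single-pass substitution of this kind can be sketched with one regular expression. This is an illustration, not the library's code: `TemplateError` stands in for `CapabilityConfigError`, the vocabulary below is invented apart from `CJM_MODELS_DIR` (the one name the docstring mentions), and the error messages are made up.

``` python
import re
from typing import Mapping, Optional

PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")
# Assumption: the real vocabulary is not listed here; CJM_EXAMPLE_DIR is hypothetical.
ALLOWED = frozenset({"CJM_MODELS_DIR", "CJM_EXAMPLE_DIR"})


class TemplateError(ValueError):
    """Hypothetical stand-in for `CapabilityConfigError`."""


def expand_sketch(template: str, values: Mapping[str, Optional[str]]) -> str:
    def replace(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in ALLOWED:
            raise TemplateError(f"unknown placeholder ${{{name}}}")
        value = values.get(name)
        if value is None:
            # Known name, but nothing configured: fail loudly instead of
            # substituting an empty string into a path.
            raise TemplateError(f"placeholder ${{{name}}} has no resolved value")
        return value

    # re.sub scans the template once; substituted text is never re-scanned.
    return PLACEHOLDER.sub(replace, template)


expanded = expand_sketch("${CJM_MODELS_DIR}/whisper", {"CJM_MODELS_DIR": "/opt/models"})
verbatim = expand_sketch("${CJM_MODELS_DIR}", {"CJM_MODELS_DIR": "${CJM_EXAMPLE_DIR}"})
plain = expand_sketch("/static/default", {})
print(expanded)  # /opt/models/whisper
print(verbatim)  # ${CJM_EXAMPLE_DIR}
print(plain)     # /static/default
```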

``` python
def template_check_placeholders(
    template: str,                       # The raw EnvVarSpec.default value
) -> Set[str]:                            # Placeholder names referenced (allowed-vocabulary-validated)
    """
    Return the set of placeholder names referenced by a worker-env template.
    
    Validates the vocabulary (unknown names raise CapabilityConfigError) without
    requiring a placeholder-value mapping. Useful for `cjm-ctl validate`'s
    dry-run check at install/release time — surface the bug BEFORE the capability
    tries to spawn a worker with a malformed default.
    
    Templates without `${...}` return an empty set.
    """
```

``` python
def _report_progress_threadsafe(
    self,
    progress: float,  # 0.0 to 1.0, or -1.0 for indeterminate
    message: str = "",  # Descriptive status message
) -> None
    """
    Thread-safe report_progress — replaces ToolCapability.report_progress.
    
    Writes `_progress` + `_status_message_base` + `_status_message` under the
    capability's `_progress_lock` if one exists (lazy-init by `heartbeat()` before
    spawning its thread). When no lock exists yet — the single-threaded common
    case before any heartbeat() call — the writes happen without lock overhead.
    
    The heartbeat thread reads `_status_message_base` (NOT `_status_message`)
    so heartbeat-amended messages don't accumulate elapsed-time suffixes when
    `report_progress` is called concurrently. The capability's call overwrites both
    fields atomically; the next heartbeat tick reads the new base.
    """
```

``` python
@contextmanager
def _heartbeat(
    self,
    phase: str,                    # Phase label embedded in the heartbeat message
    *,
    interval: float = 0.5,         # Seconds between heartbeats (substrate stalls at 60s default)
    progress: Optional[float] = None,  # Optional initial progress; None preserves current
) -> Iterator[None]
    """
    Heartbeat context manager — spawn a daemon thread that advances the
    (progress, message) tuple every `interval` seconds, defeating the
    substrate's prefetch stall detection during silent blocking calls.
    
    Usage:
    
        def prefetch(self):
            with self.heartbeat("loading whisper model"):
                self._load_model()  # Blocking; HF Hub download / from_pretrained
    
    Behavior:
    
    - At entry: writes `(progress, phase)` to set the initial state. If
      `progress` is None, preserves whatever `self._progress` already holds
      (defaulting to -1.0, indeterminate, if never set).
    - During the block: a daemon thread emits an updated `_status_message =
      "<base> ({elapsed:.1f}s)"` every `interval` seconds. The thread reads
      `_status_message_base` (set by `report_progress`) for the base, so an
      explicit `report_progress(0.3, "downloading weights")` from inside the
      block makes the next heartbeat tick display "downloading weights (Xs)".
    - At exit: signals the thread to stop, joins with timeout (the thread is
      daemon so cleanup is best-effort but reliable). The capability's final
      `_status_message` state is left as the last heartbeat-amended value;
      callers wanting a clean "completed" state should call
      `report_progress(1.0, "done")` after the with-block.
    
    Thread safety: relies on the upgraded thread-safe `report_progress`
    (loaded by this same cell). The lock is lazy-initialized HERE — before
    spawning the heartbeat thread — so concurrent main-thread + heartbeat-
    thread calls always see a lock.
    
    Cancellation: the heartbeat thread checks `stop_event` between sleeps
    and exits cleanly. If the with-block raises, the `finally` clause still
    fires `stop_event.set()`, so the thread won't leak.
    """
```
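The entry, tick, and exit behavior described above can be sketched with `threading.Event`. This is a simplified illustration under stated assumptions: the `Heartbeater` class, its `base` and `message` attributes, and the join timeout are hypothetical, and progress handling is omitted.

```python
import threading
import time
from contextlib import contextmanager
from typing import Iterator


class Heartbeater:
    """Hypothetical stand-in for a capability; only the heartbeat mechanics."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.base = ""
        self.message = ""

    @contextmanager
    def heartbeat(self, phase: str, *, interval: float = 0.5) -> Iterator[None]:
        stop_event = threading.Event()
        started = time.monotonic()
        with self._lock:
            self.base = self.message = phase  # initial state at entry

        def tick() -> None:
            # Event.wait returns True once stop_event is set, ending the loop.
            while not stop_event.wait(interval):
                elapsed = time.monotonic() - started
                with self._lock:
                    self.message = f"{self.base} ({elapsed:.1f}s)"

        thread = threading.Thread(target=tick, daemon=True)
        thread.start()
        try:
            yield
        finally:
            stop_event.set()  # runs even if the with-block raises
            thread.join(timeout=interval + 1.0)
```

Waiting on the event rather than calling `time.sleep` lets the thread exit promptly when the block finishes instead of sleeping out a full interval.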

``` python
def capability_action(
    action_name: str  # Public action name the decorated method handles
) -> Callable[[Callable], Callable]:  # Decorator
    """
    Marker decorator tagging a capability method as the handler for `action_name`.
    
    Sets `func._capability_action = action_name`. Capability authors with dispatcher-style
    `execute(action, **kwargs)` use `collect_capability_actions(cls)` to derive their
    `supported_actions` set from these markers rather than maintaining a separate
    list. The decorator does not change call semantics — the wrapped function is
    returned unchanged.
    """
```

``` python
def collect_capability_actions(
    cls: type  # Class (or subclass) to scan for @capability_action-tagged methods
) -> Set[str]:  # Set of action names handled by `cls` (including inherited)
    """
    Collect action names from `@capability_action`-decorated methods on `cls`.
    
    Walks the class's MRO so subclasses inherit action handlers from base
    classes automatically. The returned set is suitable for
    `supported_actions: ClassVar[Set[str]] = collect_capability_actions(MyCapability)`
    once the capability class body has been defined.
    """
```
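A marker decorator and an MRO-walking collector of the kind described above could look like the sketch below. The names `action_marker` and `collect_marked_actions` are hypothetical stand-ins, used so the sketch is not mistaken for the library's own bodies; only the `_capability_action` attribute name comes from the docstrings.

```python
from typing import Callable, Set


def action_marker(action_name: str) -> Callable[[Callable], Callable]:
    def mark(func: Callable) -> Callable:
        func._capability_action = action_name  # tag only; no wrapping
        return func
    return mark


def collect_marked_actions(cls: type) -> Set[str]:
    actions: Set[str] = set()
    for klass in cls.__mro__:  # the MRO includes base classes
        for member in vars(klass).values():
            tag = getattr(member, "_capability_action", None)
            if isinstance(tag, str):
                actions.add(tag)
    return actions


class Base:
    @action_marker("probe")
    def _probe(self, **kwargs):
        return "probed"


class Child(Base):
    @action_marker("convert")
    def _convert(self, **kwargs):
        return "converted"
```

Here `collect_marked_actions(Child)` picks up both the inherited `probe` tag and the subclass's own `convert` tag, which is the inheritance behavior the docstring describes.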

``` python
def _dispatch_to_action(
    self,
    action: str,  # Action name to dispatch (matched against @capability_action tags)
    **kwargs,     # Forwarded verbatim to the resolved handler
) -> Any:         # Whatever the handler returns
    """
    T28: dispatch `action` to its `@capability_action`-tagged handler.
    
    Walks the instance's MRO for a method tagged `_capability_action == action`
    (the SAME markers `collect_capability_actions` / `supported_actions` are built
    from) and calls it as `handler(self, **kwargs)`. Unknown actions raise the
    typed `CapabilityInputError(fields_invalid=["action"])` (CR-5) — identical
    behaviour to the hand-rolled dispatchers this replaces.
    
    Dispatcher-style capabilities (MediaProcessing / Graph / Text) collapse their
    `execute` to a one-liner instead of reimplementing the MRO walk in every
    capability (the 5x copy SG-44 + this helper retire):
    
        @capability_action("separate_vocals")
        def _separate_vocals(self, **kwargs): ...
    
        supported_actions = collect_capability_actions(MyCapability)
    
        def execute(self, action="separate_vocals", **kwargs):
            return self.dispatch_to_action(action, **kwargs)
    """
```
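The dispatch walk can be illustrated with a self-contained sketch. The decorator `tag_action`, the `Dispatcher` base class, and the `UnknownActionError` stand-in for `CapabilityInputError` are hypothetical; the lookup rule (match `_capability_action` while walking the MRO, then call the handler with `self`) follows the docstring above.

```python
from typing import Any, Callable


def tag_action(name: str) -> Callable[[Callable], Callable]:
    def mark(func: Callable) -> Callable:
        func._capability_action = name
        return func
    return mark


class UnknownActionError(ValueError):
    """Stand-in for CapabilityInputError(fields_invalid=["action"])."""


class Dispatcher:
    def dispatch_to_action(self, action_name: str, **kwargs: Any) -> Any:
        for klass in type(self).__mro__:
            for member in vars(klass).values():
                if getattr(member, "_capability_action", None) == action_name:
                    return member(self, **kwargs)  # plain function, so pass self
        raise UnknownActionError(f"unsupported action: {action_name!r}")


class Media(Dispatcher):
    @tag_action("separate_vocals")
    def _separate_vocals(self, path: str = "") -> str:
        return f"vocals:{path}"

    def execute(self, action: str = "separate_vocals", **kwargs: Any) -> Any:
        return self.dispatch_to_action(action, **kwargs)
```

Because the handler is looked up from the same tags used to build `supported_actions`, the advertised action set and the dispatch table cannot drift apart.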

``` python
def derive_structural_surface(
    cls: type,  # The capability class to introspect
) -> Dict[str, Any]:  # {"methods": [...], "properties": [...], "attributes": [...]}
    """
    Record a capability class's structural surface by pure self-introspection.
    
    The FULL public surface is recorded, inherited members included —
    the surface is what adapter protocols match against, and protocol
    members may name inherited methods. Deterministic (name-sorted) so
    the canonical-JSON witness hash is stable across runs.
    
    Classification: `property` → properties (names only); functions
    (static/class methods unwrapped) → methods with `str(inspect.signature)`
    + the parameter NAME list `params` (self excluded — the CR-17 pt 2
    surface matcher's input; stage 4);
    everything else public → attributes with the value's type name
    (config_class, supported_actions, WORKER_ENV, ...).
    """
```
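The classification rules above can be approximated with the standard `inspect` module. The sketch below is an assumption-laden illustration: the function name `sketch_structural_surface` and the exact shape of each recorded entry (`name`, `signature`, `params`, `type` keys) are hypothetical, since the real record layout is not shown here.

```python
import inspect
from typing import Any, Dict


def sketch_structural_surface(cls: type) -> Dict[str, Any]:
    methods, properties, attributes = [], [], []
    for name in sorted(dir(cls)):  # dir() includes inherited members
        if name.startswith("_"):
            continue
        raw = inspect.getattr_static(cls, name)  # avoids invoking descriptors
        if isinstance(raw, property):
            properties.append(name)
            continue
        if isinstance(raw, (staticmethod, classmethod)):
            raw = raw.__func__  # unwrap to the underlying function
        if inspect.isfunction(raw):
            sig = inspect.signature(raw)
            params = [p for p in sig.parameters if p != "self"]
            methods.append({"name": name, "signature": str(sig), "params": params})
        else:
            attributes.append({"name": name, "type": type(raw).__name__})
    return {"methods": methods, "properties": properties, "attributes": attributes}


class Tool:
    supported_actions = {"run"}

    @property
    def name(self) -> str:
        return "tool"

    def run(self, path: str, fast: bool = False) -> str:
        return path
```

Sorting by name keeps the output deterministic, which matters when the result is hashed.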

#### Classes

``` python
@dataclass
class ConfigOption:
    "CR-11: one live option for a dynamic config field, with optional metadata."
    
    value: Any  # option value (e.g. "gemini-2.5-flash")
    label: str  # display label (e.g. "Gemini 2.5 Flash")
    metadata: Dict[str, Any] = dataclasses.field(default_factory=dict)  # token limits, descriptions, ...
```

``` python
@dataclass
class FieldOptions:
    """
    CR-11: the live option domain for one dynamic config field.
    
    Kept SEPARATE from the static config_schema (which CR-8 hashes for drift
    detection). The capability-config UI merges these live options on top of the
    static schema; folding them into the schema would make every API capability
    perpetually 'drift'.
    """
    
    options: List[ConfigOption]  # current valid options
    constraints: Dict[str, Any] = dataclasses.field(default_factory=dict)  # derived field-constraint overrides
```

``` python
@dataclass
class EnvVarSpec:
    """
    CR-12: one entry of a capability's spawn-time worker-environment contract.
    
    A capability declares the environment variables its worker subprocess reads at
    startup via `WORKER_ENV: ClassVar[List[EnvVarSpec]]`. Worker env vars are
    FIXED AT SPAWN — changing one requires a worker RESPAWN, so the substrate
    routes such changes through `reload_capability`, never `reconfigure` (the env is
    baked into the subprocess at `Popen` and can't be mutated in-process). This
    is the lifecycle distinction from a normal config field (reconfigurable in
    place via `reconfigure`/`_release_<trigger>`).
    
    Two flavors share this one declaration:
    
      - `secret=True`  : value resolved from the `SecretStore` (masked; never
                         persisted in the config store, echoed in config_schema,
                         or logged). A secret never carries a `default` — a
                         baked-in secret is a leak.
      - `secret=False` : visible value resolved from the override chain
                         (operator override > manifest `install.env_vars` >
                         this `default`); safe to display.
    
    Both share one injection seam: the substrate composes the resolved
    {name: value} overlay at load time and injects it into the worker env at
    spawn (extending the existing CJM_CAPABILITY_DATA_DIR / CJM_MODELS_DIR injection).
    This is "derive from behaviour, not metadata" applied to the spawn env: the
    capability declares WHICH vars it consumes + whether each is secret/required;
    the substrate owns resolution + injection.
    
    `options` is a forward seam for visible vars with a finite domain (e.g. a
    device selector enumerating GPUs); unused today but reserved so the
    capability-config UI / a future `set-env` surface isn't blocked.
    """
    
    name: str  # The env var the worker reads, e.g. "GEMINI_API_KEY"
    secret: bool = False  # True -> value resolved from the SecretStore, masked
    required: bool = False  # Worker can't do useful work until this is satisfied
    label: str = ''  # Display label for CLI / GUI affordances
    description: str = ''  # Help text for CLI / GUI
    default: Optional[str] = None  # Visible vars only; secrets must leave this None
    options: Optional[List[str]] = None  # Forward seam: finite domain for a visible var (unused today)
```
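The override chain described in the docstring (operator override, then manifest `install.env_vars`, then `default`, with secrets resolved separately) can be sketched as below. The trimmed `EnvVar` dataclass, the function `resolve_worker_env`, plain dicts standing in for the `SecretStore` and the manifest, and the choice to report unsatisfied required variables as a list are all assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class EnvVar:  # trimmed, hypothetical mirror of EnvVarSpec
    name: str
    secret: bool = False
    required: bool = False
    default: Optional[str] = None


def resolve_worker_env(
    specs: List[EnvVar],
    overrides: Dict[str, str],     # operator overrides
    manifest_env: Dict[str, str],  # stands in for manifest install.env_vars
    secrets: Dict[str, str],       # stands in for the SecretStore
) -> Tuple[Dict[str, str], List[str]]:
    env: Dict[str, str] = {}
    missing: List[str] = []
    for spec in specs:
        if spec.secret:
            value = secrets.get(spec.name)  # secrets never fall back to a default
        elif spec.name in overrides:
            value = overrides[spec.name]
        elif spec.name in manifest_env:
            value = manifest_env[spec.name]
        else:
            value = spec.default
        if value is not None:
            env[spec.name] = value
        elif spec.required:
            missing.append(spec.name)
    return env, missing
```

The resulting overlay would be merged into the subprocess environment at spawn, which is why a changed value needs a respawn rather than an in-place reconfigure.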

``` python
class ToolCapability(ABC):
    """
    Tool-capability interface: manage the tool/worker — identity, lifecycle,
    config, cancellation, observability (pass-2 Thread 3 fracture).
    
    The task channel is NOT part of this surface: `execute` left the abstract
    set when the capability-unit fracture split tool capabilities from task
    adapters. Typed task contracts live on adapters (`core.adapter` + the
    per-task `cjm-<task>-adapter-interface` libraries). Fused-era capabilities (the
    pre-Option-C 12) still define `execute` themselves and their domain ABCs
    still declare it abstract — they kept working unchanged through the
    class-identical `ToolCapability` alias until the Option C migration
    cascade split them (the alias was REMOVED at SG-48).
    
    CR-4 extended this surface with: prefetch hook (SG-19), made cleanup optional
    (SG-43), reconfigure split + RELOAD_TRIGGER declarative-helper (lifecycle
    hook split), and cooperative-cancellation primitives (SG-16 — flag + callback
    + context manager).
    
    Abstract methods: `name`, `version`, `initialize`, `get_config_schema`,
    `get_current_config`. Concrete defaults (overridable): `execute_stream`
    (transitional — see method), `cleanup`, `prefetch`, `reconfigure`,
    `cancel`, `check_cancel`, `register_cancel_callback`, `cancel_signal_to`,
    `report_progress`, `report_usage`, `fields_that_changed`,
    `reconfigure_with_triggers`, `on_disable`, `on_enable`.
    """
    
    @property
    @abstractmethod
    def name(self) -> str: # Unique identifier for this capability
        "Unique capability identifier."
    
    @property
    @abstractmethod
    def version(self) -> str: # Semantic version string (e.g., "1.0.0")
        "Capability version."
    
    @abstractmethod
    def initialize(
        self,
        config: Optional[Dict[str, Any]] = None # Configuration dictionary
    ) -> None
        """
        Initialize or re-configure the capability.
        
        CR-4: this is "first-time setup", called once after construction with
        the initial config. Substrate uses `reconfigure(old, new)` for delta
        updates afterwards. Capabilities predating CR-4 see no behavior change since
        the default `reconfigure()` body delegates to `reconfigure_with_triggers`
        which is a no-op unless the capability opts in via RELOAD_TRIGGER metadata.
        """
    
    def execute_stream(
        self,
        *args,
        **kwargs
    ) -> Generator[Any, None, None]: # Yields partial results
        """
        Stream execution results chunk by chunk.
        
        TRANSITIONAL(option-c-cascade): streaming is substrate/composition-supplied
        under the pass-2 fracture (off both interfaces); the default
        stays here only because fused-era capabilities rely on it (it calls the
        capability's own `execute`, which a split tool capability does not have).
        Relocates when CR-17 adapter routing lands (execution stage 4).
        """
    
    @abstractmethod
    def get_config_schema(self) -> Dict[str, Any]: # JSON Schema for configuration
        "Return JSON Schema describing the capability's configuration options."
    
    @abstractmethod
    def get_current_config(self) -> Dict[str, Any]: # Current configuration values
        "Return the current configuration state as a dictionary."
    
    def get_config_options(self) -> Dict[str, "FieldOptions"]
        """
        CR-11: live option domains for dynamic config fields, keyed by field name.
        
        Optional. Default: {} (fully static capabilities). For fields whose valid
        domain is determined at runtime (e.g. an API model list), return a
        `FieldOptions` carrying current `ConfigOption` values + per-option
        metadata (token limits, etc.) + optional derived constraints. Runs in
        the worker subprocess (has the capability's deps + credentials).
        
        Kept SEPARATE from get_config_schema(): the schema is static + hashed for
        CR-8 drift detection; these options are the live, un-hashed companion the
        capability-config UI merges on top. A fetch failure should raise a typed CR-5
        error; the substrate's CapabilityManager.get_config_options accessor degrades
        to {} so the UI can fall back to the static schema.
        """
    
    def cleanup(self) -> None
        """
        Clean up resources when capability is unloaded.
        
        CR-4: made optional (SG-43 closure). Was `@abstractmethod` before; every
        audited capability overrode it with a near-no-op, so the default is now a
        no-op and capability authors override only when they have non-trivial
        teardown (file handles, GPU memory, database connections). The
        substrate's worker /cleanup endpoint calls this regardless.
        """
    
    def prefetch(self) -> None
        """
        Acquire heavy resources eagerly without invoking execute().
        
        CR-4 (SG-19): default no-op. Capability authors override when downstream
        callers benefit from eager acquisition, typically transcription /
        inference capabilities that lazy-download models on first execute. The
        substrate's prefetch_capability(name) API + worker /prefetch endpoint
        invoke this. Should be idempotent (safe to call multiple times) since
        the substrate may pre-warm at load time AND on operator request.
        """
    
    def reconfigure(
        self,
        old_config: Optional[Dict[str, Any]],  # Previous configuration values
        new_config: Optional[Dict[str, Any]],  # New configuration values to apply
    ) -> None
        """
        Apply a configuration change without re-running full initialize().
        
        CR-4 completion (2026-05-25): reconfigure is the substrate's canonical
        delta path - `CapabilityManager.update_capability_config` routes here, NOT through
        a bare `initialize(new_config)`. It fires `_release_<trigger>` releases for
        changed RELOAD_TRIGGER fields, then re-applies config (see body below).
        
        Distinction from initialize(): initialize sets up persistent state once
        after construction; reconfigure applies delta updates and is the
        canonical entry point for hot-reload via the substrate's
        update_capability_config path.
        """
    
    def fields_that_changed(
        self,
        old: Dict[str, Any],  # Previous config snapshot
        new: Dict[str, Any],  # Proposed new config snapshot
    ) -> Set[str]:  # Field names whose values differ between old and new
        """
        Return the set of field names whose values differ between old and new.
        
        Includes fields present in only one dict (treated as a change to/from
        the implicit None). Equality is structural via `!=`; nested dicts /
        lists compare by value, not identity. Hashable-vs-unhashable values
        compare correctly because we never put them in a set themselves.
        """
    
    def reconfigure_with_triggers(
        self,
        old_config: Dict[str, Any],  # Previous configuration values
        new_config: Dict[str, Any],  # New configuration values being applied
    ) -> None
        """
        CR-4 helper: walk RELOAD_TRIGGER metadata + fire `_release_<trigger>` methods.
        
        Resolution sequence:
        
          1. Find the capability's `config_class` attribute (a dataclass). Absent
             means the capability hasn't opted into the declarative pattern; the
             helper returns silently.
          2. Compute the diff between `old_config` and `new_config` via
             `fields_that_changed`.
          3. For each changed field, read its `RELOAD_TRIGGER` metadata key
             (if any) and accumulate the trigger names into a set (de-duping
             when multiple fields share a trigger).
          4. For each trigger, call `self._release_<trigger>()` if it exists
             on the instance.
        
        Capability authors opt in by:
        
          - Setting `config_class = MyConfigDataclass` as a class attribute.
          - Annotating field metadata with `{RELOAD_TRIGGER: "model"}` etc.
          - Implementing matching `_release_model(self)` instance methods.
        
        Capabilities WITHOUT config_class or RELOAD_TRIGGER metadata land here as a
        no-op, a safe default for the SG-T3tr migration window where the
        cascade hasn't yet adopted the declarative pattern everywhere.
        """
    
    def cancel(self) -> None
        """
        Request cooperative cancellation of the current execute() call.
        
        CR-4: default sets the substrate-tracked `_cancel_requested` flag and
        fires any callbacks registered via `register_cancel_callback`.
        
        Capability authors who need extra teardown logic (signaling a subprocess,
        closing a network connection) override `cancel()` and SHOULD call
        `super().cancel()` to preserve the flag-setting + callback-fire
        behavior. The capability's `execute()` polls via `check_cancel()` at safe
        interruption points and unwinds when it raises `CapabilityCancelledError`.
        """
    
    def check_cancel(self) -> None
        """
        Raise `CapabilityCancelledError` if cancellation has been requested.
        
        CR-4 (SG-16 polling primitive): capability authors call this at safe
        interruption points inside `execute()`. The substrate sets the flag via
        `cancel()` (typically driven by an operator's "Cancel" button); the
        next `check_cancel` call surfaces the cancellation as a typed exception
        that unwinds execute() cleanly.
        
        The substrate's worker /execute wrapper resets the flag before each
        call so cancellation doesn't leak from one job into the next.
        """
    
    def register_cancel_callback(
        self,
        callback: Callable[[], None],  # Called when cancel() fires
    ) -> None
        """
        Register a callback that fires when cancel() is called.
        
        CR-4 (SG-16 callback primitive): for capabilities that can't easily insert
        polling at strategic points (e.g., a capability wrapping a blocking C
        extension). Callbacks should be non-blocking and idempotent. Multiple
        callbacks can be registered; all fire in registration order when
        cancel() is invoked. A misbehaving callback that raises is logged
        and skipped; the remaining callbacks still fire.
        """
    
    def cancel_signal_to(
        self,
        callback: Callable[[], None],  # Cancellation callback scoped to the with-block
    ) -> Generator[None, None, None]
        """
        Context manager registering `callback` for the duration of the with-block.
        
        Useful for binding cancellation to a finite-scope resource (a subprocess,
        a network request, a temporary file handle) without needing to
        deregister manually. Pairs with `register_cancel_callback` for cases
        where lifetime is tied to a `try:`/`finally:` block.
        
        Example:
        
            def execute(self, *args, **kwargs):
                proc = subprocess.Popen(...)
                with self.cancel_signal_to(lambda: proc.terminate()):
                    return proc.wait()
        """
    
    def on_disable(self) -> None
        """
        CR-2: signal that the substrate has marked this capability as disabled.
        
        Worker stays alive; capability can release heavy resources here (e.g., free
        GPU memory, close model files). The substrate fires this hook AFTER any
        in-flight job for this capability finishes; see CapabilityManager.disable_capability
        deferred-hook semantics. Default: no-op; capabilities opt in by overriding.
        """
    
    def on_enable(self) -> None
        """
        CR-2: signal that the substrate has marked this capability as re-enabled.
        
        Capability can eagerly re-acquire heavy resources here, or rely on lazy
        re-acquisition via the next execute() call (substrate doesn't prefer
        one strategy over the other). Default: no-op; capabilities opt in by overriding.
        """
    
    def report_progress(
        self,
        progress: float, # 0.0 to 1.0, or -1.0 for indeterminate
        message: str = "" # Descriptive status message
    ) -> None
        "Report execution progress. Call during execute() to update status."
    
    def report_usage(
        self,
        usage: Dict[str, float],  # Measured usage for this execute, keyed by capability-defined unit name
    ) -> None
        """
        SG-54: report measured API/service usage for the current execute() call.
        
        Unit-agnostic by design: the capability (which holds the API response)
        supplies whatever unit names it measures: {"input_tokens": .., "output_tokens": ..}
        for an LLM, {"pages": ..} for OCR, {"characters": ..} for TTS,
        {"credits"/"requests"/"minutes": ..} for others. The substrate stores +
        accumulates per unit name WITHOUT interpreting them (summed across runs in
        the empirical store's api_usage_totals). Pricing is deliberately NOT here
        (volatile, per-service, often not API-accessible); a consumer-side rate
        table turns raw units into cost. "Derive from behaviour": the capability
        MEASURES actual usage from the response; the substrate aggregates.
        
        Stored on `self._last_api_usage`; the worker exposes it via /stats and the
        substrate folds it into the post-execute ResourceSample. The worker resets
        it before each execute so a failed/usage-less call can't inherit stale
        usage. Default: store-only (parallel to report_progress).
        """
```
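The three cancellation primitives described in the class listing (flag, callbacks, scoped context manager) fit together roughly as sketched below. This is an illustration under assumptions, not the library's code: the `Cancellable` class, the `CancelledSketchError` stand-in for `CapabilityCancelledError`, and the logger name are hypothetical.

```python
import logging
from contextlib import contextmanager
from typing import Callable, Iterator, List

log = logging.getLogger("cancel-sketch")


class CancelledSketchError(RuntimeError):
    """Stand-in for CapabilityCancelledError."""


class Cancellable:
    def __init__(self) -> None:
        self._cancel_requested = False
        self._cancel_callbacks: List[Callable[[], None]] = []

    def cancel(self) -> None:
        self._cancel_requested = True
        for cb in list(self._cancel_callbacks):  # iterate over a copy
            try:
                cb()
            except Exception:
                # A failing callback is logged and skipped; the rest still fire.
                log.exception("cancel callback failed; continuing")

    def check_cancel(self) -> None:
        if self._cancel_requested:
            raise CancelledSketchError("cancelled")

    def register_cancel_callback(self, callback: Callable[[], None]) -> None:
        self._cancel_callbacks.append(callback)

    @contextmanager
    def cancel_signal_to(self, callback: Callable[[], None]) -> Iterator[None]:
        self.register_cancel_callback(callback)
        try:
            yield
        finally:
            self._cancel_callbacks.remove(callback)  # scoped: deregister on exit
```

Polling with `check_cancel` suits loops with natural checkpoints, while callbacks suit blocking calls that can only be interrupted from outside, such as terminating a subprocess.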

``` python
class _CR4MinimalCapability(ToolCapability):
    "Concrete capability satisfying abstracts; relies on CR-4 default cleanup()."
    
    @property
    def name(self) -> str: return "cr4-minimal"
    
    @property
    def version(self) -> str: return "0.0.0"
    
    def initialize(self, config=None): self._cfg = dict(config or {})
    
    def execute(self, *args, **kwargs): return None
    
    def get_config_schema(self) -> Dict[str, Any]: return {}
    
    def get_current_config(self) -> Dict[str, Any]: return dict(getattr(self, "_cfg", {}))
```

#### Variables

``` python
RELOAD_TRIGGER = 'reload_trigger'
WORKER_ENV_TEMPLATE_PLACEHOLDERS: Set[str]
```
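To show how the `RELOAD_TRIGGER` metadata key is meant to be used, here is a sketch of the declarative opt-in and the resolution sequence documented on `reconfigure_with_triggers`. The classes `DemoConfig` and `DemoCapability` and the helper `changed_fields` are hypothetical; the sketch reimplements the documented steps rather than calling the library.

```python
from dataclasses import dataclass, field, fields
from typing import Any, Dict, List, Set

RELOAD_TRIGGER = "reload_trigger"  # same value as the variable listed above


@dataclass
class DemoConfig:
    model: str = field(default="base", metadata={RELOAD_TRIGGER: "model"})
    device: str = field(default="cpu", metadata={RELOAD_TRIGGER: "model"})
    language: str = "en"  # no trigger: changing it needs no release


def changed_fields(old: Dict[str, Any], new: Dict[str, Any]) -> Set[str]:
    # A key present in only one dict compares against an implicit None.
    return {k for k in old.keys() | new.keys() if old.get(k) != new.get(k)}


class DemoCapability:
    config_class = DemoConfig

    def __init__(self) -> None:
        self.released: List[str] = []

    def _release_model(self) -> None:
        self.released.append("model")

    def reconfigure_with_triggers(self, old: Dict[str, Any], new: Dict[str, Any]) -> None:
        config_class = getattr(self, "config_class", None)
        if config_class is None:
            return  # capability has not opted in
        changed = changed_fields(old, new)
        triggers = {
            f.metadata[RELOAD_TRIGGER]
            for f in fields(config_class)
            if f.name in changed and RELOAD_TRIGGER in f.metadata
        }
        for trigger in sorted(triggers):  # set de-dupes shared triggers
            release = getattr(self, f"_release_{trigger}", None)
            if callable(release):
                release()
```

Changing `model` and `device` together fires `_release_model` once, since both fields share the `"model"` trigger.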

### cli (`cli.ipynb`)

> CLI tool for declarative capability management

#### Import

``` python
from cjm_substrate.cli import (
    app,
    main,
    setup_runtime,
    run_cmd,
    regenerate_manifest,
    generate_adapter_manifest,
    install_all,
    setup_host,
    list_capabilities,
    logs_command,
    retention_command,
    remove_capability,
    validate_file,
    set_secret,
    list_secrets
)
```

#### Functions

``` python
def main(
    ctx:typer.Context,
    cjm_config:Annotated[Optional[Path], typer.Option(
        "--cjm-config",
        help="Path to cjm.yaml configuration file"
    )]=None,
    data_dir:Annotated[Optional[Path], typer.Option(
        "--data-dir",
        help="Override data directory (manifests, logs)"
    )]=None,
    conda_prefix:Annotated[Optional[Path], typer.Option(
        "--conda-prefix",
        help="Override conda/mamba prefix path"
    )]=None,
    conda_type:Annotated[Optional[str], typer.Option(
        "--conda-type",
        help="Conda implementation: micromamba, miniforge, or conda"
    )]=None,
) -> None
    "cjm-substrate CLI for managing isolated capability environments."
```

``` python
def setup_runtime(
    force:bool=typer.Option(False, "--force", "-f", help="Re-download even if binary exists")
) -> None
    "Download and setup micromamba runtime for project-local mode."
```

``` python
def _check_runtime_available() -> None
    "Check if the configured conda runtime is available, exit with helpful message if not."
```

``` python
def _get_conda_cmd_str() -> str
    "Get the conda/micromamba command string for shell commands."
```

``` python
def _download_url_to_temp(
    url: str,  # URL to download
    suffix: str = ".yml"  # File suffix for temp file
) -> Optional[Path]:  # Path to temp file or None if failed
    "Download a URL to a temporary file. Returns None if download fails."
```

``` python
def _resolve_env_file(
    env_file: str  # Path or URL to environment file
) -> tuple[str, Optional[Path]]:  # (resolved_path, temp_file_to_cleanup)
    """
    Resolve env_file to a local path, downloading if it's a URL.
    
    Returns (local_path, temp_file) where temp_file is set if we created
    a temporary file that should be cleaned up later.
    """
```
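The path-or-URL resolution described above could be sketched as follows. This is an assumption-heavy illustration: the name `resolve_env_file_sketch`, the injectable `fetch` parameter, and the use of `urllib` for the download are choices made for this example, and the library's own download helper may behave differently.

```python
import tempfile
import urllib.request
from pathlib import Path
from typing import Callable, Optional, Tuple
from urllib.parse import urlparse


def _fetch(url: str) -> bytes:
    with urllib.request.urlopen(url, timeout=30) as resp:
        return resp.read()


def resolve_env_file_sketch(
    env_file: str,
    fetch: Callable[[str], bytes] = _fetch,  # injectable for offline use
) -> Tuple[str, Optional[Path]]:
    if urlparse(env_file).scheme not in ("http", "https"):
        return env_file, None  # already local; nothing to clean up
    with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp:
        tmp.write(fetch(env_file))
    path = Path(tmp.name)
    return str(path), path
```

Returning the temp path separately lets the caller delete it in a `finally` block once the environment has been created.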

``` python
def run_cmd(
    cmd: str,  # Shell command to execute
    check: bool = True  # Whether to raise on non-zero exit
) -> None
    """
    Run a shell command and stream output.
    
    Uses the platform's default shell (no hardcoded /bin/bash).
    """
```
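Streaming a command's output through the platform's default shell can be done with `subprocess.Popen(..., shell=True)`. The sketch below is a hypothetical `run_shell` helper, not the library's `run_cmd`; unlike `run_cmd`, which returns `None`, it returns the exit code so the behavior is easy to observe.

```python
import subprocess
import sys


def run_shell(cmd: str, check: bool = True) -> int:
    # shell=True uses the platform default shell (/bin/sh on POSIX,
    # the COMSPEC shell on Windows), so no interpreter path is hardcoded.
    proc = subprocess.Popen(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # interleave stderr with stdout
        text=True,
    )
    assert proc.stdout is not None
    for line in proc.stdout:  # yields lines as the child produces them
        sys.stdout.write(line)
    code = proc.wait()
    if check and code != 0:
        raise subprocess.CalledProcessError(code, cmd)
    return code
```

With `check=False` a non-zero exit is reported to the caller instead of raising, which is useful for probing commands whose failure is an expected outcome.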

``` python
def _generate_manifest(
    env_name: str,  # Name of the Conda environment
    package_name: str,  # Package source string (git URL or package name)
    manifest_dir: Path  # Directory to write manifest JSON files
) -> Optional[Path]:  # Path to the manifest written, or None on failure
    """
    Run introspection script inside the target env and write a v2.0 manifest.
    
    Builds a `ManifestV2` from the introspection output + install-time markers,
    computes `drift_tracking.config_schema_hash`, and writes via
    `write_manifest` (nested layout, indent=2). Both `installed_at` and
    `regenerated_at` are set to "now"; `regenerate_manifest` preserves the
    original `installed_at` via post-write fix-up.
    """
```

``` python
def regenerate_manifest(
    capability_name: str = typer.Argument(..., help="Capability name as it appears in the manifest"),
    capabilities_path: Optional[str] = typer.Option(
        None, "--capabilities",
        help="Path to capabilities.yaml for package_source recovery (legacy manifests)",
    ),
    package: Optional[str] = typer.Option(
        None, "--package",
        help="Package spec override (e.g., git URL or pip name); wins over manifest/capabilities.yaml lookups",
    ),
) -> None
    """
    Re-run introspection for an installed capability and rewrite its manifest.
    
    Reads the existing manifest via `load_manifest`, recovers `env_name` +
    `package_source` from the install section, runs `_generate_manifest` to
    refresh the code section, then post-writes to preserve the original
    `installed_at` so the regenerate only updates `regenerated_at` semantically.
    Always emits v2.0 layout.
    """
```

``` python
def _generate_adapter_manifest(
    """
    Introspect a task-adapter impl in-env and write its adapter manifest (CR-17 pt 2).
    
    The non-typer core shared by the `generate-adapter-manifest` command and
    `install-all`'s per-capability `adapters:` entries (stage 6 J10 -- the I6/J8
    install-pipeline gap: adapter installation + registration ride the SAME
    pipeline as capability installation, never manual afterthoughts).
    Raises ValueError on a malformed target and CalledProcessError when the
    in-env introspection fails; callers decide the exit posture.
    """
```

``` python
def generate_adapter_manifest(
    env_name: str = typer.Argument(..., help="Conda env containing the adapter impl (the tool's worker env)"),
    target: str = typer.Argument(..., help="Adapter impl spec 'module:ClassName'"),
)
    """
    CR-17 pt 2 (stage 4): introspect a task-adapter impl in-env and write its adapter manifest.
    
    The adapter manifest is the REGISTRATION unit (pass-2 Thread 3): task_name
    + required_tool_protocol members (names + parameter lists + signatures)
    recorded IN-ENV where the protocol is importable, so host-side
    compatibility matching works against UNLOADED capabilities with zero
    protocol imports host-side. Written to the same manifests dir capability
    manifests live in; `discover_manifests()` routes by the `unit` key.
    
    Thin typer wrapper over `_generate_adapter_manifest` (stage 6 J10:
    install-all runs the same core for per-capability `adapters:` entries).
    """
```

``` python
def _conda_env_exists_configured(
    env_name: str  # Name of the conda environment
) -> bool:  # True if environment exists
    "Check if conda environment exists using configured conda command."
```

``` python
def install_all(
    capabilities_path:Optional[str]=typer.Option(None, "--capabilities", help="Path to capabilities.yaml (default: cjm.yaml capabilities_config)"),
    substrate_source:str=typer.Option("cjm-substrate", "--substrate-source", help="Substrate package spec installed into every worker env (default: published; pass a path or '-e <path>' for local-editable dev)"),
    force:bool=typer.Option(False, help="Force recreation of environments")
) -> None
    """
    Install and register all capabilities defined in capabilities.yaml.
    
    Per-capability `adapters:` entries ride the same pipeline (stage 6 J10; closes
    the I6/J8 manual-step gap): each entry's `lib` is pip-installed into the
    worker env alongside the interface libs, and each `impl`
    ('module:ClassName') gets its adapter manifest generated right after the
    capability manifest -- INSTALL puts code in envs, REGISTRATION is per-unit
    manifests (pass-2 Thread 3), one command does both.
    """
```

``` python
def setup_host(
    capabilities_path:str=typer.Option("capabilities.yaml", "--capabilities", help="Path to capabilities.yaml file"),
    yes:bool=typer.Option(False, "--yes", "-y", help="Skip confirmation prompt")
) -> None
    "Install interface libraries in the current Python environment."
```

``` python
def _format_size(
    size_bytes: int  # Size in bytes
) -> str:  # Human-readable size string
    "Format bytes as human-readable string."
```
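
The listing documents only the signature of `_format_size`. As a rough illustration of what such a formatter can look like, here is a minimal sketch. The 1024-based units, the one-decimal rounding, and the name `format_size` are assumptions, not the package's actual behavior.

``` python
def format_size(size_bytes: int) -> str:
    """Render a byte count with a 1024-based unit suffix (hypothetical helper)."""
    size = float(size_bytes)
    for unit in ("B", "KB", "MB", "GB"):
        # Stop at the first unit in which the value drops below 1024.
        if size < 1024:
            return f"{size:.1f} {unit}"
        size /= 1024
    # Anything that survives the loop is reported in terabytes.
    return f"{size:.1f} TB"
```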

``` python
def _get_pypi_size(
    package_spec: str  # Package name or git URL
) -> tuple[int, str]:  # (size_bytes, package_name)
    """
    Query PyPI for package download size.
    
    SG-37: PyPI normalizes package names to dash-form per PEP 503, so we try
    the dash form first and fall back to the underscore form. Without this,
    cjm-* packages (whose repo names contain dashes) silently 404 because
    the old heuristic only tried the underscored form.
    """
```
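
The dash-first, underscore-second lookup order described for `_get_pypi_size` can be sketched as follows. The helper name `pypi_name_candidates` is hypothetical and the sketch only handles plain package names (not git URLs); the normalization regex is the one given in PEP 503.

``` python
import re


def pypi_name_candidates(package_name: str) -> list[str]:
    """Return lookup names in the order to try them: dash form, then underscore form."""
    # PEP 503 normalization: runs of '-', '_' and '.' collapse to a single '-'.
    dashed = re.sub(r"[-_.]+", "-", package_name).lower()
    underscored = dashed.replace("-", "_")
    candidates = [dashed]
    # Only add the fallback when it differs from the dash form.
    if underscored != dashed:
        candidates.append(underscored)
    return candidates
```

A caller would request each candidate in turn and stop at the first one that does not return a 404.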

``` python
def _estimate_conda_size(
    env_file: str,  # Path or URL to environment.yml
    env_name: str  # Target environment name
) -> tuple[int, int]:  # (total_bytes, package_count)
    "Estimate conda package sizes using dry-run."
```

``` python
def _estimate_pip_sizes(
    packages: list[str]  # List of pip package specs
) -> tuple[int, int, list[tuple[str, int]]]:  # (total_bytes, found_count, [(name, size), ...])
    "Estimate pip package sizes from PyPI."
```

``` python
def _get_conda_envs() -> set[str]: # Set of existing conda environment names
    "Get list of existing conda environment names using configured conda command."
```

``` python
def _get_installed_manifests(
    manifest_dir:Optional[Path]=None # Directory to scan (uses config default if None)
) -> "list[ManifestV2]": # Typed capability manifests (adapter manifests skipped)
    """
    Load installed capability manifests as typed `ManifestV2` objects.
    
    Adapter manifests (routed by the `unit` discriminator) are skipped;
    unreadable or unrecognized-format files are silently ignored.
    """
```

``` python
def _extract_env_from_python_path(
    python_path:str # Path like /home/user/miniforge3/envs/my-env/bin/python
) -> str: # Extracted environment name or empty string
    "Extract conda environment name from python_path."
```
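
One way to derive the environment name from a path shaped like the example in the signature comment is to take the path segment that follows `envs`. This is a sketch under that assumption; the helper name `extract_env_name` is hypothetical, it handles POSIX-style paths only, and the real function may apply different rules.

``` python
from pathlib import PurePosixPath


def extract_env_name(python_path: str) -> str:
    """Return the path segment after 'envs', or '' when there is none."""
    parts = PurePosixPath(python_path).parts
    # Skip the last segment so that parts[i + 1] always exists.
    for i, part in enumerate(parts[:-1]):
        if part == "envs":
            return parts[i + 1]
    return ""
```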

``` python
def list_capabilities(
    capabilities_path:Optional[str]=typer.Option(None, "--capabilities", help="Path to capabilities.yaml for cross-reference"),
    show_envs:bool=typer.Option(False, "--envs", "-e", help="Show conda environment status")
) -> None
    "List installed capabilities from manifest directory."
```

``` python
def _fmt_short(value: Optional[str], width: int = 8) -> str:
    "First `width` chars of an id, or '-' for None (display only)."
```

``` python
def _compact_payload(payload: Dict[str, Any], max_len: int = 160) -> str:
    "One-line payload rendering; the bulky job_snapshot collapses to its status."
```

``` python
def logs_command(
    job:Annotated[Optional[str], typer.Option("--job", help="Filter to one queue job id (exact)")]=None,
    run:Annotated[Optional[str], typer.Option("--run", help="Filter to one host run id (implies --journal)")]=None,
    session:Annotated[Optional[str], typer.Option("--session", help="Filter to one worker session id")]=None,
    level:Annotated[Optional[str], typer.Option("--level", help="Diagnostics level filter (e.g. WARNING)")]=None,
    journal:Annotated[bool, typer.Option("--journal", help="Show the journal (account-of-action) instead of diagnostics records")]=False,
    chunks:Annotated[bool, typer.Option("--chunks", help="Show raw stream chunks (death-rattle floor)")]=False,
    limit:Annotated[int, typer.Option("--limit", "-n", help="Most recent N entries (0 = all)")]=50,
    follow:Annotated[bool, typer.Option("--follow", "-f", help="Poll for new entries (Ctrl-C to stop)")]=False,
) -> None
    """
    Tail / follow the observability stores (CR-14).
    
    Default view: structured diagnostics records (worker logger output,
    EXACTLY job-stamped via the call envelope). `--chunks`: the raw stream
    pump. `--journal`: the durable account-of-action (job lifecycle, worker
    spawn/death, admission, config, runs, worker-reported accounts).
    `--follow` polls the store's seq cursor — exact, no byte offsets.
    """
```

``` python
def retention_command(
    max_age_days:Annotated[Optional[float], typer.Option(
        "--max-age-days", help="Delete diagnostics rows older than this (overrides cjm.yaml)")]=None,
    max_total_mb:Annotated[Optional[float], typer.Option(
        "--max-total-mb", help="Delete oldest rows until diagnostics.db is under this budget")]=None,
) -> None
    """
    Apply the diagnostics retention policy now (CR-14).
    
    The explicit half of the invocation policy (CapabilityManager's startup
    sweep is the automatic half). Defaults come from `cjm.yaml`'s
    `substrate.diagnostics_retention_days` / `diagnostics_retention_max_mb`.
    The JOURNAL is never touched — it has no retention surface by design.
    """
```

``` python
def remove_capability(
    capability_name:str=typer.Argument(..., help="Name of the capability to remove"),
    capabilities_path:Optional[str]=typer.Option(None, "--capabilities", help="Path to capabilities.yaml for env name lookup"),
    keep_env:bool=typer.Option(False, "--keep-env", help="Keep the conda environment, only remove manifest"),
    yes:bool=typer.Option(False, "--yes", "-y", help="Skip confirmation prompt")
) -> None
    "Remove a capability's manifest and conda environment."
```

``` python
def _validate_resources_block(
    res: Any,  # resources sub-dict (may be None or non-dict; we type-check here)
    path_prefix: str,  # Error message prefix
) -> List[str]
    "Phase 5a: type-check the resources block. Shared between v1.0 and v2.0."
```

``` python
def _validate_manifest_v2_dict(
    data: Dict[str, Any]  # v2.0 nested manifest dict (caller already verified format_version)
) -> List[str]:  # Empty list = valid
    """
    CR-8: validate the nested v2.0 manifest layout.
    
    Required sections: `install` + `code`. Optional sections: `drift_tracking`
    (substrate emits it on every fresh write but legacy-via-load_manifest
    upgrades leave it empty until the first regenerate); `overrides` (free-form
    operator overlay).
    
    Required `code.*` fields mirror v1.0's required top-level fields. Required
    `install.python_path` mirrors v1.0's required top-level `python_path`.
    """
```

``` python
def _validate_manifest_dict(
    data: Any  # Loaded manifest JSON
) -> List[str]:  # List of human-readable error messages (empty == valid)
    """
    SG-6 + CR-8: structural validation, dispatching on `format_version`.
    
    `format_version == "2.0"` validates the nested v2.0 layout. Any other
    value (including a missing field — the legacy v1.0 flat shim was removed
    at SG-48) rejects with a single error so unknown formats fail loud rather
    than silently degrading.
    """
```

``` python
def _validate_capabilities_yaml_dict(
    data: Any  # Loaded capabilities.yaml content
) -> List[str]:  # List of human-readable error messages (empty == valid)
    """
    Structural validation of a capabilities.yaml file.
    
    Each capability entry must have name + env_name + package, plus either env_file
    or python_version (one defines how the conda env is created).
    """
```
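
The per-entry rule stated in the docstring (three required keys, plus one of `env_file` or `python_version`) can be sketched as a small check on a single entry. The helper name, the error wording, and the `capabilities[i]` prefix are illustrative assumptions; the surrounding file layout is not covered here.

``` python
from typing import Any, List

REQUIRED_KEYS = ("name", "env_name", "package")


def validate_capability_entry(entry: Any, index: int = 0) -> List[str]:
    """Return human-readable errors for one capability entry (empty list == valid)."""
    prefix = f"capabilities[{index}]"
    if not isinstance(entry, dict):
        return [f"{prefix}: expected a mapping, got {type(entry).__name__}"]
    errors = [
        f"{prefix}: missing required key '{key}'"
        for key in REQUIRED_KEYS
        if key not in entry
    ]
    # One of these two keys defines how the conda env is created.
    if "env_file" not in entry and "python_version" not in entry:
        errors.append(f"{prefix}: needs either 'env_file' or 'python_version'")
    return errors
```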

``` python
def _collect_manifest_warnings(
    data: Any  # Loaded manifest JSON
) -> List[str]:  # Human-readable warning strings (non-failing lints)
    """
    T23: non-failing manifest lints (warnings, not errors).
    
    - V4: a single-element `enum` in a config_schema property offers no operator
          choice — the field should be dropped or its domain expanded.
    - V12: quantitative resource fields (`min_gpu_vram_mb` / `recommended_gpu_vram_mb`
          / `min_system_ram_mb`) were dropped by the CR-7 reactive-resource reframe;
          the substrate ignores them, so they are stale dead data.
    
    Resolves the resources/config_schema location for both v2.0 (nested under
    `code`) and legacy v1.0 (flat) layouts. The `validate` command prints these
    without exiting non-zero (warnings alone don't fail validation).
    """
```
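
The V4 lint above can be illustrated with a short sketch. It assumes `config_schema` is a JSON-Schema-style dict with a top-level `properties` mapping; the helper name and message text are hypothetical.

``` python
from typing import Any, Dict, List


def single_element_enum_warnings(config_schema: Dict[str, Any]) -> List[str]:
    """Warn about properties whose `enum` offers exactly one choice."""
    warnings: List[str] = []
    properties = config_schema.get("properties") or {}
    for name, prop in properties.items():
        enum = prop.get("enum") if isinstance(prop, dict) else None
        if isinstance(enum, list) and len(enum) == 1:
            warnings.append(f"V4: property '{name}' has a single-element enum {enum!r}")
    return warnings
```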

``` python
def _lint_capability_logging(
    path: Path  # A capability .py file or package directory to scan
) -> tuple:  # (errors, warnings) — lists of human-readable findings
    """
    T23 (CR-14): lint capability source for `logging.basicConfig` calls.
    
    The substrate installs the worker's root handler
    (`install_worker_diagnostics`) before capability code runs. A capability calling
    `logging.basicConfig(force=True)` DESTROYS that handler (every
    subsequent record silently bypasses the diagnostics store) -> ERROR.
    A plain `basicConfig` call is a no-op once a handler exists — a fragile
    pre-CR-14 idiom that suggests the capability expects to own process logging
    -> WARNING. Directories scan their tree, skipping hidden dirs and
    `tests_manual`/`_proc` (host-side scripts own their own logging).
    """
```
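
A lint of this kind can be built on the standard `ast` module. The sketch below is an assumption about the approach, not the package's implementation: it only recognizes calls spelled `logging.basicConfig(...)` and a literal `force=True`, and the function name `lint_basic_config` is hypothetical.

``` python
import ast
from typing import List, Tuple


def lint_basic_config(source: str) -> Tuple[List[str], List[str]]:
    """Return (errors, warnings) for logging.basicConfig calls in `source`."""
    errors: List[str] = []
    warnings: List[str] = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        is_basic_config = (
            isinstance(func, ast.Attribute)
            and func.attr == "basicConfig"
            and isinstance(func.value, ast.Name)
            and func.value.id == "logging"
        )
        if not is_basic_config:
            continue
        # force=True replaces existing root handlers, so it is the ERROR case.
        forced = any(
            kw.arg == "force"
            and isinstance(kw.value, ast.Constant)
            and kw.value.value is True
            for kw in node.keywords
        )
        if forced:
            errors.append(f"line {node.lineno}: logging.basicConfig(force=True)")
        else:
            warnings.append(f"line {node.lineno}: logging.basicConfig call")
    return errors, warnings
```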

``` python
def _detect_manifest_format(
    path: Path  # File to inspect
) -> Optional[str]:  # 'manifest' | 'capabilities_yaml' | None
    "Auto-detect format: extension for files; directories lint as source."
```

``` python
def validate_file(
    path:Path=typer.Argument(..., help="Manifest JSON, capabilities.yaml, or capability source (.py / package dir) to validate"),
    format:Optional[str]=typer.Option(
        None, "--format", "-f",
        help="Override format detection: 'manifest', 'capabilities_yaml', or 'source'",
    ),
) -> None
    """
    SG-6 + T23: validate a manifest / capabilities.yaml / capability source.
    
    Auto-detects format from the path (`.json` → manifest, `.yaml`/`.yml` →
    capabilities.yaml, `.py` or a directory → source lint). The source lint is
    the CR-14 `logging.basicConfig` gate: `force=True` is an ERROR (it
    destroys the substrate diagnostics handler), a plain call is a WARNING.
    Exits non-zero with a list of validation errors if any check fails.
    """
```

``` python
def _open_secret_store():
    "Open the project-local LocalSecretStore at <data_dir>/secrets (CR-12)."
```

``` python
def set_secret(
    capability_name: str = typer.Argument(..., help="Capability name (manifest 'name', e.g. my-api-capability)"),
    key: str = typer.Argument(..., help="Secret key = the env-var name the worker reads (e.g. MY_API_KEY)"),
    value: Optional[str] = typer.Option(None, "--value", help="Secret value (omit to be prompted with hidden input)"),
    scope: Optional[str] = typer.Option(None, "--scope", help="Reserved multi-user scope (default: single-user)"),
)
    """
    Store a capability secret in the project-local SecretStore (CR-12).
    
    The value is written to <data_dir>/secrets/secrets.json (0600) — never to
    capabilities.yaml, manifests, or the config store. Capabilities read it from their
    worker env at spawn. Omit --value to be prompted (hidden input) so the
    secret stays out of shell history. After setting, reload the capability (or
    restart the host) so its worker respawns with the new env — the GUI /
    CapabilityManager.set_capability_secret do this automatically.
    """
```

``` python
def list_secrets(
    capability_name: str = typer.Argument(..., help="Capability name to list secret KEY NAMES for"),
    scope: Optional[str] = typer.Option(None, "--scope", help="Reserved multi-user scope"),
)
    "List the secret KEY NAMES stored for a capability — never the values (CR-12)."
```

#### Variables

``` python
_PYPI_404_CACHE: set[str]
```

### Configuration (`config.ipynb`)

> Project-level configuration for paths, runtime settings, and
> environment management

#### Import

``` python
from cjm_substrate.core.config import (
    RuntimeMode,
    CondaType,
    RuntimeConfig,
    SubstrateConfig,
    CJMConfig,
    load_config,
    get_config,
    set_config,
    reset_config
)
```

#### Functions

``` python
def _load_from_yaml(
    yaml_path:Path # Path to cjm.yaml file
) -> CJMConfig: # Parsed configuration
    "Load config from YAML file, resolving relative paths."
```

``` python
def load_config(
    config_path:Optional[Path]=None, # CLI --cjm-config
    data_dir:Optional[Path]=None, # CLI --data-dir
    conda_prefix:Optional[Path]=None, # CLI --conda-prefix
    conda_type:Optional[str]=None # CLI --conda-type
) -> CJMConfig: # Resolved configuration
    "Load config with layered resolution (CLI > env vars > yaml > defaults)."
```
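
The precedence order in the docstring (CLI > env vars > yaml > defaults) amounts to returning the first source that supplies a value. The sketch below shows that idea for a single setting. The helper `resolve_setting` and the environment variable name `CJM_DATA_DIR` are hypothetical; the listing does not name the variables the package actually reads.

``` python
import os
from typing import Any, Mapping, Optional


def resolve_setting(
    key: str,
    cli_value: Optional[Any],
    env_var: str,
    yaml_values: Mapping[str, Any],
    default: Any,
    environ: Optional[Mapping[str, str]] = None,
) -> Any:
    """Return the value from the highest-priority source that provides one."""
    env = os.environ if environ is None else environ
    if cli_value is not None:  # 1. explicit CLI flag
        return cli_value
    if env_var in env:  # 2. environment variable
        return env[env_var]
    if key in yaml_values:  # 3. value from the yaml file
        return yaml_values[key]
    return default  # 4. built-in default


# Example: the yaml value wins only when no CLI flag or env var is present.
value = resolve_setting("data_dir", None, "CJM_DATA_DIR", {"data_dir": "./from-yaml"}, "./default", environ={})
```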

``` python
def get_config() -> CJMConfig: # Current configuration
    "Get current config (loads defaults if not set)."
```

``` python
def set_config(
    config:CJMConfig # Configuration to set as current
) -> None
    "Set current config (called by CLI callback)."
```

``` python
def reset_config() -> None
    "Reset to unloaded state (for testing)."
```

#### Classes

``` python
class RuntimeMode(str, Enum):
    "Runtime mode for the capability system."
```

``` python
class CondaType(str, Enum):
    "Type of conda implementation to use."
```

``` python
@dataclass
class RuntimeConfig:
    "Runtime environment configuration."
    
    mode: RuntimeMode = RuntimeMode.SYSTEM  # LOCAL for project-local, SYSTEM for global
    conda_type: CondaType = CondaType.CONDA  # Conda implementation to use
    prefix: Optional[Path]  # Path to runtime directory (LOCAL mode only)
    binaries: Dict[str, Path] = field(...)  # Platform-specific binary paths
```

``` python
@dataclass
class SubstrateConfig:
    """
    Substrate behavior toggles.
    
    Loaded from the `substrate:` section of `cjm.yaml`. Each flag gates a
    substrate-wide behavior that hosts can disable when they don't want the
    per-load or per-execute cost.
    
    - `drift_detection` (CR-8): per-load `/config_schema` HTTP call + hash
      comparison against the manifest's stored hash. CapabilityManager's load
      path branches around `_check_config_schema_drift` when False.
    - `empirical_tracking` (CR-7): per-execute resource sample recording into
      `EmpiricalResourceStore`. CapabilityManager skips `record_sample` calls when
      False; the store's lazy-init also short-circuits.
    - `prefetch_stall_threshold_seconds` (CR-4 / Session A 2026-05-27): how long
      proxy.prefetch waits with no observed progress (via `/progress` polling)
      before declaring a stall. Replaces per-capability wall-clock timeouts —
      operators no longer race network speed against an arbitrary value. Capabilities
      defeat the stall counter by calling `self.report_progress(...)` periodically
      during long lifecycle operations (model download / vLLM server startup).
      Default 60 s; bump higher for capabilities that don't report progress, or lower
      if false-positive stalls are noisy.
    """
    
    drift_detection: bool = True  # Run /config_schema hash compare on every load_capability
    empirical_tracking: bool = True  # Record ResourceSample after every execute_capability*
    prefetch_stall_threshold_seconds: float = 60.0  # CR-4 / Session A: stall detection threshold for proxy.prefetch
    diagnostics_retention_days: float = 30.0  # CR-14 follow-up: age-based diagnostics retention; <=0 disables the startup sweep
    diagnostics_retention_max_mb: Optional[float]  # CR-14 follow-up: diagnostics.db size budget (None = no size-based deletion)
```
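
The stall rule behind `prefetch_stall_threshold_seconds` (no observed progress for longer than the threshold) can be sketched as a small tracker fed by polled progress values. The class `StallDetector` and its interface are hypothetical, and treating any change in the polled value as progress is an assumption.

``` python
import time
from typing import Any, Callable


class StallDetector:
    """Flag a stall when the polled progress value stops changing for too long."""

    def __init__(self, threshold_seconds: float = 60.0, clock: Callable[[], float] = time.monotonic):
        self.threshold_seconds = threshold_seconds
        self._clock = clock
        self._last_progress: Any = None
        self._last_change = clock()

    def observe(self, progress: Any) -> bool:
        """Record one polled value; return True when the operation looks stalled."""
        now = self._clock()
        if progress != self._last_progress:
            # Any change counts as progress and resets the stall timer.
            self._last_progress = progress
            self._last_change = now
        return (now - self._last_change) > self.threshold_seconds
```

Under this model, a capability that reports progress more often than the threshold never trips the detector, however long the overall operation takes.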

``` python
@dataclass
class CJMConfig:
    "Main configuration for cjm-substrate."
    
    runtime: RuntimeConfig = field(...)  # Runtime environment settings
    data_dir: Path = field(...)  # Base directory for manifests, logs
    capabilities_config: Path = field(...)  # Path to capabilities.yaml file
    models_dir: Optional[Path]  # Directory for model downloads
    substrate: SubstrateConfig = field(...)  # CR-8 substrate behavior toggles
    
    def manifests_dir(self) -> Path: # Directory containing capability manifests
        "Directory containing capability manifests."
    
    def capability_data_dir(self) -> Path: # Directory for capability runtime data
        "Directory for capability runtime data (databases, caches)."
    
    def journal_db_path(self) -> Path: # Journal store (CR-14: durable account-of-action)
        "Journal store path: the precious, host-written observability record."
    
    def diagnostics_db_path(self) -> Path: # Diagnostics store (CR-14: disposable narrative)
        "Diagnostics store path: worker records + raw stream chunks; retention-managed."
    
    def conda_binary_path(self) -> Optional[Path]: # Path to conda/micromamba binary or None
        "Get the configured binary path for the current platform."
```

#### Variables

``` python
_current_config: Optional[CJMConfig] = None
```

### Capability Config Store (`config_store.ipynb`)

> Persistent storage for per-capability configuration (with enabled
> flag)

#### Import

``` python
from cjm_substrate.core.config_store import (
    CapabilityConfigRecord,
    CapabilityConfigStore,
    LocalCapabilityConfigStore
)
```

#### Functions

``` python
def _default_db_path() -> Path:
    "Default SQLite location: `~/.cjm/capability_configs.db`."
```

``` python
@patch
@contextmanager
def _conn(self:LocalCapabilityConfigStore) -> Iterator[sqlite3.Connection]:
    "Open a connection, creating parent dirs + schema on demand."
```

``` python
@patch
def get(
    self:LocalCapabilityConfigStore,
    capability_name: str  # Capability to look up
) -> Optional[CapabilityConfigRecord]:  # Persisted record or None if absent
    "Fetch the record for a capability."
```

``` python
@patch
def set(
    self:LocalCapabilityConfigStore,
    capability_name: str,  # Capability to write
    record: CapabilityConfigRecord  # New record (updated_at overwritten with current time)
) -> None
    "Persist a record. Stamps `updated_at` to the current time."
```

``` python
@patch
def delete(
    self:LocalCapabilityConfigStore,
    capability_name: str  # Capability to remove
) -> bool:  # True if a row was deleted
    "Remove the record for a capability."
```

``` python
@patch
def list_all(self:LocalCapabilityConfigStore) -> Dict[str, CapabilityConfigRecord]:  # capability_name -> record
    "Return all stored records keyed by capability name."
```

#### Classes

``` python
@dataclass
class CapabilityConfigRecord:
    "Persisted state for a capability: config dict + enabled flag."
    
    config: Dict[str, Any] = field(...)  # Capability's current config values
    enabled: bool = True  # Whether the substrate should accept jobs for this capability
    updated_at: float = 0.0  # Unix timestamp of the last write (server clock)
```

``` python
@runtime_checkable
class CapabilityConfigStore(Protocol):
    "Protocol for persisting per-capability `CapabilityConfigRecord` across sessions."
    
    def get(self, capability_name: str) -> Optional[CapabilityConfigRecord]
        "Fetch the record for a capability, or None if no record exists yet."
    
    def set(self, capability_name: str, record: CapabilityConfigRecord) -> None
        """
        Persist a record. Overwrites any prior record for the same capability.
        
        Implementations stamp `record.updated_at` to the current time during
        the write so callers don't have to manage timestamps.
        """
    
    def delete(self, capability_name: str) -> bool
        "Remove the record for a capability. Returns True if a record was deleted."
    
    def list_all(self) -> Dict[str, CapabilityConfigRecord]
        "Return every stored record, keyed by capability name."
```

``` python
class LocalCapabilityConfigStore:
    def __init__(self, db_path: Optional[Path] = None)
    """
    SQLite-backed default implementation of `CapabilityConfigStore`.
    
    The DB is created lazily on first write. Reads against a non-existent DB
    return empty results rather than raising, so hosts can call `.get()` on
    a fresh install without preparing the file first.
    """
    
    def __init__(self, db_path: Optional[Path] = None)
        "Initialize the store. `db_path=None` uses `~/.cjm/capability_configs.db`."
```

#### Variables

``` python
_SCHEMA = '\nCREATE TABLE IF NOT EXISTS capability_configs (\n    capability_name TEXT PRIMARY KEY,\n    config_json TEXT NOT NULL,\n    enabled INTEGER NOT NULL DEFAULT 1,\n    updated_at REAL NOT NULL\n)\n'
```
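
To make the `_SCHEMA` layout concrete, here is a minimal sketch that writes and reads one row with the standard `sqlite3` module. It reuses the table definition shown above; the helper names `write_config` and `read_config`, the JSON encoding of the config dict, and the use of `INSERT OR REPLACE` are assumptions rather than the store's actual implementation.

``` python
import json
import sqlite3
import time
from typing import Any, Dict, Optional, Tuple

SCHEMA = """
CREATE TABLE IF NOT EXISTS capability_configs (
    capability_name TEXT PRIMARY KEY,
    config_json TEXT NOT NULL,
    enabled INTEGER NOT NULL DEFAULT 1,
    updated_at REAL NOT NULL
)
"""


def write_config(conn: sqlite3.Connection, name: str, config: Dict[str, Any], enabled: bool = True) -> None:
    """Insert or overwrite the row for `name`, stamping updated_at with the current time."""
    conn.execute(
        "INSERT OR REPLACE INTO capability_configs "
        "(capability_name, config_json, enabled, updated_at) VALUES (?, ?, ?, ?)",
        (name, json.dumps(config), int(enabled), time.time()),
    )
    conn.commit()


def read_config(conn: sqlite3.Connection, name: str) -> Optional[Tuple[Dict[str, Any], bool]]:
    """Return (config, enabled) for `name`, or None when no row exists."""
    row = conn.execute(
        "SELECT config_json, enabled FROM capability_configs WHERE capability_name = ?",
        (name,),
    ).fetchone()
    return None if row is None else (json.loads(row[0]), bool(row[1]))


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
write_config(conn, "demo-capability", {"model": "small"})
write_config(conn, "demo-capability", {"model": "large"}, enabled=False)  # overwrites the first row
```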

### Diagnostics Store (`diagnostics_store.ipynb`)

> CR-14 (stage 7): the disposable diagnostic-narrative class.
> Worker-written structured log records (substrate handler stamps
> contextvars identity — authors never supply attribution) + the
> host-pumped raw stream chunks (the zero-cooperation death-rattle
> floor). Retention is a QUERY, not file mechanics. Design ledger:
> `claude-docs/stage-7-evidence.md`.

#### Import

``` python
from cjm_substrate.core.diagnostics_store import (
    DiagnosticRecord,
    StreamChunk,
    DiagnosticsStore,
    LocalDiagnosticsStore,
    DiagnosticsLogHandler,
    install_worker_diagnostics,
    normalize_stream_line
)
```

#### Functions

``` python
@patch
@contextmanager
def _conn(self: LocalDiagnosticsStore) -> Iterator[sqlite3.Connection]:
    """
    Yield the persistent connection under the instance lock (lazy init:
    parent dirs + WAL + schema on first use).
    
    Same shape + rationale as `LocalJournalStore._conn` (stage-7 stress
    catch: per-call close = WAL checkpoint = ~16 ms/append). Disposable
    class on the WORKER hot path — `synchronous=NORMAL` is plenty.
    """
```

``` python
@patch
def append_record(
    self: LocalDiagnosticsStore,
    record: DiagnosticRecord,  # Structured record to persist
) -> int:  # Store-assigned seq
    "Persist one structured record."
```

``` python
@patch
def append_chunk(
    self: LocalDiagnosticsStore,
    chunk: StreamChunk,  # Raw stream line to persist
) -> int:  # Store-assigned seq
    "Persist one raw stream line."
```

``` python
@patch
def query_records(
    self: LocalDiagnosticsStore,
    job_id: Optional[str] = None,  # EXACT job correlation (stamped at write)
    worker_session_id: Optional[str] = None,  # Session scope
    level: Optional[str] = None,  # Level name filter
    after_seq: Optional[int] = None,  # Tail cursor
    limit: Optional[int] = None,  # Max rows
    descending: bool = False,  # True = newest first
) -> List[DiagnosticRecord]:  # Matching records, seq-ordered
    "Filtered structured-record read."
```

``` python
@patch
def query_chunks(
    self: LocalDiagnosticsStore,
    worker_session_id: Optional[str] = None,  # Session scope
    after_seq: Optional[int] = None,  # Tail cursor
    limit: Optional[int] = None,  # Max rows
    descending: bool = False,  # True = newest first
) -> List[StreamChunk]:  # Matching chunks, seq-ordered
    "Raw stream read, session-scoped."
```

``` python
@patch
def apply_retention(
    self: LocalDiagnosticsStore,
    max_age_days: Optional[float] = None,  # Delete rows older than this
    max_total_mb: Optional[float] = None,  # Delete oldest rows until DB under budget
) -> Dict[str, int]:  # {'records_deleted': n, 'chunks_deleted': m}
    """
    Retention as a QUERY (the CR-14 reframe's mechanical payoff).
    
    Age first, then size: oldest rows (both tables, interleaved by ts)
    deleted in batches until the DB file is under budget. Safe against
    concurrent writers (WAL; each batch is its own transaction).
    """
```
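
The "retention as a query" idea can be sketched for the age-based half with a single `DELETE`. The table name `records` and its `ts` column (Unix seconds) are hypothetical; the listing does not document the store's real schema, and the size-based pass is not shown.

``` python
import sqlite3
import time
from typing import Optional


def apply_age_retention(conn: sqlite3.Connection, max_age_days: float, now: Optional[float] = None) -> int:
    """Delete rows older than `max_age_days`; return how many were removed."""
    current = time.time() if now is None else now
    cutoff = current - max_age_days * 86400  # seconds per day
    cursor = conn.execute("DELETE FROM records WHERE ts < ?", (cutoff,))
    conn.commit()
    return cursor.rowcount


# Hypothetical table with one old row and one recent row.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE records (seq INTEGER PRIMARY KEY, ts REAL NOT NULL, message TEXT)")
conn.executemany(
    "INSERT INTO records (ts, message) VALUES (?, ?)",
    [(0.0, "old"), (9.5 * 86400, "recent")],
)
deleted = apply_age_retention(conn, max_age_days=1, now=10 * 86400)
```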

``` python
def install_worker_diagnostics() -> Optional[DiagnosticsLogHandler]:
    """
    Configure worker-process logging (replaces the old `basicConfig`).
    
    Env contract (injected by the proxy at spawn):
    - `CJM_DIAGNOSTICS_DB`: diagnostics store path -> install the handler.
    - `CJM_WORKER_SESSION_ID`: spawn-scoped session id stamped on records.
    - `CJM_LOG_LEVEL`: operator level control (default INFO) — the old
      worker hardcoded INFO with no surface.
    
    Without `CJM_DIAGNOSTICS_DB` (standalone/dev import) falls back to the
    pre-CR-14 stdout `basicConfig` so nothing changes for direct runs.
    Returns the installed handler (None on fallback).
    """
```

``` python
def normalize_stream_line(
    raw: str,  # One decoded line (may contain \r progress frames)
) -> Optional[str]:  # Final frame, or None when nothing durable remains
    "Collapse CR progress frames to the final frame; drop empty results."
```
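A minimal sketch of the collapse, assuming that "final frame" means the text after the last carriage return and that whitespace-only results count as empty. The name `collapse_cr_frames` is hypothetical; the real `normalize_stream_line` may differ in detail.

```python
from typing import Optional


def collapse_cr_frames(raw: str) -> Optional[str]:
    """Keep only the last carriage-return frame of a progress line."""
    line = raw.rstrip("\r\n")        # drop the line terminator and trailing CRs
    final = line.split("\r")[-1]     # progress bars redraw with "\r"; keep the last draw
    return final if final.strip() else None  # nothing durable left -> None
```

For example, `collapse_cr_frames("10%\r50%\r100%\n")` returns `"100%"`, and a line made only of carriage returns yields `None`.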

#### Classes

``` python
@dataclass
class DiagnosticRecord:
    "One structured worker log record (CR-14 diagnostics class)."
    
    message: str  # record.getMessage() result
    level: str = 'INFO'  # Logging level name
    logger_name: str = ''  # Logger hierarchy name (restored — flat logs dropped it)
    ts: datetime = field(...)  # tz-aware UTC
    worker_session_id: Optional[str]  # Spawn-scoped session
    job_id: Optional[str]  # EXACT correlation via contextvars (None outside a call span)
    exc_text: Optional[str]  # Formatted traceback when the record carried exc_info
    seq: Optional[int]  # Store-assigned cursor
```

``` python
@dataclass
class StreamChunk:
    "One raw stdout/stderr line the host pump captured (death-rattle floor)."
    
    content: str  # Decoded line content (tqdm CR-frames collapsed to final frame)
    ts: datetime = field(...)  # Capture time (host clock)
    worker_session_id: Optional[str]  # Session attribution (the honest unit)
    stream: str = 'stdout'  # Source stream (stderr merged into stdout today)
    seq: Optional[int]  # Store-assigned cursor
```

``` python
@runtime_checkable
class DiagnosticsStore(Protocol):
    "Protocol for the disposable diagnostic-narrative store (CR-14)."
    
    def append_record(self, record: DiagnosticRecord) -> int
        "Persist one structured record; returns seq."

    def append_chunk(self, chunk: StreamChunk) -> int
        "Persist one raw stream line; returns seq."

    def query_records(
        self,
        job_id: Optional[str] = None,
        worker_session_id: Optional[str] = None,
        level: Optional[str] = None,
        after_seq: Optional[int] = None,
        limit: Optional[int] = None,
        descending: bool = False,
    ) -> List[DiagnosticRecord]
        "Filtered structured-record read; `job_id` is EXACT (stamped, not sliced)."

    def query_chunks(
        self,
        worker_session_id: Optional[str] = None,
        after_seq: Optional[int] = None,
        limit: Optional[int] = None,
        descending: bool = False,
    ) -> List[StreamChunk]
        "Raw stream read, session-scoped."

    def apply_retention(
        self,
        max_age_days: Optional[float] = None,
        max_total_mb: Optional[float] = None,
    ) -> Dict[str, int]
        "Delete old rows by age and/or size budget; returns deleted counts."
```

``` python
class LocalDiagnosticsStore:
    """
    SQLite-backed default `DiagnosticsStore` (CR-14).
    
    Many concurrent writers (workers + the host pump) -> WAL +
    busy_timeout + per-call connections (no long-held handles; safe
    from any thread). Disposable class: retention deletes are routine.
    """
    
    def __init__(self, db_path: Optional[Path] = None)
        "`db_path=None` uses `~/.cjm/diagnostics.db`; workers receive the host's path via the `CJM_DIAGNOSTICS_DB` env var at spawn."
```

``` python
class DiagnosticsLogHandler:
    """
    Worker-side logging handler writing `DiagnosticRecord`s (CR-14).
    
    Thread-safe via per-call connections (the worker runs capability execute
    in an executor thread; contextvars propagate via copy_context at the
    endpoint). Never raises into application code.
    """
    
    def __init__(
        self,
        store: DiagnosticsStore,  # Sink (LocalDiagnosticsStore in-process)
        worker_session_id: Optional[str] = None,  # Spawn-scoped session id
    )
    
    def emit(self, record: logging.LogRecord) -> None
        "Write one record; job identity from the call-envelope contextvar."
```
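The handler pattern described above (stamp each record with the job identity held in a context variable, and never let a logging failure reach application code) can be sketched as follows. `current_job_id` and `ContextStampingHandler` are hypothetical names for illustration, and a plain list stands in for the store.

```python
import contextvars
import logging
from typing import List, Optional, Tuple

# Hypothetical stand-in for the call-envelope contextvar.
current_job_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    "current_job_id", default=None
)


class ContextStampingHandler(logging.Handler):
    """Append (job_id, level, message) tuples to a list sink."""

    def __init__(self, sink: List[Tuple[Optional[str], str, str]]):
        super().__init__()
        self.sink = sink

    def emit(self, record: logging.LogRecord) -> None:
        try:
            self.sink.append(
                (current_job_id.get(), record.levelname, record.getMessage())
            )
        except Exception:
            # Diagnostics must never raise into application code.
            pass
```

Work handed to an executor thread sees the same job id only if it runs inside a copied context, for example `contextvars.copy_context().run(fn)`.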

#### Variables

``` python
_DIAGNOSTICS_SCHEMA = "\nCREATE TABLE IF NOT EXISTS records (\n    seq INTEGER PRIMARY KEY AUTOINCREMENT,\n    ts TEXT NOT NULL,\n    worker_session_id TEXT,\n    job_id TEXT,\n    level TEXT NOT NULL DEFAULT 'INFO',\n    logger_name TEXT NOT NULL DEFAULT '',\n    message TEXT NOT NULL,\n    exc_text TEXT\n);\nCREATE INDEX IF NOT EXISTS idx_records_job ON records (job_id);\nCREATE INDEX IF NOT EXISTS idx_records_session ON records (worker_session_id);\nCREATE INDEX IF NOT EXISTS idx_records_ts ON records (ts);\nCREATE TABLE IF NOT EXISTS stream_chunks (\n    seq INTEGER PRIMARY KEY AUTOINCREMENT,\n    ts TEXT NOT NULL,\n    worker_session_id TEXT,\n    stream TEXT NOT NULL DEFAULT 'stdout',\n    content TEXT NOT NULL\n);\nCREATE INDEX IF NOT EXISTS idx_chunks_session ON stream_chunks (worker_session_id);\nCREATE INDEX IF NOT EXISTS idx_chunks_ts ON stream_chunks (ts);\n"
```

### Empirical Resource Tracking (`empirical_store.ipynb`)

> Persistent store for empirically-observed resource usage per
> (instance_id, config_hash) pair. CR-7’s data foundation —
> `record_sample` is called from `CapabilityManager.execute_capability*`
> finally blocks; aggregates feed eviction-candidate selection + future
> UI hints + cost-aware retry decisions.

#### Import

``` python
from cjm_substrate.core.empirical_store import (
    compute_config_hash,
    ResourceSample,
    EmpiricalResourceRecord,
    EmpiricalResourceStore,
    LocalEmpiricalResourceStore
)
```

#### Functions

``` python
def compute_config_hash(
    """
    CR-7: hash a capability instance's effective config for empirical-record keying.
    
    Same canonicalization as CR-8's `compute_config_schema_hash` — sorted keys,
    no whitespace, `"sha256:hex"` shape. None / empty configs hash deterministically
    to the canonical-empty value so capabilities with no config still get a single
    record per instance rather than scattering across hash-of-None edge cases.
    """
```

``` python
def _default_db_path() -> Path:
    """Default SQLite location: `~/.cjm/empirical_resources.db`.
    
    Hosts using per-project `data_dir` (the intended pattern per CR-8 cascade_manifests
    docs) override this by passing `db_path=cfg.data_dir / "empirical_resources.db"`
    when constructing the store. CapabilityManager's lazy-init does this automatically.
    """
    return Path.home() / ".cjm" / "empirical_resources.db"
```

``` python
@patch
@contextmanager
def _conn(self:LocalEmpiricalResourceStore) -> Iterator[sqlite3.Connection]:
    """Open a connection, creating parent dirs + schema on demand."""
```

``` python
@patch
def record_sample(
    self:LocalEmpiricalResourceStore,
    instance_id: str,  # CapabilityInstance.instance_id
    capability_name: str,  # CapabilityInstance.capability_name (denormalized for filtering)
    config_hash: str,  # compute_config_hash(inst.config)
    sample: ResourceSample,  # One observation
) -> None
    """
    Fold a sample into the running aggregate. Creates a new row on first call.
    
    Welford update for each mean. Max-of-peaks for memory metrics.
    success_count incremented by 1 if sample.success else 0.
    """
```
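The aggregation rule in the docstring can be sketched in a few lines. `RunningAggregate` and `fold_sample` are hypothetical names, and only one mean and one peak are tracked here; the real record carries several of each.

```python
from dataclasses import dataclass


@dataclass
class RunningAggregate:
    sample_count: int = 0
    duration_seconds_mean: float = 0.0
    memory_mb_peak_max: float = 0.0


def fold_sample(
    agg: RunningAggregate, duration_seconds: float, memory_mb_peak: float
) -> RunningAggregate:
    """Fold one observation into the aggregate without storing raw samples."""
    n = agg.sample_count + 1
    # Incremental mean: new_mean = old_mean + (x - old_mean) / n
    mean = agg.duration_seconds_mean + (duration_seconds - agg.duration_seconds_mean) / n
    # Peaks keep the worst value observed so far.
    peak = max(agg.memory_mb_peak_max, memory_mb_peak)
    return RunningAggregate(n, mean, peak)
```

Because each update needs only the previous mean and the count, one row per `(instance_id, config_hash)` pair is enough.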

``` python
@patch
def get_record(
    self:LocalEmpiricalResourceStore,
    instance_id: str,
    config_hash: str,
) -> Optional[EmpiricalResourceRecord]
    "Fetch the aggregated record for (instance_id, config_hash), or None."
```

``` python
@patch
def list_records(
    self:LocalEmpiricalResourceStore,
    capability_name: Optional[str] = None,
) -> List[EmpiricalResourceRecord]
    "List all records, optionally filtered to a capability."
```

``` python
@patch
def delete_record(
    self:LocalEmpiricalResourceStore,
    instance_id: str,
    config_hash: str,
) -> bool
    "Remove a record. Returns True if a row was deleted."
```

#### Classes

``` python
class ResourceSample:
    """
    Single observation captured after an execute call completes.
    
    Frozen — substrate aggregates online via Welford's algorithm; no need to
    keep raw samples around. `observed_at` is tz-aware per the CR-5 convention.
    """
```

``` python
@dataclass
class EmpiricalResourceRecord:
    "Aggregated empirical resource profile for a (instance_id, config_hash) pair."
    
    instance_id: str  # CapabilityInstance.instance_id (CR-10 multi-instance aware)
    capability_name: str  # Convenience: CapabilityInstance.capability_name; derivable but cheap to denormalize
    config_hash: str  # compute_config_hash(inst.config) at sample time
    sample_count: int  # Number of ResourceSamples folded into this record
    cpu_percent_mean: float  # Welford running mean of cpu_percent
    memory_mb_peak_max: float  # max(sample.memory_mb_peak over all samples) — worst observed
    memory_mb_peak_mean: float  # Welford running mean of sample.memory_mb_peak
    gpu_memory_mb_peak_max: float  # max(sample.gpu_memory_mb_peak over all samples)
    gpu_memory_mb_peak_mean: float  # Welford running mean of sample.gpu_memory_mb_peak
    duration_seconds_mean: float  # Welford running mean of sample.duration_seconds
    success_rate: float  # success_count / sample_count
    last_observed: datetime  # tz-aware; tracks most recent ResourceSample.observed_at
    api_usage_totals: Dict[str, float] = field(...)  # SG-54: cumulative per-unit usage summed across runs (tokens/credits/pages/...); {} for compute-only capabilities
```

``` python
@runtime_checkable
class EmpiricalResourceStore(Protocol):
    """
    Protocol for persisting empirically-observed resource usage.
    
    Implementations aggregate online (Welford for means, max-of-peaks for memory).
    No raw-sample retention required — v1 is one row per (instance_id, config_hash)
    pair with running aggregates. A future implementation can add a samples table
    if time-series queries become necessary.
    """
    
    def record_sample(
            self,
            instance_id: str,
            capability_name: str,
            config_hash: str,
            sample: ResourceSample,
        ) -> None
        "Fold a sample into the running aggregate. Creates a new record on first call."
    
    def get_record(
            self,
            instance_id: str,
            config_hash: str,
        ) -> Optional[EmpiricalResourceRecord]
        "Fetch the aggregated record for (instance_id, config_hash), or None."
    
    def list_records(
            self,
            capability_name: Optional[str] = None,
        ) -> List[EmpiricalResourceRecord]
        "List all records, optionally filtered to a single capability_name."
    
    def delete_record(
            self,
            instance_id: str,
            config_hash: str,
        ) -> bool
        "Remove a record. Returns True if a row was deleted."
```

``` python
class LocalEmpiricalResourceStore:
    """
    SQLite-backed default implementation of `EmpiricalResourceStore`.
    
    Online Welford aggregation for means; max-of-peaks for memory metrics.
    success_rate computed at read time from `success_count / sample_count`.
    DB + schema created lazily on first write.
    """
    
    def __init__(self, db_path: Optional[Path] = None)
        "Initialize the store. `db_path=None` uses `~/.cjm/empirical_resources.db`."
    
    @staticmethod
    def _row_to_record(row) -> EmpiricalResourceRecord
```

#### Variables

``` python
_SCHEMA = "\nCREATE TABLE IF NOT EXISTS empirical_resources (\n    instance_id TEXT NOT NULL,\n    capability_name TEXT NOT NULL,\n    config_hash TEXT NOT NULL,\n    sample_count INTEGER NOT NULL DEFAULT 0,\n    success_count INTEGER NOT NULL DEFAULT 0,\n    cpu_percent_mean REAL NOT NULL DEFAULT 0.0,\n    memory_mb_peak_max REAL NOT NULL DEFAULT 0.0,\n    memory_mb_peak_mean REAL NOT NULL DEFAULT 0.0,\n    gpu_memory_mb_peak_max REAL NOT NULL DEFAULT 0.0,\n    gpu_memory_mb_peak_mean REAL NOT NULL DEFAULT 0.0,\n    duration_seconds_mean REAL NOT NULL DEFAULT 0.0,\n    last_observed TEXT NOT NULL,\n    api_usage_totals TEXT NOT NULL DEFAULT '{}',\n    PRIMARY KEY (instance_id, config_hash)\n)\n"
```

### Capability Error Taxonomy (`errors.ipynb`)

> Typed exception hierarchy + JobError dataclass + default
> classification of bare Python exceptions. The substrate’s CR-5
> implementation per the 2026-05-19 substrate audit.

#### Import

``` python
from cjm_substrate.core.errors import (
    CapabilityError,
    CapabilityInputError,
    CapabilityTransientError,
    CapabilityResourceError,
    CapabilityFatalError,
    CapabilityDisabledError,
    CapabilityNotLoadedError,
    CapabilityTimeoutError,
    CapabilityCancelledError,
    WorkerOOMError,
    CapabilityConfigError,
    ResourceShortfall,
    TracebackPolicy,
    JobError,
    classify_exception,
    map_bare_exception_to_job_error
)
```

#### Functions

``` python
def classify_exception(
    exc: BaseException  # The exception to classify
) -> "Literal['user_input', 'transient', 'resource', 'fatal']":  # Category
    """
    Return the substrate category for any exception.
    
    CapabilityError subclasses report their own declared `category`. Bare Python
    exceptions are mapped via `__mro__` walk against `_BARE_EXCEPTION_CATEGORY_MAP`;
    the first ancestor in the table wins. Unrecognized exceptions classify as
    `fatal` (don't auto-retry the unknown).
    """
```
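A minimal sketch of the lookup order described above: declared category first, then an MRO walk, then `fatal`. The table contents here are illustrative assumptions; the actual entries of `_BARE_EXCEPTION_CATEGORY_MAP` are not listed in this document.

```python
# Hypothetical table; the real map's entries may differ.
CATEGORY_MAP = {
    ValueError: "user_input",
    TimeoutError: "transient",
    ConnectionError: "transient",
    MemoryError: "resource",
}


def classify_sketch(exc: BaseException) -> str:
    """Return a category for any exception, defaulting to 'fatal'."""
    declared = getattr(exc, "category", None)
    if declared is not None:  # typed errors report their own category
        return declared
    for klass in type(exc).__mro__:  # most specific ancestor first
        if klass in CATEGORY_MAP:
            return CATEGORY_MAP[klass]
    return "fatal"  # unknown failures are not auto-retried
```

Walking `__mro__` means a subclass such as `ConnectionResetError` inherits the category of `ConnectionError` without needing its own entry.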

``` python
def map_bare_exception_to_job_error(
    exc: BaseException,  # The raised exception
    *,
    capability_name: Optional[str] = None,  # Name of the capability that raised
    capability_instance_id: Optional[str] = None,  # Per CR-10
    traceback_policy: TracebackPolicy = TracebackPolicy.FULL,  # How much detail to record
    occurred_at: Optional[datetime] = None,  # Override; defaults to datetime.now(timezone.utc)
) -> JobError
    """
    Convert any exception into a structured `JobError`.
    
    CapabilityError subclasses contribute their category-specific structured data
    (`fields_invalid` for input errors, `resource_shortfall` for resource errors,
    `retry_after_seconds` for transient errors). Bare exceptions get the
    default category-based retriable flag and no structured side-channel.
    """
```

#### Classes

``` python
class CapabilityError(Exception):
    """
    Base for substrate-recognized capability exceptions.
    
    Subclasses declare a `category` and `default_retriable` ClassVar so the
    JobQueue + scheduler can route the failure without sniffing exception
    text. Bare Python exceptions raised by capability code go through
    `map_bare_exception_to_job_error` to acquire a default category.
    """
```

``` python
class CapabilityInputError(CapabilityError):
    """
    User-fixable error: bad config, invalid argument, missing file.
    
    Like the other category bases (`CapabilityTransientError`,
    `CapabilityResourceError`, `CapabilityFatalError`), it extends only
    `CapabilityError`; the right reader intent is `except CapabilityInputError:`
    (or the broader `except CapabilityError:`).
    """
    
    def __init__(
            self,
            message: str,  # Human-readable description
            *,
            fields_invalid: Optional[List[str]] = None,  # Names of inputs that failed validation
        )
```

``` python
class CapabilityTransientError(CapabilityError):
    """
    Temporary failure: timeout, network blip, brief resource contention.
    
    Substrate / JobQueue may retry on its own initiative. Capability authors raise
    this when they know the failure is recoverable.
    """
    
    def __init__(
            self,
            message: str,  # Human-readable description
            *,
            retry_after_seconds: Optional[float] = None,  # Hint for backoff strategies
        )
```

``` python
class CapabilityResourceError(CapabilityError):
    """
    Resource exhaustion: GPU VRAM, system RAM, disk full.
    
    JobQueue's reactive-eviction flow (CR-7) routes resource errors to retry
    after attempting to free the named resource. Capability authors set
    `resource_shortfall` so the substrate knows what to evict.
    """
    
    def __init__(
            self,
            message: str,  # Human-readable description
            *,
            resource_shortfall: Optional["ResourceShortfall"] = None,  # Quantitative gap
        )
```

``` python
class CapabilityFatalError(CapabilityError):
    """
    Bug / irrecoverable state. The capability cannot complete this job; retrying won't help.
    
    Capability authors raise this when they know the failure is permanent for the
    given inputs. The substrate does NOT retry fatal errors.
    """
```

``` python
class CapabilityDisabledError:
    """
    JobQueue / execute_capability rejected: the capability is currently disabled.
    
    User-fixable (re-enable the capability). Raised by CR-2's enable/disable
    wiring once that lands.
    """
    
    def __init__(self, capability_name: str)
```

``` python
class CapabilityNotLoadedError:
    """
    Caller submitted to a capability that was never loaded.
    
    Fatal category because this is a programmer / orchestration bug, not a
    user-fixable condition. The right reader intent is
    `except CapabilityNotLoadedError:` (or the broader `except CapabilityError:`).
    """
    
    def __init__(self, capability_name: str)
```

``` python
class CapabilityTimeoutError:
    """
    A per-job timeout fired before the capability finished.
    
    Transient category — retry may succeed if the slow operation completes faster
    next time. Carries `retry_after_seconds` from `CapabilityTransientError`.
    Raised by SG-14's per-job timeout primitive when that lands.
    """
    
    def __init__(
            self,
            capability_name: str,
            timeout_seconds: float,
            *,
            retry_after_seconds: Optional[float] = None,
        )
```

``` python
class CapabilityCancelledError:
    """
    Cooperative cancellation signal raised from `ToolCapability.check_cancel()`.
    
    Anchors under `CapabilityTransientError` because cancellation is in-principle
    re-runnable — a future attempt with the same inputs won't auto-fail if the
    cancel flag isn't set. But `default_retriable` is False: cancellation was
    a deliberate operator action, so the substrate should NOT auto-retry.
    Job-monitor / JobQueue render cancelled jobs with their own state
    (separate from "failed"); the JobError category remains `transient` so
    consumers reading the typed taxonomy can group recoverable signals.
    
    Capability authors raise this implicitly via `self.check_cancel()` inside
    `execute()`; substrate sets the underlying `_cancel_requested` flag via
    `cancel()`. See CR-4's cancellation primitives for the cooperative-cancel
    protocol.
    """
    
    def __init__(self, capability_name: str)
```

``` python
class WorkerOOMError:
    """
    The worker subprocess died with a kill-signal during an active execute call.
    
    CR-7 Track A — substrate-side OOM detection: when an HTTP call to the worker
    faults and the subprocess has died with `returncode == -signal.SIGKILL` (or
    the platform equivalent), the substrate raises this. The kernel OOM-killer
    is the most common cause of SIGKILL during normal execute paths, so the
    substrate treats SIGKILL-during-call as "assume OOM" and surfaces a typed
    resource error for the reactive retry path.
    
    `resource_shortfall` is `None` for Track A — the substrate only saw "worker
    died from kill-signal" and has no per-resource needed/available numbers.
    Track B (per SG-47's sub-task: capability-side wrapping of `torch.cuda.OutOfMemoryError`
    et al.) raises `CapabilityResourceError` directly with a populated
    `ResourceShortfall` because the capability had the context. Both land at the
    same `except CapabilityResourceError` site in CR-7's reactive retry loop.
    
    `process_returncode` carries the observed exit code for debugging /
    classification (e.g. operators can distinguish kernel-OOM SIGKILL from
    other signals if they read it). Defaults to `None` for callers that don't
    have it on hand.
    """
    
    def __init__(
            self,
            capability_name: str,
            *,
            process_returncode: Optional[int] = None,
            message: Optional[str] = None,
        )
```
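The Track A check in the docstring reduces to a comparison on the subprocess return code. The helper below is a hypothetical sketch (`died_from_sigkill` is not part of the package) and assumes a POSIX platform, where `subprocess` reports death by signal N as return code `-N`.

```python
import signal
from typing import Optional


def died_from_sigkill(returncode: Optional[int]) -> bool:
    """True when a finished subprocess was terminated by SIGKILL."""
    sigkill = getattr(signal, "SIGKILL", None)  # absent on some platforms
    if returncode is None or sigkill is None:
        return False  # still running, or no SIGKILL concept here
    return returncode == -int(sigkill)
```

A return code alone cannot distinguish the kernel OOM-killer from an operator's `kill -9`, which is why the docstring describes the mapping as "assume OOM".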

``` python
class CapabilityConfigError:
    """
    Unknown / invalid keys in a config dict against a capability's config schema.
    
    Reparented from `cjm_substrate.utils.validation` (Wave 2 / SG-8) under
    CR-5. Inherits `CapabilityInputError`'s ValueError MRO automatically.
    `config_class_name` is the dataclass / capability name whose schema was violated.
    """
    
    def __init__(
            self,
            message: str,  # Human-readable description
            *,
            fields_invalid: Optional[List[str]] = None,  # Canonical: list of bad config keys
            config_class_name: str = "",  # Dataclass / capability name for the schema
        )
```

``` python
@dataclass
class ResourceShortfall:
    "Quantitative gap between what a capability needed and what was available."
    
    resource: Literal['gpu_vram_mb', 'system_ram_mb', 'disk_mb']  # Which resource
    needed: float  # Amount the capability reported it needed
    available: float  # Amount actually available when the failure occurred
```

``` python
class TracebackPolicy(str, Enum):
    "How much exception detail the substrate records on a JobError."
```

``` python
@dataclass
class JobError:
    """
    Structured failure summary recorded on a completed Job.
    
    Populated by the JobQueue when a capability execution fails (CR-6 owns the
    population logic; CR-5 owns the shape). Sufficient for UI to render a
    failure card + retry affordance without re-running the capability.
    """
    
    category: Literal['user_input', 'transient', 'resource', 'fatal']
    message: str  # Human-readable error message
    retriable: bool  # Whether the substrate considers this safe to auto-retry
    original_exc_repr: str  # repr(exc) of the original exception
    traceback: Optional[str]  # Full traceback per TracebackPolicy
    retry_after_seconds: Optional[float]  # Backoff hint from CapabilityTransientError
    fields_invalid: Optional[List[str]]  # From CapabilityInputError subclasses
    resource_shortfall: Optional[ResourceShortfall]  # From CapabilityResourceError
    capability_name: Optional[str]  # Name of the capability that raised
    capability_instance_id: Optional[str]  # Per CR-10 multi-instance support
    occurred_at: Optional[datetime]  # When the failure was recorded
```

#### Variables

``` python
_BARE_EXCEPTION_CATEGORY_MAP: "dict[type, Literal['user_input', 'transient', 'resource', 'fatal']]"
_CATEGORY_RETRIABLE_DEFAULTS: 'dict[str, bool]'
```

### Content Hashing Utilities (`hashing.ipynb`)

> Shared cryptographic hashing primitives for content integrity
> verification

#### Import

``` python
from cjm_substrate.utils.hashing import (
    hash_bytes,
    hash_file,
    verify_hash,
    hash_dict_canonical
)
```

#### Functions

``` python
def hash_bytes(
    content: bytes,  # Byte content to hash
    algo: str = "sha256"  # Hash algorithm name (e.g., "sha256", "sha3_256")
) -> str:  # Hash string in "algo:hexdigest" format
    "Compute a hash of byte content."
```

``` python
def hash_file(
    path: Union[str, Path],  # Path to file to hash
    algo: str = "sha256",  # Hash algorithm name
    chunk_size: int = 8192  # Read chunk size in bytes
) -> str:  # Hash string in "algo:hexdigest" format
    "Stream-hash a file without loading it entirely into memory."
```
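Stream hashing can be sketched with `hashlib` as below. `stream_hash` is a hypothetical name; the `"algo:hexdigest"` output shape follows the signature comments above.

```python
import hashlib
from pathlib import Path
from typing import Union


def stream_hash(
    path: Union[str, Path], algo: str = "sha256", chunk_size: int = 8192
) -> str:
    """Hash a file chunk by chunk so memory use stays bounded."""
    hasher = hashlib.new(algo)
    with open(path, "rb") as handle:
        # iter(callable, sentinel) stops when read() returns b"" at EOF.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            hasher.update(chunk)
    return f"{algo}:{hasher.hexdigest()}"
```

The result matches hashing the whole content in one call, since `update` is equivalent to hashing the concatenation of the chunks.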

``` python
def verify_hash(
    content: bytes,  # Content to verify
    expected: str  # Expected hash in "algo:hexdigest" format
) -> bool:  # True if content matches expected hash
    "Verify content against an expected hash string."
```
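One way to verify against the `"algo:hexdigest"` format is to split on the first colon, recompute with the named algorithm, and compare. This is a sketch under assumptions: `verify_sketch` is a hypothetical name, and returning `False` for malformed or unknown-algorithm input is a choice made for the example rather than confirmed behavior of `verify_hash`.

```python
import hashlib
import hmac


def verify_sketch(content: bytes, expected: str) -> bool:
    """Recompute the hash named in `expected` and compare digests."""
    algo, sep, digest = expected.partition(":")
    if not sep:
        return False  # no "algo:" prefix
    try:
        actual = hashlib.new(algo, content).hexdigest()
        # Constant-time comparison; hex digests are lowercase.
        return hmac.compare_digest(actual, digest.lower())
    except (ValueError, TypeError):
        return False  # unknown algorithm or non-ASCII digest text
```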

``` python
def hash_dict_canonical(
    data: Optional[Dict[str, Any]],  # Dict to hash (or None — treated as {})
    algo: str = "sha256",  # Hash algorithm name
) -> str:  # Hash string in "algo:hexdigest" format
    """
    Hash a dict via canonical JSON encoding.
    
    Canonicalization: `json.dumps(data, sort_keys=True, separators=(",", ":"))`.
    Sorted keys eliminate dict-insertion-order variance; minimal separators
    eliminate whitespace variance. Result is deterministic across Python
    versions and machines.
    """
```
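The canonicalization in the docstring can be reproduced directly. `canonical_hash` is a hypothetical name for this sketch; the `json.dumps` arguments are the ones quoted above, and treating `None` as `{}` follows the parameter comment.

```python
import hashlib
import json
from typing import Any, Dict, Optional


def canonical_hash(data: Optional[Dict[str, Any]], algo: str = "sha256") -> str:
    """Hash a dict so that key order and whitespace cannot change the result."""
    payload = json.dumps(data or {}, sort_keys=True, separators=(",", ":"))
    digest = hashlib.new(algo, payload.encode("utf-8")).hexdigest()
    return f"{algo}:{digest}"
```

Two dicts with the same items in a different insertion order produce the same string, which is what makes the value usable as a record key.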

### Journal Store (`journal_store.ipynb`)

> CR-14 (stage 7): the durable account-of-action. One substrate-derived,
> host-written, never-auto-deleted SQLite store of typed observability
> events — the operational half of the attempted-vs-happened asymmetry
> (the graph records what HAPPENED; the journal records what was
> ATTEMPTED, including everything the graph by design refuses to
> contain: failures, refusals, retries, admission decisions, worker
> lifecycle). Design ledger: `claude-docs/stage-7-evidence.md`.

#### Import

``` python
from cjm_substrate.core.journal_store import (
    LIVENESS_EVENT_TYPES,
    SubstrateEventType,
    JournalEvent,
    JournalStore,
    LocalJournalStore
)
```

#### Functions

``` python
@patch
@contextmanager
def _conn(self: LocalJournalStore) -> Iterator[sqlite3.Connection]:
    """Yield the persistent connection under the instance lock (lazy init:
    parent dirs + WAL + schema on first use).

    Stage-7 stress catch: the previous per-call connect/close shape paid a
    WAL checkpoint on every close (~16 ms/append — 25x over the design's
    latency claim). `synchronous=NORMAL` is the standard WAL pairing
    (durable to process crash; an OS/power crash may lose only the most
    recent commits — the wedge gate covers append FAILURES, which stay
    loud). `check_same_thread=False` + the lock makes any-thread use safe.
    """
```
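The connection shape described above (one long-lived connection, lazily initialized, guarded by a lock, WAL with `synchronous=NORMAL`) can be sketched with the standard library. The class name, the `events` table, and the `append`/`close` helpers are assumptions for illustration, not the `LocalJournalStore` code.

```python
import sqlite3
import threading
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator, Optional


class PersistentJournalSketch:
    def __init__(self, db_path: Path):
        self.db_path = Path(db_path)
        self._lock = threading.Lock()
        self._db: Optional[sqlite3.Connection] = None

    @contextmanager
    def conn(self) -> Iterator[sqlite3.Connection]:
        """Yield the shared connection while holding the instance lock."""
        with self._lock:
            if self._db is None:  # lazy init on first use
                self.db_path.parent.mkdir(parents=True, exist_ok=True)
                db = sqlite3.connect(self.db_path, check_same_thread=False)
                db.execute("PRAGMA journal_mode=WAL")
                db.execute("PRAGMA synchronous=NORMAL")
                db.execute(
                    "CREATE TABLE IF NOT EXISTS events ("
                    "seq INTEGER PRIMARY KEY AUTOINCREMENT, "
                    "event_type TEXT NOT NULL)"
                )
                self._db = db
            yield self._db

    def append(self, event_type: str) -> int:
        """Insert one row and return its seq; sqlite errors propagate."""
        with self.conn() as db:
            cur = db.execute(
                "INSERT INTO events (event_type) VALUES (?)", (event_type,)
            )
            db.commit()
            return cur.lastrowid

    def close(self) -> None:
        with self._lock:
            if self._db is not None:
                self._db.close()
                self._db = None
```

Holding the lock for the whole `with` block serializes all use of the connection, which is what makes `check_same_thread=False` safe here.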

``` python
@patch
def append(
    self: LocalJournalStore,
    event: JournalEvent,  # Event to persist
) -> int:  # Store-assigned seq (cursor)
    """
    Persist one event; sets and returns `event.seq`.
    
    LOUD by contract: sqlite errors propagate (the audit trail never
    degrades silently — ratified design #13). One tiny WAL INSERT;
    synchronous on purpose (G4: the dispatch fast path must stay
    predictable; at substrate event volume this is microseconds).
    """
```

``` python
@patch
def query(
    self: LocalJournalStore,
    job_id: Optional[str] = None,  # Filter: job correlation
    run_id: Optional[str] = None,  # Filter: host-tier run
    composition_id: Optional[str] = None,  # Filter: composition
    capability_instance_id: Optional[str] = None,  # Filter: instance
    worker_session_id: Optional[str] = None,  # Filter: worker session
    event_type: Optional[str] = None,  # Filter: one vocabulary value
    after_seq: Optional[int] = None,  # Tail cursor: rows with seq > this
    since_ts: Optional[datetime] = None,  # Filter: ts >= (isoformat compare)
    until_ts: Optional[datetime] = None,  # Filter: ts <= (isoformat compare)
    limit: Optional[int] = None,  # Max rows
    descending: bool = False,  # True = newest first
) -> List[JournalEvent]:  # Matching events, seq-ordered
    "Filtered read; all filters AND-combined."
```
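AND-combining optional filters usually comes down to building a parameterized `WHERE` clause from the arguments that are not `None`. The sketch below shows one way to do that; `build_event_query`, the `events` table name, and the column allowlist are assumptions, and the timestamp filters are omitted for brevity.

```python
from typing import Any, Dict, List, Optional, Tuple

# Hypothetical allowlist: column names are interpolated, so they must be fixed.
FILTER_COLUMNS = {"job_id", "run_id", "composition_id", "event_type"}


def build_event_query(
    filters: Dict[str, Optional[Any]],
    after_seq: Optional[int] = None,
    limit: Optional[int] = None,
    descending: bool = False,
) -> Tuple[str, List[Any]]:
    """Return (sql, params); every supplied filter narrows the result."""
    clauses: List[str] = []
    params: List[Any] = []
    for column, value in filters.items():
        if column not in FILTER_COLUMNS:
            raise ValueError(f"unsupported filter column: {column}")
        if value is not None:  # None means "do not filter on this"
            clauses.append(f"{column} = ?")
            params.append(value)
    if after_seq is not None:  # tail cursor
        clauses.append("seq > ?")
        params.append(after_seq)
    sql = "SELECT * FROM events"
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    sql += " ORDER BY seq " + ("DESC" if descending else "ASC")
    if limit is not None:
        sql += " LIMIT ?"
        params.append(limit)
    return sql, params
```

Values always travel as bound parameters; only allowlisted column names are ever formatted into the SQL text.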

``` python
@patch
def count(
    self: LocalJournalStore,
    event_type: Optional[str] = None,  # Optional per-type count
) -> int:  # Row count
    "Total journal rows (volume regression checks)."
```

``` python
@patch
def terminal_state_events(
    self: LocalJournalStore,
    limit: Optional[int] = None,  # Most recent N (None = all)
) -> List[JournalEvent]:  # Terminal STATE_TRANSITION rows, newest first
    """
    The durable job history (`_history` migration rider): terminal
    STATE_TRANSITION rows whose payload carries the job snapshot.
    """
```

#### Classes

``` python
class SubstrateEventType(str, Enum):
    """
    Journal vocabulary beyond the job-scoped `JobEventType` set (CR-14).
    
    Reserved up front (emission progressive). Job-scoped types stay in
    `core.queue.JobEventType`; both serialize to plain strings in the
    journal's `event_type` column — the journal is vocabulary-tolerant
    by design (unknown types round-trip; the P5/P6 tolerant-unknown law).
    """
```
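The vocabulary-tolerant behaviour can be illustrated with a small sketch. `DemoEventType`, `to_column`, and `from_column` are hypothetical names, and the member value is made up; only the `str, Enum` base and the plain-string column come from the docstring above.

```python
from enum import Enum
from typing import Union


class DemoEventType(str, Enum):
    """Hypothetical vocabulary; the real members are not listed here."""
    WORKER_SPAWNED = "worker_spawned"


def to_column(event_type: Union[Enum, str]) -> str:
    """Serialize an enum member (or a raw string) to the plain column value."""
    return event_type.value if isinstance(event_type, Enum) else str(event_type)


def from_column(raw: str) -> Union[DemoEventType, str]:
    """Known values map back to the enum; unknown values round-trip as strings."""
    try:
        return DemoEventType(raw)
    except ValueError:
        return raw
```

Because unknown strings are returned unchanged, a reader built against an older vocabulary does not fail on rows written with a newer one.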

``` python
@dataclass
class JournalEvent:
    """
    One durable observability record (CR-14).
    
    The journal never duplicates what manifests / capability DBs / the graph
    already record — graph-touching payloads carry REFERENCES (node ids +
    content hashes, verifiable via the CR-19 machinery), never content.
    `worker_reported=True` marks payloads that originated in-worker and rode
    a wire envelope; the HOST still wrote the row (single-writer-class rule).
    """
    
    event_type: str  # JobEventType.value or SubstrateEventType.value (vocabulary-tolerant)
    event_id: str = field(...)  # Generated occurrence id (EventRef anchor)
    ts: datetime = field(...)  # Substrate-stamped, tz-aware UTC
    run_id: Optional[str]  # Host-tier run correlation (core run manifests)
    job_id: Optional[str]  # Queue job correlation
    composition_id: Optional[str]  # Stage-3 composition correlation
    node_id: Optional[str]  # Composition node correlation
    capability_instance_id: Optional[str]  # CR-10 instance correlation
    capability_name: Optional[str]  # Denormalized for filtering
    config_hash: Optional[str]  # Effective config at event time (CR-7 keying)
    task_name: Optional[str]  # Task-channel address (stage 4)
    method: Optional[str]  # Task-channel method (stage 4)
    worker_session_id: Optional[str]  # Spawn-scoped worker session (replaces ctime markers)
    actor: Optional[str]  # Who/what initiated (operator / agent / host id)
    worker_reported: bool = False  # Payload originated in-worker (rode the wire); host wrote the row
    payload: Dict[str, Any] = field(...)  # Per-event-type structured detail
    seq: Optional[int]  # Store-assigned cursor (rowid); None until appended
```

``` python
@runtime_checkable
class JournalStore(Protocol):
    """
    Protocol for the durable account-of-action (CR-14).
    
    Implementations MUST raise on append failure (loud, never silent —
    the audit trail does not degrade quietly) and MUST NOT expose a
    delete/retention surface (precious class).
    """
    
    def append(self, event: JournalEvent) -> int:
        """Persist one event; returns the store-assigned seq (cursor)."""
        ...
    
    def query(
        self,
        job_id: Optional[str] = None,
        run_id: Optional[str] = None,
        composition_id: Optional[str] = None,
        capability_instance_id: Optional[str] = None,
        worker_session_id: Optional[str] = None,
        event_type: Optional[str] = None,
        after_seq: Optional[int] = None,
        since_ts: Optional[datetime] = None,
        until_ts: Optional[datetime] = None,
        limit: Optional[int] = None,
        descending: bool = False,
    ) -> List[JournalEvent]:
        """Filtered read; all filters AND-combined; `after_seq` is the tail cursor."""
        ...
    
    def count(self, event_type: Optional[str] = None) -> int:
        """Total rows (optionally per type); volume regression checks."""
        ...
    
    def terminal_state_events(self, limit: Optional[int] = None) -> List[JournalEvent]:
        """
        STATE_TRANSITION rows whose payload `to` is terminal: the durable
        job history (the `_history` migration rider).
        """
        ...
```

``` python
class LocalJournalStore:
    """
    SQLite-backed default `JournalStore` (CR-14).
    
    WAL + busy_timeout for multi-process host writers; one persistent,
    lock-protected connection (the stage-7 stress catch replaced the
    earlier per-call connections). `append` raises on failure (loud), so
    callers never wrap it in a silent try/except.
    """
    
    def __init__(self, db_path: Optional[Path] = None):
        """
        `db_path=None` uses `~/.cjm/journal.db`; CapabilityManager passes
        `cfg.journal_db_path` (project-scoped) automatically.
        """
```

#### Variables

``` python
LIVENESS_EVENT_TYPES: frozenset
_JOURNAL_SCHEMA = """
CREATE TABLE IF NOT EXISTS journal (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,
    event_id TEXT NOT NULL UNIQUE,
    ts TEXT NOT NULL,
    event_type TEXT NOT NULL,
    run_id TEXT,
    job_id TEXT,
    composition_id TEXT,
    node_id TEXT,
    capability_instance_id TEXT,
    capability_name TEXT,
    config_hash TEXT,
    task_name TEXT,
    method TEXT,
    worker_session_id TEXT,
    actor TEXT,
    worker_reported INTEGER NOT NULL DEFAULT 0,
    payload TEXT NOT NULL DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS idx_journal_job ON journal (job_id);
CREATE INDEX IF NOT EXISTS idx_journal_run ON journal (run_id);
CREATE INDEX IF NOT EXISTS idx_journal_comp ON journal (composition_id);
CREATE INDEX IF NOT EXISTS idx_journal_type_ts ON journal (event_type, ts);
CREATE INDEX IF NOT EXISTS idx_journal_instance_ts ON journal (capability_instance_id, ts);
CREATE INDEX IF NOT EXISTS idx_journal_wsession ON journal (worker_session_id);
"""
```

### Capability Manager (`manager.ipynb`)

> Capability discovery, loading, and lifecycle management system

#### Import

``` python
from cjm_substrate.core.manager import (
    CapabilityManager,
    CapabilityBinding
)
```

#### Functions

``` python
def _start_diagnostics_retention_sweep(self) -> None:
    """
    CR-14 follow-up: host-startup diagnostics retention sweep.
    
    The invocation half of the retention policy (`cjm-ctl retention` is the
    other): fire-and-forget daemon thread so `__init__` stays fast (slow-init
    discipline) and a large backlog never delays capability loading. Disabled
    when `cfg.substrate.diagnostics_retention_days <= 0` and no size budget
    is set. Best-effort: a sweep failure logs at WARNING — the diagnostics
    class is disposable; the JOURNAL has no retention surface at all.
    """
```
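The fire-and-forget, best-effort shape described above might look like the following sketch. `start_retention_sweep`, its `sweep` callable, and the `size_budget_mb` parameter name are hypothetical; only the daemon thread, the disable condition, and the WARNING-level logging come from the docstring.

```python
import logging
import threading
from typing import Callable, Optional

logger = logging.getLogger("retention_sketch")


def start_retention_sweep(
    sweep: Callable[[], None],             # hypothetical sweep callable
    retention_days: int,
    size_budget_mb: Optional[int] = None,  # hypothetical name for the size budget
) -> Optional[threading.Thread]:
    """Start the sweep on a daemon thread; return None when disabled."""
    if retention_days <= 0 and size_budget_mb is None:
        return None  # neither an age limit nor a size budget: nothing to do

    def _run() -> None:
        try:
            sweep()
        except Exception as exc:  # best-effort: log at WARNING, never propagate
            logger.warning("diagnostics retention sweep failed: %s", exc)

    thread = threading.Thread(target=_run, name="retention-sweep", daemon=True)
    thread.start()  # the caller returns immediately; startup is not delayed
    return thread
```

Marking the thread as a daemon means a long sweep cannot keep the host process alive at shutdown.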

``` python
def register_system_monitor(
    self,
    capability_name:str # Name of the system monitor capability
) -> None
    "Bind a loaded capability to act as the hardware system monitor."
```

``` python
def _resolve_system_monitor(
    self,
) -> Optional[Any]: # The bound system-monitor proxy, or None
    """
    Return the system monitor, lazily binding from the constructor's
    `sysmon_capability_name` when `register_system_monitor` was never called.
    
    Stage-3 G11: requiring a SEPARATE `register_system_monitor()` call after
    load was a trap every core CLI fell into — GPU subtree ATTRIBUTION worked
    (the JobQueue queries its own `sysmon_capability_name` directly) while the
    stats path silently returned `{}`, so the scheduler quantity checks AND
    the stage-3 admission ladder saw no telemetry and every GPU-profiled job
    ran exclusive. The constructor parameter now expresses the full intent.
    """
```

``` python
def _get_global_stats(self) -> Dict[str, Any]: # Current system telemetry
    """
    Fetch real-time stats from the system monitor capability (sync).
    
    CR-3: prefer typed `get_system_status()` over magic-string dispatcher.
    Duck-types because the substrate references `system_monitor` as a
    generic `ToolCapability` — CR-1's host-no-imports rule means substrate
    does not import the monitor capability to type-narrow the reference.
    Proxies after CR-3 expose `get_system_status` as a bound method that
    POSTs to `/get_system_status` and returns `Optional[Dict[str, Any]]`.
    """
```

``` python
async def _get_global_stats_async(self) -> Dict[str, Any]: # Current system telemetry
    """
    Fetch real-time stats from the system monitor capability (async).
    
    Same CR-3 duck-type semantics as the sync variant. Async variant exists
    because the substrate's `execute_capability_async` path (CR-2 + CR-10) needs
    a non-blocking stats fetch when scheduling under an asyncio event loop.
    """
```

``` python
async def get_global_stats(self) -> Dict[str, Any]: # Current system telemetry
    """
    Public async system-telemetry accessor (stage 3 / CR-16).
    
    The JobQueue's multi-lane admission consumes this through the
    `JobQueueDependencies` protocol (defensively via getattr). Thin wrapper
    over `_get_global_stats_async` — same CR-3 duck-type semantics.
    """
```

``` python
def get_admission_profile(
    self,
    name_or_id:str # Capability name (default instance) or instance_id (multi-instance)
) -> Optional[Dict[str, Any]]: # {'gpu_memory_mb_peak_max','memory_mb_peak_max','sample_count'} or None
    """
    Empirical resource profile for a loaded instance's CURRENT config
    (stage 3 / CR-16 multi-lane admission).
    
    Reads the CR-7 empirical store at the instance's live
    `(instance_id, config_hash)` key — the SAME keying that records samples,
    so the profile always describes the configuration actually being run
    (a config change = a new hash = no record = the queue runs the job
    EXCLUSIVE until its first measurement run graduates it).
    
    None = no evidence (instance unknown / store disabled / no record for
    this config). The manifest's `requires_gpu` is deliberately not part of
    this surface — GPU use is an empirical fact, not a declaration (stage-3
    ledger G2).
    """
```
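A sketch of why a config change yields "no evidence": the profile lookup is keyed by `(instance_id, config_hash)`, so a different config produces a different key. The hashing scheme (SHA-256 over sorted-key JSON), the dict-backed store, and the `"shared"` label are assumptions made for illustration; the real CR-7 store and hash function are not shown here.

```python
import hashlib
import json
from typing import Any, Dict, Optional, Tuple

ProfileKey = Tuple[str, str]  # (instance_id, config_hash)


def config_hash(config: Dict[str, Any]) -> str:
    """Hypothetical hash: SHA-256 over canonical (sorted-key) JSON."""
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def lookup_profile(
    store: Dict[ProfileKey, Dict[str, Any]],
    instance_id: str,
    config: Dict[str, Any],
) -> Optional[Dict[str, Any]]:
    """Return the profile recorded for this exact config, or None."""
    return store.get((instance_id, config_hash(config)))


def admission_mode(profile: Optional[Dict[str, Any]]) -> str:
    """No evidence means the job runs exclusive until it is measured."""
    return "exclusive" if profile is None else "shared"
```

For example, a store holding a record for `{"model": "base"}` returns `None` for `{"model": "large"}` on the same instance, so that job would run exclusive.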

``` python
def get_instance_concurrency_cap(
    self,
    name_or_id:str # Capability name (default instance) or instance_id (multi-instance)
) -> Optional[int]: # The instance's SG-33 max_concurrent_requests (None = unset)
    """
    Per-instance concurrency cap for queue admission (stage 3 / CR-16).
    
    Surfaces the SG-33 `max_concurrent_requests` setting. The queue treats
    None as 1 (same-worker concurrency is OPT-IN per capability — e.g.
    ffmpeg raises its cap because its sync endpoints run in a threadpool
    and concurrent converts genuinely parallelize as subprocesses; model
    workers stay serial-per-instance).
    """
```

``` python
def _parse_resources(
    self,
    manifest: Dict[str, Any]  # Loaded manifest dict
) -> Optional[ResourceRequirements]
    "Phase 5a: parse the manifest's resources block into a ResourceRequirements."
```

``` python
def discover_manifests(self) -> List[CapabilityMeta]: # List of discovered capability metadata
    """
    Discover capabilities via JSON manifests in search paths.
    
    CR-8: reads each manifest via `load_manifest`, which parses the v2.0
    nested layout into a typed `ManifestV2`.
    `meta.manifest` is set to a flat-shaped dict view so existing consumers
    (proxy, scheduling, execute path) continue working unchanged; the typed
    `ManifestV2` is also attached as `meta.manifest_v2` so drift detection
    + future typed callers can read `drift_tracking.config_schema_hash`
    without re-parsing.
    """
```

``` python
def get_adapters_for_task(
    self,
    task_name: str,  # Task name, e.g. "graph-storage"
) -> List[AdapterManifest]:  # Discovered adapter units serving the task
    "CR-17 pt 2: the adapter-registry view — discovered adapter manifests for a task."
```

``` python
def check_adapter_compatibility(
    self,
    adapter: Union[str, AdapterManifest],  # Adapter unit name or manifest
    capability_name: str,  # Discovered capability name
) -> Dict[str, Any]:  # Match verdict (see match_protocol_against_surface)
    """
    CR-17 pt 2: surface-based compatibility verdict (host-side; works against
    UNLOADED capabilities — manifest-vs-manifest, no protocol imports host-side).
    
    Matches the adapter's recorded protocol members against the capability
    manifest's recorded `structural_surface` (pass-2 Thread 3: the capability
    records only itself; the adapter declares the protocol; the substrate
    matches). A capability without a recorded surface (pre-fracture manifest)
    is NOT compatible until its manifest regenerates — staleness stays visible
    instead of silently mis-answering.
    """
```
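A rough sketch of a manifest-vs-manifest match. It assumes both sides record members as a mapping from member name to parameter names and that parameter lists must match exactly; `sketch_match` and the verdict keys are hypothetical, and the real `match_protocol_against_surface` may use a different shape and different matching rules.

```python
from typing import Any, Dict, List, Optional

Members = Dict[str, List[str]]  # member name -> parameter names (assumed shape)


def sketch_match(required: Members, surface: Optional[Members]) -> Dict[str, Any]:
    """Compare an adapter's protocol members with a recorded surface."""
    if surface is None:
        # Pre-fracture manifest: report incompatible rather than guess.
        return {
            "compatible": False,
            "reason": "no recorded surface",
            "missing": sorted(required),
        }
    missing = sorted(
        name
        for name, params in required.items()
        if name not in surface or surface[name] != params
    )
    return {
        "compatible": not missing,
        "reason": "ok" if not missing else "missing members",
        "missing": missing,
    }
```

Only plain data is compared, which is why the check works against capabilities that have not been loaded.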

``` python
def get_capabilities_compatible_with(
    self,
    adapter: Union[str, AdapterManifest],  # Adapter unit name or manifest
) -> List[str]:  # Discovered capability names whose surface satisfies the protocol
    "CR-17 pt 2: the pass-2 compatibility query, manifest-surface-based."
```

``` python
def _resolve_adapter_specs(
    self,
    capability_meta,  # CapabilityMeta of the capability being loaded
    adapters=None,  # Explicit adapter unit names (loud refusal on mismatch); None = auto-bind compatibles
) -> List[str]:  # Worker specs "module:ClassName"
    """
    CR-17 pt 2: resolve which adapter impls bind in-worker at spawn.
    
    AUTO (adapters=None): every discovered adapter whose protocol members match
    the capability's recorded surface binds silently — binding rides
    `load_capability` with no separate manual call (the G11 lesson: a manual
    registration step no CLI makes is silently inert).
    
    EXPLICIT (adapters=[names]): each named unit is verified; an incompatible
    pairing REFUSES LOUDLY with the missing members in the message (the CR-17
    negative check).
    """
```

``` python
def get_capability_meta(
    self,
    capability_name:str # Name of the capability
) -> Optional[CapabilityMeta]: # Capability metadata or None
    "Get metadata for a loaded capability by name."
```

``` python
def get_discovered_meta(
    self,
    capability_name:str # Name of the capability
) -> Optional[CapabilityMeta]: # Capability metadata or None
    "Get metadata for a discovered (not necessarily loaded) capability by name."
```

``` python
def _extract_defaults_from_schema(
    self,
    config_schema:Optional[Dict[str, Any]] # JSON Schema with properties
) -> Dict[str, Any]: # Default values extracted from schema
    "Extract default values from a JSON Schema's properties."
```
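Default extraction from a JSON Schema can be sketched as below. `extract_defaults` is a hypothetical standalone version that handles only top-level `properties`; whether the real method recurses into nested objects is not stated here.

```python
from typing import Any, Dict, Optional


def extract_defaults(config_schema: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Collect `default` values from a schema's top-level properties."""
    if not config_schema:
        return {}
    properties = config_schema.get("properties", {})
    return {
        name: spec["default"]
        for name, spec in properties.items()
        if isinstance(spec, dict) and "default" in spec
    }
```

Properties without a `default` key are skipped rather than mapped to `None`, so an explicit `"default": null` stays distinguishable from an absent default.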

``` python
def _validate_config_against_schema(
    """
    SG-5: validate a config dict against the manifest's `config_schema`
    before forwarding to the capability's `initialize()`.
    """
```

``` python
def _check_config_schema_drift(
    self,
    proxy:Any, # RemoteCapabilityProxy with a live worker
    capability_meta:CapabilityMeta, # Metadata to flag if drift is detected
) -> None
    """
    SG-9 + CR-8: compare live worker `/config_schema` to the stored hash.
    
    Reads the stored hash from `capability_meta.manifest_v2.drift_tracking.config_schema_hash`
    (populated by `discover_manifests`). Computes the live hash with
    `compute_config_schema_hash` and compares — drift = hashes differ.
    
    Honors `cfg.substrate.drift_detection` opt-out from `cjm.yaml`: hosts
    that don't want the per-load `/config_schema` HTTP call can disable
    detection there. Default is on.
    
    Test fixtures that stub `meta.manifest = {}` without going through
    `discover_manifests` won't have a `manifest_v2` attribute; the
    `getattr(..., None)` fallback yields `stored_hash=None`, which doesn't
    match any live hash — those tests don't expose a real proxy so the
    drift warning fires harmlessly.
    """
```
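The comparison at the heart of the drift check, as a sketch. `schema_hash` here is an assumed stand-in (SHA-256 over sorted-key JSON) for `compute_config_schema_hash`, whose actual algorithm is not given in this document; `detect_drift` and its `enabled` flag are hypothetical names for the comparison and the `cjm.yaml` opt-out.

```python
import hashlib
import json
from typing import Any, Dict, Optional


def schema_hash(schema: Dict[str, Any]) -> str:
    """Assumed hashing scheme; the substrate's real function may differ."""
    canonical = json.dumps(schema, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def detect_drift(
    stored_hash: Optional[str],
    live_schema: Dict[str, Any],
    enabled: bool = True,
) -> bool:
    """True when the live schema no longer matches the stored witness hash."""
    if not enabled:
        return False  # detection opted out
    # A stored hash of None never equals a live hash, so it reports drift.
    return stored_hash != schema_hash(live_schema)
```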

``` python
def _check_structural_surface_drift(
    self,
    proxy:Any, # RemoteCapabilityProxy with a live worker
    capability_meta:CapabilityMeta, # Metadata to flag if drift is detected
) -> None
    """
    Pass-2 Thread 3 (stage 2): compare the worker's live-derived structural
    surface to the manifest's stored witness hash — third instance of the
    CR-8 hashed-witness + live-companion idiom (after config_schema and the
    compatibility-transport protocol membership it superseded).
    
    Stage-4 adapter compatibility matches `required_tool_protocol` against
    the RECORDED surface, so a stale recording silently mis-answers
    compatibility queries — this check is what makes that visible.
    
    Skips silently when: drift detection is opted out (same cjm.yaml switch
    as config-schema drift); the manifest predates surface recording
    (stored hash None — `regenerate-manifest` adds it); or the worker
    predates the /structural_surface endpoint (proxy returns None).
    """
```

``` python
def _persist_config(
    self,
    capability_name: str  # Capability to persist
) -> None
    """
    CR-2: write current CapabilityMeta state + live worker config to the store.
    
    Reads `meta.enabled` (the substrate-authoritative flag) and the worker's
    current_config (when reachable). Failures are logged + swallowed —
    persistence is a best-effort side-channel, not a correctness invariant.
    """
```

``` python
def _maybe_fire_disable_hook(
    self,
    name_or_id: str  # instance_id (or legacy capability_name) whose in-flight job just finished
) -> None
    """
    CR-2 + CR-10: fire deferred on_disable for `name_or_id` if pending.
    
    Idempotent. Resolves via self.instances first; falls back to
    self.capabilities[name].instance for legacy code paths.
    """
```

``` python
def _validate_instance_id(self, instance_id: str) -> None:
    """
    Reject malformed explicit instance_ids at load time.
    
    Pattern: alphanumeric + underscore + hyphen, length 1..64. Raises
    ValueError on invalid input so the caller sees the constraint failure
    immediately rather than at first execute / unload.
    """
```
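A standalone sketch of that check, using the `[A-Za-z0-9_-]{1,64}` pattern quoted in the `load_capability` docstring. The function name and the error message wording are illustrative.

```python
import re

# Alphanumeric, underscore, hyphen; length 1..64.
_INSTANCE_ID_RE = re.compile(r"[A-Za-z0-9_-]{1,64}")


def validate_instance_id(instance_id: str) -> None:
    """Raise ValueError for a malformed instance_id; return None otherwise."""
    if not isinstance(instance_id, str) or not _INSTANCE_ID_RE.fullmatch(instance_id):
        raise ValueError(
            f"invalid instance_id {instance_id!r}: expected [A-Za-z0-9_-]{{1,64}}"
        )
```

`fullmatch` is used rather than `match` so that trailing characters outside the pattern are rejected.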

``` python
def _generate_instance_id(self, capability_name: str) -> str:
    """
    Generate a unique instance_id of form `{capability_name}-{6-char-hex}`.
    
    Used when load_capability is called with new_instance=True and no explicit
    instance_id. Retries up to 16 times if a collision occurs in self.instances.
    """
```
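A sketch of the id generation with bounded retries. The `existing` argument stands in for `self.instances`, and raising `RuntimeError` after the last attempt is an assumption; the docstring does not say what happens when all 16 attempts collide.

```python
import secrets
from typing import Collection


def generate_instance_id(
    capability_name: str,
    existing: Collection[str],  # stand-in for the keys of self.instances
    attempts: int = 16,
) -> str:
    """Return `{capability_name}-{6-char-hex}` that is not already in use."""
    for _ in range(attempts):
        candidate = f"{capability_name}-{secrets.token_hex(3)}"  # 3 bytes = 6 hex chars
        if candidate not in existing:
            return candidate
    # Assumed failure mode; not confirmed by the source.
    raise RuntimeError(f"no free instance_id for {capability_name!r} after {attempts} attempts")
```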

``` python
def get_instance(
    self,
    name_or_id: str  # Capability name (default-loaded) or explicit instance_id
) -> Optional[CapabilityInstance]
    """
    Return the CapabilityInstance for `name_or_id`, or None if not loaded.
    
    Lookup is keyed by instance_id (which equals capability_name for default-
    loaded capabilities). Multi-instance IDs only exist in self.instances.
    """
```

``` python
def list_instances(
    "List all loaded instances, optionally filtered by underlying capability name."
```

``` python
def _worker_env_specs(
    self,
    capability_meta: CapabilityMeta  # Capability whose WORKER_ENV contract to read
) -> List[Dict[str, Any]]:  # List of EnvVarSpec-as-dict entries (possibly empty)
    """
    Return a capability's WORKER_ENV contract as spec dicts (CR-12).
    
    Prefers the typed manifest_v2 code section; falls back to the flat manifest
    dict view. Empty list when the capability declares no worker-env contract.
    """
```

``` python
def _resolve_worker_env(
    self,
    capability_meta: CapabilityMeta,        # Capability being loaded
    scope: Optional[str] = None     # SG-55 forward seam: per-principal scope (None = single-user)
) -> Dict[str, str]:  # {ENV_NAME: value} overlay injected into the worker at spawn
    """
    CR-12 + Q1-A: compose the resolved worker-env overlay for a load.
    
    Secrets resolve from the SecretStore keyed by capability_name — so every
    instance of a capability shares one credential (CR-10: two Gemini instances,
    one GEMINI_API_KEY). A missing secret is OMITTED (the worker spawns without
    it; the capability reports the gap at execute) rather than injected empty.
    
    Visible vars resolve from their declared `default`, with Q1-A template
    substitution applied: a default like ``"${CJM_MODELS_DIR}/huggingface"``
    expands to an absolute path using the substrate's current `cfg.models_dir`
    + `cfg.capability_data_dir`. Static defaults (no `${...}` syntax) pass through
    unchanged. A template-substitution failure — unknown placeholder (capability
    author bug) OR unresolved value (operator hasn't configured
    `cfg.models_dir`) — is WARN-and-OMIT: the worker still spawns, and the
    capability can surface the gap via `missing_required_env()` if the field was
    declared `required=True`. This matches secret omission behaviour
    (operator-side concerns don't break load; the capability signals at execute).
    Capability-author-bug-class errors (unknown placeholders) surface at
    install/release time via `cjm-ctl validate` + `template_check_placeholders`,
    not here. All values are fixed at spawn — a change requires `reload_capability`.
    """
```
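The WARN-and-OMIT template substitution for visible vars could be sketched as follows. `expand_default` and `resolve_visible_env` are hypothetical helpers, the spec dicts are assumed to carry `name` and `default` keys, and the placeholder regex is an assumption; only the `${CJM_MODELS_DIR}` placeholder and the omit-on-failure behaviour come from the docstring.

```python
import logging
import re
from typing import Any, Dict, List, Optional

logger = logging.getLogger("worker_env_sketch")
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z0-9_]+)\}")  # assumed placeholder syntax


def expand_default(default: str, values: Dict[str, Optional[str]]) -> Optional[str]:
    """Expand `${NAME}` placeholders; return None when any cannot be resolved."""
    failed: List[str] = []

    def _substitute(match: "re.Match[str]") -> str:
        value = values.get(match.group(1))
        if value is None:  # unknown placeholder or unconfigured value
            failed.append(match.group(1))
            return match.group(0)
        return value

    expanded = _PLACEHOLDER.sub(_substitute, default)
    if failed:
        logger.warning("omitting %r: unresolved placeholders %s", default, failed)
        return None
    return expanded


def resolve_visible_env(
    specs: List[Dict[str, Any]],
    values: Dict[str, Optional[str]],
) -> Dict[str, str]:
    """Build the {ENV_NAME: value} overlay, omitting entries that fail."""
    overlay: Dict[str, str] = {}
    for spec in specs:
        default = spec.get("default")
        if default is None:
            continue
        value = expand_default(str(default), values)
        if value is not None:
            overlay[spec["name"]] = value
    return overlay
```

A static default contains no placeholder, so the substitution leaves it unchanged and it lands in the overlay as written.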

``` python
def get_worker_env_status(
    self,
    name_or_meta: Any,              # Capability name (loaded/discovered) or a CapabilityMeta
    scope: Optional[str] = None     # SG-55 forward seam
) -> List[Dict[str, Any]]:  # Per-entry status dicts (secret values never returned)
    """
    CR-12: per-entry satisfaction status of a capability's worker-env contract.
    
    Each entry: {name, secret, required, satisfied, label, description}.
    `satisfied` means a value is resolvable (secret present in the store, or a
    visible var has a default/override). Secret VALUES are never returned — only
    whether one is set. The capability-config UI uses this to gate config display on
    required secrets being satisfied.
    """
```

``` python
def missing_required_env(
    self,
    name_or_meta: Any,              # Capability name or CapabilityMeta
    scope: Optional[str] = None     # SG-55 forward seam
) -> List[str]:  # Names of required worker-env entries with no resolvable value
    "CR-12: names of required worker-env entries that are unsatisfied."
```

``` python
def set_capability_secret(
    self,
    name_or_id: str,             # Capability name or instance_id whose secret to set
    key: str,                    # Secret key (the env-var name, e.g. "GEMINI_API_KEY")
    value: str,                  # Secret value (stored via the SecretStore, never config/logs)
    *,
    scope: Optional[str] = None, # SG-55 forward seam: per-principal scope
    reload: bool = True          # Respawn loaded worker(s) so the new env is injected
) -> bool:  # True if the secret was stored
    """
    CR-12: store a capability secret, then respawn its worker(s) to inject it.
    
    Secrets are keyed by the underlying CAPABILITY name (not instance_id), so all
    instances of a capability share one credential — set the Gemini key once and
    every Gemini instance gets it at (re)spawn. Because worker env is fixed at
    spawn, the new value only reaches a *running* worker via a RESPAWN, so this
    reloads each loaded instance of the capability (unless `reload=False`, e.g. when
    provisioning a secret before the capability is loaded). This is the
    actuation seam both the CLI (`cjm-ctl set-secret`) and a future config UI
    call. Reload failures are logged, not raised.
    """
```

``` python
def load_capability(
    self,
    capability_meta:CapabilityMeta, # Capability metadata (with manifest attached)
    config:Optional[Dict[str, Any]]=None, # Initial configuration
    strict:bool=True, # SG-5: reject unknown keys against manifest config_schema (default)
    instance_id:Optional[str]=None, # CR-10: explicit instance_id; None defaults to capability_name
    new_instance:bool=False, # CR-10: auto-generate `{name}-{hex}` instance_id (with instance_id=None)
    max_concurrent_requests:Optional[int]=None, # SG-33 (CR-7): per-instance async concurrency cap; None = unbounded
    adapters:Optional[List[str]]=None # CR-17 pt 2: explicit adapter unit names (loud refusal on mismatch); None = auto-bind discovered compatibles
) -> bool: # True if successfully loaded
    """
    Load a capability by spawning a Worker subprocess.
    
    CR-2: reads the persisted CapabilityConfigRecord from `self.config_store`
    before launching the worker. If a persisted record exists and the
    caller didn't pass an explicit config, the persisted config is used
    as the effective input. The persisted `enabled` flag is applied to
    `capability_meta.enabled` so disabled capabilities stay disabled across
    process restarts.
    
    CR-10: optional `instance_id` allows multi-instance loading.
    - instance_id=None, new_instance=False (default): instance_id =
      capability_meta.name. Populates self.capabilities[capability_name] + self.instances
      [capability_name] together (single-instance backward compat).
    - instance_id="custom": validated against `[A-Za-z0-9_-]{1,64}`. Populates
      self.instances[custom]. Persistence is keyed by capability_name and only
      applied to the default instance.
    - instance_id=None, new_instance=True: auto-generates `{name}-{6-hex}`.
    Idempotent: re-load against an existing instance_id returns True without
    re-spawning.
    
    CR-7: computes `config_hash` from the effective config (post-defaults +
    post-validation) and stores it on the CapabilityInstance so execute_capability*
    can key empirical samples by (instance_id, config_hash). SG-33 stores
    `max_concurrent_requests` on the instance — the actual asyncio.Semaphore
    is lazy-created in execute_capability_async via `_get_concurrent_limiter`.
    """
```

``` python
def load_all(
    self,
    configs:Optional[Dict[str, Dict[str, Any]]]=None # Capability name -> config mapping
) -> Dict[str, bool]: # Capability name -> success mapping
    "Discover and load all available capabilities."
```

``` python
def unload_capability(
    self,
    name_or_id:str # Capability name (default-loaded) or instance_id (multi-instance)
) -> bool: # True if successfully unloaded
    """
    Unload a capability instance and terminate its Worker process (CR-10).
    
    If name_or_id resolves to the default instance (instance_id == capability_name)
    and no other instances remain for the same capability, also removes the
    CapabilityMeta from self.capabilities. Otherwise removes only the instance and
    clears CapabilityMeta.instance if it pointed at the unloaded canonical.
    """
```

``` python
def unload_all(self) -> None:
    """
    Unload all capability instances and terminate all Worker processes (CR-10).
    
    Iterates self.instances (CR-10 keying) rather than self.capabilities so all
    multi-instance entries get torn down, not just the canonical instances.
    """
```

``` python
def get_capability(
    self,
    name_or_id:str # Capability name (default-loaded) or instance_id (multi-instance)
) -> Optional[ToolCapability]: # Capability proxy instance or None
    """
    Get a loaded capability's proxy by name or instance_id (CR-10).
    
    Lookup order: self.instances first (covers both default capability_name and
    multi-instance IDs), falling back to CapabilityMeta.instance for any
    legacy code path that populated self.capabilities without self.instances
    (defensive — shouldn't happen post-CR-10 since load_capability always
    records the instance).
    """
```

``` python
def list_capabilities(self) -> List[CapabilityMeta]: # List of loaded capability metadata
    "List all loaded capabilities."
```

``` python
def _get_sysmon_capability(self) -> Optional[Any]:
    """
    Resolve the configured monitor capability (CR-3) for GPU subtree attribution.
    
    Returns the loaded capability instance keyed by `sysmon_capability_name`, or
    None when no sysmon is configured / hasn't been loaded yet. Lazy
    resolution against `self.capabilities` tolerates load-order: the manager
    can be constructed before the sysmon capability is loaded; later
    `_record_sample_safe` calls pick it up automatically.
    """
```

``` python
def _record_sample_safe(self, inst:CapabilityInstance, start_time:float, success:bool) -> None:
    """
    CR-7: best-effort empirical sample recording.
    
    Captures worker stats at end-of-execute (proxy of peak), builds a
    ResourceSample, and records it via the EmpiricalResourceStore. Failures
    log + swallow — sample recording must never break the execute path
    (matches CR-2's `_persist_config` best-effort discipline).
    
    Stats fetch can fail naturally (e.g. worker died with WorkerOOMError —
    the proxy is unreachable). The sample still records with zero stats +
    success=False so we have a record of the failed attempt for the
    success_rate aggregate.
    
    GPU memory is attributed across the worker's process subtree via
    `attribute_gpu_to_worker_subtree` (intersecting worker-reported
    `subtree_pids` with sysmon's per-PID GPU enumeration). Pre-fix this
    function read `worker_stats["gpu_memory_mb"]` — a key the worker `/stats`
    endpoint NEVER emits — so EmpiricalResourceRecord.gpu_memory_mb_peak_max
    was silently 0 for every capability since CR-7 shipped, not just for
    subprocess-spawning ones. When no sysmon is configured, GPU memory
    records as 0.0 (honest signal that we can't measure it).
    """
```

``` python
def _get_concurrent_limiter(self, instance_id:str) -> Optional[asyncio.Semaphore]:
    """
    SG-33 (CR-7): lazy-create the per-instance asyncio.Semaphore.
    
    Returns None when the instance has no `max_concurrent_requests` set (the
    default — unbounded). Otherwise creates the semaphore on first call and
    caches it in `self._concurrent_limiters`. Semaphores are bound to the
    event loop they were created in; lazy creation inside `execute_capability_async`
    ensures we're inside the right loop at construction time (Python 3.10+
    semaphore-loop-binding rules).
    
    Defensive: returns None if the manager was constructed via __new__ without
    `_concurrent_limiters` being populated (test-fixture pattern).
    """
```
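
The lazy-creation pattern described in that docstring can be sketched as follows. This is a minimal illustration under stated assumptions, not the manager's actual code: `LimiterCache`, its `_limits` mapping, and `get_limiter` are hypothetical names; only `_concurrent_limiters` and the "None means unbounded" rule come from the docstring.

```python
import asyncio
from typing import Dict, Optional


class LimiterCache:
    """Hypothetical stand-in for the manager's per-instance limiter bookkeeping."""

    def __init__(self, limits: Dict[str, Optional[int]]):
        # instance_id -> max_concurrent_requests (None = unbounded); assumed shape
        self._limits = limits
        self._concurrent_limiters: Dict[str, asyncio.Semaphore] = {}

    def get_limiter(self, instance_id: str) -> Optional[asyncio.Semaphore]:
        limit = self._limits.get(instance_id)
        if limit is None:
            return None  # no cap configured: caller runs without a semaphore
        sem = self._concurrent_limiters.get(instance_id)
        if sem is None:
            # Created on first use, then cached for every later call.
            sem = asyncio.Semaphore(limit)
            self._concurrent_limiters[instance_id] = sem
        return sem


async def demo() -> int:
    """Run five jobs against a cap of two and report the observed peak."""
    cache = LimiterCache({"capped": 2, "open": None})
    running = 0
    peak = 0

    async def job() -> None:
        nonlocal running, peak
        async with cache.get_limiter("capped"):
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)
            running -= 1

    await asyncio.gather(*(job() for _ in range(5)))
    return peak
```

Calling `asyncio.run(demo())` exercises the cap from inside a running loop, which is the situation the docstring's lazy creation is designed for.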

``` python
def _reactive_evict_for(
    self,
    needed_meta:CapabilityMeta,
    shortfall:Optional[Any]=None,  # Optional ResourceShortfall from Track B; informational only
) -> bool:
    """
    CR-7: try to free resources after a CapabilityResourceError during execute.
    
    Wraps `_evict_for_resources` with reactive-flow logging. `_evict_for_resources`
    itself extends to multi-axis + cost-aware candidate selection (drops the
    GPU-only filter, prefers evicting empirically-expensive idle capabilities).
    
    `shortfall` is recorded for log context but doesn't currently steer
    candidate selection beyond what _evict_for_resources already does via
    its `needed_meta.resources` check. A future enhancement could pass it
    through for axis-specific candidate filtering.
    """
```

``` python
def _evict_for_resources(self, needed_meta:CapabilityMeta) -> bool:
    """
    Attempt to free resources by unloading/releasing idle capabilities (LRU).
    
    CR-7: extended from GPU-only LRU to multi-axis cost-aware eviction.
    - Candidate set: any loaded capability that isn't the one we're allocating
      for (drops the pre-CR-7 `requires_gpu` filter).
    - Sort key: primary = idle (older last_executed first, classic LRU);
      secondary = empirical cost when available (highest peak gets evicted
      first among equally-idle candidates). Cost axis follows the needed
      capability's `resources.requires_gpu` flag — GPU peak when we're freeing
      for a GPU capability, system memory peak otherwise.
    
    Without empirical data (no store / unmeasured capability), the secondary
    key is 0.0 and pure LRU applies. Cost-aware selection is opt-in via
    `empirical_tracking: true`.
    """
```
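
The two-level sort key in that docstring can be illustrated with a small sketch. The `Candidate` dataclass and its field names are hypothetical; the real method works on loaded capability records and an empirical store whose shapes are not shown here.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Candidate:
    """Hypothetical eviction candidate; field names are illustrative only."""
    name: str
    last_executed: float        # timestamp of the most recent execute
    peak_cost_mb: float = 0.0   # empirical peak on the relevant axis; 0.0 when unmeasured


def eviction_order(candidates: List[Candidate]) -> List[str]:
    # Primary key: older last_executed first (classic LRU).
    # Secondary key: among equally idle candidates, highest empirical peak first.
    ranked = sorted(candidates, key=lambda c: (c.last_executed, -c.peak_cost_mb))
    return [c.name for c in ranked]
```

With every `peak_cost_mb` left at 0.0 the secondary key is constant, so the ordering reduces to pure LRU, matching the behaviour described for the no-empirical-data case.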

``` python
def execute_capability(
    self,
    name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
    *args,
    _task_name:Optional[str]=None, # CR-17 pt 2: route via the task channel (adapter task) instead of execute
    _method:Optional[str]=None, # CR-17 pt 2: adapter method (set with _task_name)
    **kwargs
) -> Any: # Capability result
    """
    Execute a capability instance's main functionality (sync).
    
    CR-10: resolves `name_or_id` via self.instances; per-instance enabled
    flag gates execution. `_running_executions` tracks by instance_id so
    concurrent multi-instance executes don't collide.
    
    CR-2: raises CapabilityDisabledError (typed) when the instance is disabled.
    
    CR-7: reactive retry on CapabilityResourceError — evicts other capabilities to
    free resources, then ALWAYS reloads the failing capability's worker before
    the retry attempt. Track A (WorkerOOMError — worker died from SIGKILL)
    needs the reload because there's no live worker to retry on. Track B
    (capability-raised CapabilityResourceError — worker still alive) ALSO reloads
    because PyTorch's CUDA caching allocator can fragment post-OOM in ways
    the capability can't clean up from within its own process; a fresh worker
    is the only reliable reset. Bounded by `self.max_retries` (default 1).
    Empirical sample recorded in the finally block — best-effort, doesn't
    break execute on failure.
    """
```
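
The evict, reload, retry sequence described above can be sketched as a bounded loop. This is an assumption-laden illustration: `ResourceError` stands in for `CapabilityResourceError`, and `run`, `evict`, and `reload` are hypothetical callables supplied by the caller. Empirical sample recording is left out.

```python
from typing import Any, Callable


class ResourceError(Exception):
    """Hypothetical stand-in for CapabilityResourceError."""


def execute_with_retry(
    run: Callable[[], Any],
    evict: Callable[[], None],
    reload: Callable[[], None],
    max_retries: int = 1,
) -> Any:
    attempt = 0
    while True:
        try:
            return run()
        except ResourceError:
            if attempt >= max_retries:
                raise  # retry budget exhausted: surface the error to the caller
            attempt += 1
            evict()   # free resources held by other capabilities
            reload()  # always restart the failing worker before retrying
```

Reloading on every retry, whether or not the worker is still alive, mirrors the docstring's point that a fresh process is the only reliable reset after an out-of-memory failure.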

``` python
async def execute_capability_async(
    self,
    name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
    *args,
    _task_name:Optional[str]=None, # CR-17 pt 2: route via the task channel (adapter task) instead of execute
    _method:Optional[str]=None, # CR-17 pt 2: adapter method (set with _task_name)
    **kwargs
) -> Any: # Capability result
    """
    Execute a capability instance's main functionality (async).
    
    CR-10 + CR-2: same semantics as execute_capability, async-flavored. Scheduler
    allocation goes through allocate_async for non-blocking polling.
    
    CR-7 + SG-33: reactive retry on CapabilityResourceError — always reloads
    before retry (Track A + Track B converge on the same reload path; see
    sync variant docstring for the rationale). Per-instance asyncio.Semaphore
    enforces the `max_concurrent_requests` cap (None = unbounded). Empirical
    sample recorded in the finally block.
    """
```

``` python
def execute_capability_task(
    self,
    name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
    task_name:str, # Adapter task, e.g. "graph-storage"
    method:str, # Adapter method, e.g. "query_nodes"
    **kwargs
) -> Any: # Typed task result
    """
    CR-17 pt 2: execute a typed task-adapter method (explicit task channel; sync).
    
    Thin wrapper over `execute_capability` — the whole CR-7 retry / scheduler /
    empirical-sampling machinery applies identically to task-channel calls.
    """
```

``` python
async def execute_capability_task_async(
    self,
    name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
    task_name:str, # Adapter task, e.g. "graph-storage"
    method:str, # Adapter method, e.g. "query_nodes"
    **kwargs
) -> Any: # Typed task result
    """
    CR-17 pt 2: execute a typed task-adapter method (explicit task channel; async).
    
    Thin wrapper over `execute_capability_async` — CR-7 retry, SG-33 semaphore,
    admission and empirical sampling apply identically; this is the method
    the JobQueue's task-addressed jobs invoke.
    """
```

``` python
def enable_capability(
    self,
    name_or_id:str # Capability name (default instance) or instance_id (multi-instance)
) -> bool: # True if instance was enabled
    """
    Enable a capability instance (CR-10 multi-instance aware).
    
    CR-2: persists the new state via `config_store` (default-instance only;
    persistence is per-capability, not per-instance) and fires the capability's
    on_enable hook on state-change. Idempotent for already-enabled instances.
    """
```

``` python
def disable_capability(
    self,
    name_or_id:str # Capability name (default instance) or instance_id (multi-instance)
) -> bool: # True if instance was disabled
    """
    Disable a capability instance without unloading it (CR-10 multi-instance aware).
    
    CR-2: persists the new state (default-instance only) and fires the
    capability's on_disable hook — but defers the hook until any in-flight job
    for THIS instance finishes (the per-instance `_running_executions` key
    is the instance_id, so a concurrent execute on a different instance of
    the same capability doesn't gate this instance's hook).
    """
```
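
One way to picture the deferred `on_disable` hook is a per-instance running counter with a pending-hook slot. The sketch below is hypothetical throughout (`HookGate` and its methods are not part of the library); it only illustrates why keying by instance_id keeps one instance's in-flight job from gating another instance's hook.

```python
from collections import Counter
from typing import Callable, Dict


class HookGate:
    """Hypothetical bookkeeping: running executions keyed by instance_id."""

    def __init__(self) -> None:
        self.running: Counter = Counter()
        self.pending: Dict[str, Callable[[], None]] = {}

    def start(self, instance_id: str) -> None:
        self.running[instance_id] += 1

    def finish(self, instance_id: str) -> None:
        self.running[instance_id] -= 1
        # Fire a deferred hook once this instance has no in-flight work left.
        if self.running[instance_id] == 0 and instance_id in self.pending:
            self.pending.pop(instance_id)()

    def disable(self, instance_id: str, hook: Callable[[], None]) -> None:
        if self.running[instance_id] > 0:
            self.pending[instance_id] = hook  # defer until the job finishes
        else:
            hook()  # nothing in flight: fire immediately
```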

``` python
def get_capability_diagnostics(
    """
    Render a capability's recent diagnostics as text (CR-14; replaces
    the retired flat-log accessor — the flat `.cjm/logs/*.log` files no longer exist).
    
    A convenience TEXT projection over the diagnostics store for operator /
    UI display: structured records (level + logger name + exact job id when
    stamped) merged with the raw stream chunks (prints / tqdm final frames /
    death rattles) from this capability's worker sessions, ordered by time.
    Programmatic consumers query the stores directly
    (`manager.diagnostics_store` / `JobQueue.get_job_diagnostics`).
    """
```

``` python
def get_capability_config(
    self,
    capability_name: str # Name of the capability
) -> Optional[Dict[str, Any]]: # Current configuration or None
    "Get the current configuration of a capability."
```

``` python
def get_capability_config_schema(
    self,
    capability_name: str # Name of the capability
) -> Optional[Dict[str, Any]]: # JSON Schema or None
    "Get the configuration JSON Schema for a capability."
```

``` python
def get_config_options(
    self,
    name_or_id: str # Capability name (default instance) or instance_id (multi-instance)
) -> Dict[str, Any]: # CR-11: live config option domains, or {} if unavailable
    """
    Get a capability instance's runtime config option providers (CR-11).
    
    Forwards to the worker's get_config_options() - live enum domains +
    per-option metadata for dynamic config fields (e.g. an API model list).
    Kept separate from get_capability_config_schema (static, hashed for CR-8 drift);
    these options are the live companion the capability-config UI merges on top.
    
    Degrades to {} if the instance is missing or the worker call fails - the UI
    then falls back to the static schema. Typed-error surfacing for the UI
    consumer is deferred to the capability-config UI library (Path C Step 4).
    """
```

``` python
def get_all_capability_configs(self) -> Dict[str, Dict[str, Any]]: # Capability name -> config mapping
    "Get current configuration for all loaded capabilities."
```

``` python
def update_capability_config(
    self,
    name_or_id: str, # Capability name (default instance) or instance_id (multi-instance)
    config: Dict[str, Any], # New configuration values
    strict: bool = True # SG-5: reject unknown keys against manifest config_schema (default)
) -> bool: # True if successful
    """
    Update a capability instance's configuration (CR-10 multi-instance aware).
    
    CR-2: on successful reconfigure, persists the new config (default instance
    only; multi-instance loads don't persist). Per-instance `inst.config` is
    updated regardless.
    SG-5: validates against the underlying capability's config_schema (per-capability,
    not per-instance, so all instances share the same schema).
    """
```

``` python
def reload_capability(
    self,
    name_or_id: str, # Capability name (default instance) or instance_id (multi-instance)
    config: Optional[Dict[str, Any]] = None # Optional new configuration
) -> bool: # True if successful
    "Reload a capability instance by terminating and restarting its Worker (CR-10)."
```

``` python
def get_capability_stats(
    self,
    name_or_id: str # Capability name (default instance) or instance_id (multi-instance)
) -> Optional[Dict[str, Any]]: # Resource telemetry or None
    "Get resource usage stats for a capability instance's Worker process (CR-10)."
```

``` python
async def execute_capability_stream(
    self,
    name_or_id: str,  # Capability name (default instance) or instance_id (multi-instance)
    *args,
    **kwargs
) -> AsyncGenerator[Any, None]:  # Async generator yielding results
    """
    Execute a capability instance with streaming response (CR-10 multi-instance aware).
    
    Same per-instance resolution as execute_capability_async; scheduler allocation
    keys off the CapabilityMeta (capability-level), execution + bookkeeping key off
    the CapabilityInstance (per-instance).
    """
```

``` python
async def load_capability_async(
    self,
    capability_meta: CapabilityMeta,
    config: Optional[Dict[str, Any]] = None,
    strict: bool = True,
    instance_id: Optional[str] = None,
    new_instance: bool = False,
) -> bool:
    """
    Async variant of `load_capability` (CR-10b).
    
    Runs the existing sync `load_capability` via `asyncio.to_thread` so the
    blocking proxy spawn + `_wait_for_ready` doesn't stall the event loop.
    Backward compat: identical behavior to the sync method, just non-blocking.
    """
```

``` python
async def unload_capability_async(
    self,
    name_or_id: str,
) -> bool:
    "Async variant of `unload_capability` (CR-10b)."
```

``` python
def _spec_requested_key(spec: CapabilityLoadSpec, index: int) -> str:
    """
    Derive the dict key the load_capabilities_concurrent result uses for `spec`.
    
    Resolution: explicit `instance_id` > `meta.name` + `#new[{index}]` suffix
    for ambiguous new_instance=True specs > `meta.name`. The suffix prevents
    key collision when multiple specs request a new instance of the same capability
    without explicit instance_ids.
    """
```
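
The resolution order in that docstring reads naturally as three branches. The sketch below uses hypothetical `Meta` and `LoadSpec` dataclasses carrying only the fields the docstring names; the real `CapabilityLoadSpec` may hold more.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Meta:
    """Hypothetical stand-in for CapabilityMeta (name only)."""
    name: str


@dataclass
class LoadSpec:
    """Hypothetical stand-in for CapabilityLoadSpec."""
    meta: Meta
    instance_id: Optional[str] = None
    new_instance: bool = False


def requested_key(spec: LoadSpec, index: int) -> str:
    if spec.instance_id is not None:
        return spec.instance_id                   # 1. explicit instance_id wins
    if spec.new_instance:
        return f"{spec.meta.name}#new[{index}]"   # 2. disambiguate anonymous new instances
    return spec.meta.name                         # 3. default instance keyed by name
```

Because the suffix embeds the spec's position in the input list, two anonymous `new_instance=True` specs for the same capability produce distinct keys.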

``` python
async def load_capabilities_concurrent(
    self,
    specs: List[CapabilityLoadSpec],  # Per-capability load specifications
    max_concurrency: Optional[int] = None,  # Cap simultaneous loads; None = unbounded
    fail_fast: bool = False,  # Re-raise first exception (default: collect all results)
) -> Dict[str, Union[str, Exception]]:  # requested_key → instance_id or Exception
    """
    CR-10b: fan out capability loads concurrently via asyncio.gather.
    
    Each spec is loaded via `load_capability_async` (`asyncio.to_thread` under the
    hood). The total wall-clock drops from sum-of-spawns to max-of-spawns when
    `max_concurrency=None`. Capped concurrency uses an asyncio.Semaphore.
    
    Result keys come from `_spec_requested_key`: explicit `instance_id` if set,
    `{capability_name}#new[{index}]` for ambiguous new_instance specs, else
    `capability_name`. Successful entries map to the resolved instance_id (string).
    With fail_fast=False (the default), failures are caught and map to the raised
    exception; with fail_fast=True, the first exception is re-raised instead.
    """
```
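
The fan-out shape described here (blocking loads pushed to threads, gathered concurrently, optionally capped by a semaphore) can be sketched with standard-library calls only. `fan_out` and its `jobs` mapping are hypothetical; the sketch covers the collect-all mode and leaves out `fail_fast`.

```python
import asyncio
from typing import Callable, Dict, Optional, Union


async def fan_out(
    jobs: Dict[str, Callable[[], str]],     # requested_key -> blocking loader (hypothetical)
    max_concurrency: Optional[int] = None,  # None = unbounded
) -> Dict[str, Union[str, Exception]]:
    sem = asyncio.Semaphore(max_concurrency) if max_concurrency else None

    async def one(fn: Callable[[], str]) -> str:
        if sem is None:
            return await asyncio.to_thread(fn)
        async with sem:
            # At most max_concurrency blocking loads are in flight at once.
            return await asyncio.to_thread(fn)

    keys = list(jobs)
    # return_exceptions=True collects failures as values instead of raising.
    results = await asyncio.gather(*(one(jobs[k]) for k in keys), return_exceptions=True)
    return dict(zip(keys, results))
```

With no cap, all loaders start together, so total wall-clock time tracks the slowest spawn and not the sum of all spawns.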

``` python
async def unload_capabilities_concurrent(
    self,
    name_or_ids: List[str],  # Capability names or instance_ids to unload
    max_concurrency: Optional[int] = None,
    fail_fast: bool = False,
) -> Dict[str, Union[bool, Exception]]:  # name_or_id → True or Exception
    """
    CR-10b: fan out capability unloads concurrently via asyncio.gather.
    
    Same concurrency + fail_fast semantics as load_capabilities_concurrent. Result
    keys are the input `name_or_ids` (deduplication is the caller's
    responsibility; duplicate inputs produce one dict entry per unique key).
    """
```

``` python
def bind(
    self,
    capability_name: str,  # Name of the capability to pre-bind
    default_config: Optional[Dict[str, Any]] = None  # Default config used by binding.load()
) -> CapabilityBinding:  # Bound view ready for instance-style use
    "Create a CapabilityBinding pre-bound to this manager + capability_name."
```

``` python
def get_compatible_for_current_platform(self) -> List[CapabilityMeta]:  # Capabilities compatible with current platform
    """
    Phase 5a: return discovered capabilities compatible with the host platform.
    
    Filters by `resources.platforms`. Capabilities with an empty (or absent)
    platforms list are considered universally compatible — that's the
    introspection-time convention when a capability author didn't declare a
    platform constraint. Capabilities lacking the entire `resources` block
    (legacy / pre-Phase-5a manifests) also pass through as universal.
    
    Does NOT filter on `requires_gpu` — substrate doesn't know whether a
    GPU is present without invoking a system monitor capability. Callers gate
    on GPU availability separately if needed.
    """
```
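
The "empty or absent means universal" rule can be sketched as a small predicate. The dict-shaped `resources` argument and the platform strings used here are assumptions for illustration; the library's own platform identifiers are not shown in this section.

```python
from typing import Any, Dict, Optional


def is_platform_compatible(resources: Optional[Dict[str, Any]], host: str) -> bool:
    """Hypothetical predicate mirroring the filter rule in the docstring."""
    if not resources:
        return True  # no resources block at all: treated as universal
    platforms = resources.get("platforms") or []
    if not platforms:
        return True  # empty or absent platforms list: no constraint declared
    return host in platforms
```

Note that `requires_gpu` is deliberately ignored, in line with the docstring: GPU availability is a separate check left to the caller.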

#### Classes

``` python
class CapabilityManager:
    "Manages capability discovery, loading, and lifecycle via process isolation."
    
    def __init__(
        self,
        capability_interface:Type[ToolCapability]=ToolCapability, # Base interface for type checking
        search_paths:Optional[List[Path]]=None, # Custom manifest search paths
        scheduler:Optional[ResourceScheduler]=None, # Resource allocation policy
        config_store:Optional[CapabilityConfigStore]=None, # CR-2: persistence backend; lazy LocalCapabilityConfigStore default per OQ-4
        empirical_store:Optional[EmpiricalResourceStore]=None, # CR-7: resource-usage tracking backend; lazy LocalEmpiricalResourceStore when cfg.substrate.empirical_tracking
        secret_store:Optional[SecretStore]=None, # CR-12: secret backend; lazy LocalSecretStore default (project-local <data_dir>/secrets)
        max_retries:int=1, # CR-7: how many reactive retries to attempt on CapabilityResourceError (default 1 — one retry after eviction)
        sysmon_capability_name:Optional[str]=None, # monitor capability (CR-3) name for GPU subtree attribution; default-None records skip GPU attribution (compute axis only)
        journal_store:Optional[JournalStore]=None, # CR-14: durable account-of-action; lazy LocalJournalStore at <data_dir>/journal.db
        diagnostics_store:Optional[DiagnosticsStore]=None # CR-14: disposable diagnostic narrative; lazy LocalDiagnosticsStore at <data_dir>/diagnostics.db
    ):
        "Initialize the capability manager."
```

``` python
@dataclass
class CapabilityBinding:
    """
    Pre-bound view of a single capability through a shared CapabilityManager.
    
    Eliminates the wrapper-class duplication audited across 8 consumer services
    (SG-17). Methods forward to the manager with `capability_name` pre-supplied;
    `default_config` is the fallback used when `load()` is called without an
    explicit config (matches the manifest-default behavior in `load_capability`).
    """
    
    manager: 'CapabilityManager'  # The shared CapabilityManager
    capability_name: str  # Name of the capability this binding targets
    default_config: Dict[str, Any] = _field(default_factory=dict)  # Used when load() called without config
    
    @property
    def meta(self) -> Optional[CapabilityMeta]:
        "The CapabilityMeta if the capability is loaded, else None."
        return self.manager.get_capability_meta(self.capability_name)
    
    @property
    def is_loaded(self) -> bool:
        "True if the capability is loaded in the bound manager."
        return self.manager.get_capability(self.capability_name) is not None
    
    @property
    def is_enabled(self) -> bool:
        "True if the capability is loaded AND not currently disabled."
        m = self.meta
        return m is not None and m.enabled
    
    # --- Lifecycle ---
    
    def load(
        self,
        config: Optional[Dict[str, Any]] = None,  # Override default_config when provided
        strict: bool = True  # SG-5 strict validation
    ) -> bool:  # True if loaded successfully
        "Load via the bound manager. Uses `default_config` if no `config` provided."
    
    def unload(self) -> bool:  # True if unloaded
        "Unload the bound capability."
        return self.manager.unload_capability(self.capability_name)
    
    def reload(
        self,
        config: Optional[Dict[str, Any]] = None  # Optional new config; current config used if None
    ) -> bool:
        "Reload the bound capability (terminate + restart worker)."
    
    def enable(self) -> bool:
        "Enable the bound capability."
        return self.manager.enable_capability(self.capability_name)
    
    def disable(self) -> bool:
        "Disable the bound capability (worker stays alive; jobs rejected)."
        return self.manager.disable_capability(self.capability_name)
    
    # --- Execution ---
    
    def execute(self, *args, **kwargs) -> Any:
        "Execute via the bound manager (sync)."
        return self.manager.execute_capability(self.capability_name, *args, **kwargs)
    
    async def execute_async(self, *args, **kwargs) -> Any:
        "Execute via the bound manager (async)."
        return await self.manager.execute_capability_async(self.capability_name, *args, **kwargs)
    
    # --- Configuration ---
    
    def update_config(
        self,
        config: Dict[str, Any],  # New config values
        strict: bool = True  # SG-5 strict validation
    ) -> bool:
        "Hot-reload the bound capability's configuration."
    
    def get_config(self) -> Optional[Dict[str, Any]]:
        "Current configuration values (None if not loaded)."
        return self.manager.get_capability_config(self.capability_name)
    
    def get_config_schema(self) -> Optional[Dict[str, Any]]:
        "JSON Schema describing this capability's configuration."
        return self.manager.get_capability_config_schema(self.capability_name)
    
    def get_stats(self) -> Optional[Dict[str, Any]]:
        "Resource telemetry for the bound capability's worker process."
```

``` python
class _CR10StubProxy:
    "Stand-in proxy tracking execute calls + hook fires for verification."
    
    def __init__(self, name="stub"):
        self._name = name
        self.execute_calls = []
        self.on_disable_calls = 0
        self.on_enable_calls = 0
    
    @property
    def name(self): return self._name
    
    @property
    def version(self): return "0.0.1"
    
    def initialize(self, config): self._config = dict(config or {})
    
    def execute(self, *args, **kwargs):
        self.execute_calls.append((args, kwargs))
        return {"who": self._name, "args": args, "kwargs": kwargs}
    
    def get_config_schema(self): return {}
    
    def get_current_config(self): return {}
    
    def cleanup(self): pass
    
    def on_disable(self): self.on_disable_calls += 1
    
    def on_enable(self): self.on_enable_calls += 1
```

### Manifest Format (v2.0) (`manifest_format.ipynb`)

> Typed parser + writer for the nested v2.0 manifest layout per the
> 2026-05-19 substrate audit’s CR-8. Substrate manifests transitioned
> from a flat top-level JSON object to a four-section nested layout:
> `install` (deployment-specific facts populated at install time),
> `code` (code-derived facts refreshed by
> `cjm-ctl regenerate-manifest`), `drift_tracking` (a config_schema hash
> that records the witness shape so live-vs-stored comparisons can
> detect drift), and `overrides` (an operator-supplied overlay
> placeholder).

#### Import

``` python
from cjm_substrate.core.manifest_format import (
    CURRENT_FORMAT_VERSION,
    InstallSection,
    CodeSection,
    DriftTracking,
    ManifestV2,
    compute_config_schema_hash,
    compute_structural_surface_hash,
    load_manifest,
    manifest_to_dict,
    write_manifest
)
```

#### Functions

``` python
def compute_config_schema_hash(
    schema: Optional[Dict[str, Any]],  # JSON Schema or None
) -> str:                              # "sha256:hexdigest"
    """
    Hash a JSON Schema with stable canonicalization.
    
    None is treated as `{}` — the hash records "no schema declared" rather
    than refusing. This way a capability that lost its config_schema between
    install and load still gets a drift warning rather than a crash.
    """
```
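
A stable schema hash of this kind is commonly built from canonical JSON (sorted keys, fixed separators) fed to SHA-256. The sketch below is one such construction under that assumption; the library's exact canonicalization and its `hash_bytes` helper are not shown in this section, so the digests produced here need not match the real ones.

```python
import hashlib
import json
from typing import Any, Dict, Optional


def schema_hash(schema: Optional[Dict[str, Any]]) -> str:
    """Hypothetical sketch: canonical-JSON SHA-256 with a 'sha256:' prefix."""
    payload = {} if schema is None else schema  # None hashes as "no schema declared"
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return "sha256:" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Sorting keys makes the digest independent of dict insertion order, which is what lets a stored hash be compared against a freshly computed one to detect drift.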

``` python
def compute_structural_surface_hash(
    surface: Optional[Dict[str, Any]],  # derive_structural_surface output or None
) -> str:                               # "sha256:hexdigest"
    """
    Hash a structural surface with stable canonicalization.
    
    Same canonical-JSON + hash_bytes shape as `compute_config_schema_hash`
    (the CR-8 idiom). None hashes as `{}` — but note the drift check skips
    when the STORED hash is None (pre-surface-era manifest ≠ drift);
    `_generate_manifest` only writes a hash when a surface was recorded.
    """
```

``` python
def _parse_resources_dict(d: Optional[Dict[str, Any]]) -> Optional[ResourceRequirements]:
    "Build a `ResourceRequirements` from its JSON sub-dict, or None."
```

``` python
def _from_v2_dict(
    data: Dict[str, Any],  # Parsed JSON dict with `format_version == "2.0"`
) -> ManifestV2:
    "Parse a v2.0 nested manifest dict into a typed `ManifestV2`."
```

``` python
def load_manifest(
    path: Union[str, Path],  # Path to manifest JSON file on disk
) -> ManifestV2:             # Parsed manifest in v2.0 typed shape
    """
    Load a manifest file and return a typed `ManifestV2`.
    
    Format detection by top-level `format_version` key:
    - `"2.0"` → nested layout, parse directly.
    - anything else (including missing) → ValueError (fail loud).
    """
```
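The fail-loud format detection described above might look roughly like the following. This is a sketch that returns the raw dict rather than a typed `ManifestV2`; `sketch_load_manifest_dict` is a hypothetical name, not part of the package.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, Union


def sketch_load_manifest_dict(path: Union[str, Path]) -> Dict[str, Any]:
    """Read a manifest JSON file and insist on format_version "2.0"."""
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    version = data.get("format_version") if isinstance(data, dict) else None
    if version != "2.0":
        # Missing or unknown versions are rejected instead of being guessed at.
        raise ValueError(f"unsupported manifest format_version: {version!r}")
    return data


with tempfile.TemporaryDirectory() as tmp:
    good = Path(tmp) / "good.json"
    good.write_text(json.dumps({"format_version": "2.0", "code": {"name": "demo"}}))
    loaded = sketch_load_manifest_dict(good)

    legacy = Path(tmp) / "legacy.json"
    legacy.write_text(json.dumps({"name": "demo"}))  # no format_version key
    try:
        sketch_load_manifest_dict(legacy)
        rejected = False
    except ValueError:
        rejected = True
```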

``` python
def _resources_to_dict(r: Optional[ResourceRequirements]) -> Optional[Dict[str, Any]]
    "Serialize a `ResourceRequirements` back to its JSON sub-dict, or None."
```

``` python
def _code_section_to_dict(c: CodeSection) -> Dict[str, Any]
    "Serialize a `CodeSection` to its JSON sub-dict, renaming `class_name` -> `class`."
```
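The `class_name` to `"class"` rename can be pictured with a cut-down dataclass. `SketchCode` and both helper functions below are hypothetical stand-ins carrying only three of the real section's fields.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict


@dataclass
class SketchCode:
    """Illustrative subset of a code section."""
    name: str = ""
    module: str = ""
    class_name: str = ""


def sketch_code_to_dict(c: SketchCode) -> Dict[str, Any]:
    """Serialize, renaming the Python-safe field to the JSON key "class"."""
    d = asdict(c)
    d["class"] = d.pop("class_name")
    return d


def sketch_code_from_dict(d: Dict[str, Any]) -> SketchCode:
    """Inverse of sketch_code_to_dict: map "class" back to class_name."""
    fields = dict(d)  # copy so the caller's dict is left untouched
    fields["class_name"] = fields.pop("class", "")
    return SketchCode(**fields)


original = SketchCode(name="demo", module="pkg.mod", class_name="DemoCapability")
as_json = sketch_code_to_dict(original)
restored = sketch_code_from_dict(as_json)
```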

``` python
def manifest_to_dict(
    m: ManifestV2,  # Manifest to serialize
) -> Dict[str, Any]:  # v2.0 nested dict ready for `json.dumps`
    """
    Serialize a `ManifestV2` to a v2.0 dict.
    
    Always emits `format_version == CURRENT_FORMAT_VERSION`.
    """
```

``` python
def write_manifest(
    path: Union[str, Path],  # Output JSON file path
    manifest: ManifestV2,    # Manifest to serialize
) -> None
    "Serialize a `ManifestV2` to disk in v2.0 nested layout (indent=2)."
```

#### Classes

``` python
@dataclass
class InstallSection:
    """
    Deployment-specific facts populated at install time.
    
    These fields are written by `install_all` (paths, conda env, env vars)
    plus `_generate_manifest`'s post-introspection step (installed_at,
    installer_version, package_source). `regenerate-manifest` preserves
    the install section across regeneration so paths survive code-side
    refreshes.
    """
    
    python_path: str = ''  # Absolute path to the capability env's python interpreter
    conda_env: str = ''  # Conda environment name
    db_path: str = ''  # Capability's per-data SQLite path (if any)
    env_vars: Dict[str, str] = field(...)  # Per-capability env vars
    installed_at: str = ''  # ISO-8601 UTC timestamp of install/regen
    installer_version: str = ''  # "cjm-ctl <version>" that wrote this manifest
    package_source: str = ''  # Original install input (git URL or pip spec)
```

``` python
@dataclass
class CodeSection:
    """
    Code-derived facts refreshed by `cjm-ctl regenerate-manifest`.
    
    Everything in this section comes from running the introspection script
    inside the capability's conda env: metadata + config_schema + binary
    platform/hardware hard-facts. Drift detection hashes this section's
    `config_schema` field as its witness shape.
    
    `class_name` serializes as the JSON key `"class"` (Python reserved-word
    workaround).
    """
    
    name: str = ''  # Capability's unique identifier
    version: str = ''  # Capability's version string
    description: str = ''  # Brief description (SG-6 required)
    module: str = ''  # Importable module path for the capability class
    class_name: str = ''  # Capability class name (JSON key: "class")
    resources: Optional[ResourceRequirements]  # Phase 5a hard-facts
    config_schema: Optional[Dict[str, Any]]  # JSON Schema for capability config
    regenerated_at: Optional[str]  # ISO-8601 UTC of last regen
    worker_env: Optional[List[Dict[str, Any]]]  # CR-12 spawn-env contract: asdict(EnvVarSpec) list
    structural_surface: Optional[Dict[str, Any]]  # Pass-2 Thread 3: public surface recorded in-env (methods/properties/attributes)
```

``` python
@dataclass
class DriftTracking:
    """
    Witness hashes for drift detection.
    
    `config_schema_hash` is computed at write time (regenerate-manifest /
    install_all) from a canonical JSON encoding of the code section's
    `config_schema`. The CapabilityManager's drift-check fetches the live
    `/config_schema` from the worker, hashes it the same way, and compares;
    a mismatch raises `CapabilityMeta.config_schema_drift = True` plus a
    warning log.
    """
    
    config_schema_hash: Optional[str]  # "sha256:hexdigest" of canonical config_schema
    structural_surface_hash: Optional[str]  # Pass-2 Thread 3 witness: hash of code.structural_surface (None = pre-surface manifest)
```
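A minimal sketch of the witness comparison, assuming the same canonical-JSON hashing idea as above. It follows the structural-surface rule quoted earlier (a stored hash of `None` means a pre-surface manifest, so the check is skipped); the names `sketch_hash` and `sketch_has_drift` are hypothetical.

```python
import hashlib
import json
from typing import Any, Dict, Optional


def sketch_hash(value: Optional[Dict[str, Any]]) -> str:
    """Canonical-JSON sha256 of a dict; None hashes like {} (assumed form)."""
    canonical = json.dumps(value or {}, sort_keys=True, separators=(",", ":"))
    return "sha256:" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def sketch_has_drift(stored_hash: Optional[str],
                     live_value: Optional[Dict[str, Any]]) -> bool:
    """Compare a stored witness hash against a freshly hashed live value."""
    if stored_hash is None:
        return False  # nothing was recorded, so there is nothing to compare
    return stored_hash != sketch_hash(live_value)


stored = sketch_hash({"methods": ["execute"]})
same = sketch_has_drift(stored, {"methods": ["execute"]})
changed = sketch_has_drift(stored, {"methods": ["execute", "prefetch"]})
skipped = sketch_has_drift(None, {"methods": ["anything"]})
```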

``` python
@dataclass
class ManifestV2:
    """
    Top-level v2.0 manifest with four named sections plus `format_version`.
    
    Loaded from a v2.0 nested JSON file as-is; `format_version` is always
    `CURRENT_FORMAT_VERSION`.
    """
    
    install: InstallSection = field(...)
    code: CodeSection = field(...)
    drift_tracking: DriftTracking = field(...)
    overrides: Dict[str, Any] = field(...)
    format_version: str = CURRENT_FORMAT_VERSION
```

#### Variables

``` python
CURRENT_FORMAT_VERSION = '2.0'  # Emitted on every freshly-written manifest
```

### Capability Metadata (`metadata.ipynb`)

> Data structures for capability metadata

#### Import

``` python
from cjm_substrate.core.metadata import (
    ResourceRequirements,
    CapabilityMeta,
    CapabilityInstance,
    CapabilityLoadSpec
)
```

#### Classes

``` python
@dataclass
class ResourceRequirements:
    """
    Binary hard-facts about what a capability needs to run (Phase 5a).
    
    Quantitative resource amounts (min_vram_mb, etc.) deliberately omitted
    per CR-7's reactive resource management reframing — capability authors can't
    reliably estimate model × dtype × quantization combinatorics, and Blender-
    style variable-render capabilities can't estimate at all. The substrate uses
    these binary hard-facts purely for discovery filtering; actual resource
    contention is handled reactively by CR-7's eviction + retry flow.
    
    - `requires_gpu`: True if the capability needs any GPU; the substrate gates
      execution on a system monitor reporting one is present.
    - `platforms`: e.g., ["linux-x64", "darwin-arm64"]. Empty list means no
      platform constraint declared (assume universal compatibility).
    - `accelerators`: e.g., ["cuda", "mps", "cpu"]. Informational; substrate
      doesn't auto-select but consumers can filter on the values.
    """
    
    requires_gpu: bool = False
    platforms: List[str] = field(...)
    accelerators: List[str] = field(...)
```
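The discovery-filtering role of these binary facts could be sketched as below. `SketchResources` mirrors the three fields shown above, while `sketch_is_runnable` and its parameters are assumptions for illustration rather than the substrate's actual filter.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class SketchResources:
    """Stand-in with the same three fields as ResourceRequirements."""
    requires_gpu: bool = False
    platforms: List[str] = field(default_factory=list)
    accelerators: List[str] = field(default_factory=list)


def sketch_is_runnable(res: SketchResources, current_platform: str,
                       gpu_present: bool) -> bool:
    """Apply the two hard gates: GPU presence and platform membership."""
    if res.requires_gpu and not gpu_present:
        return False
    # An empty platforms list means no constraint was declared.
    if res.platforms and current_platform not in res.platforms:
        return False
    return True  # accelerators are informational and not gated here


gpu_tool = SketchResources(requires_gpu=True, platforms=["linux-x64"])
print(sketch_is_runnable(gpu_tool, "linux-x64", gpu_present=True))
```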

``` python
@dataclass
class CapabilityMeta:
    "Metadata about a capability."
    
    name: str  # Capability's unique identifier
    version: str  # Capability's version string
    description: str = ''  # Brief description of the capability's functionality
    resources: Optional['ResourceRequirements']
    config_schema: Optional[Dict[str, Any]]  # JSON Schema for capability configuration
    instance: Optional[Any]  # Capability instance (ToolCapability subclass)
    enabled: bool = True  # Whether the capability is enabled
    last_executed: float = 0.0  # Unix timestamp
    config_schema_drift: bool = False
    live_config_schema: Optional[Dict[str, Any]]
    structural_surface_drift: bool = False
```

``` python
@dataclass
class CapabilityInstance:
    """
    Per-instance runtime state for a loaded capability (CR-10 multi-instance).
    
    Differs from CapabilityMeta in scope:
    - CapabilityMeta is per-capability-name discovery + canonical-instance state.
    - CapabilityInstance is per-load-call runtime state.
    
    A capability loaded with no instance_id (default) gets `instance_id == capability_name`
    and is the canonical instance referenced by CapabilityMeta.instance. Multi-instance
    loads (instance_id != capability_name) add entries to CapabilityManager.instances
    without changing the canonical reference.
    """
    
    instance_id: str  # Unique key in CapabilityManager.instances; default = capability_name
    capability_name: str  # The underlying discovered capability's name (CapabilityMeta.name)
    config: Dict[str, Any] = field(...)  # Effective config used at initialize()
    proxy: Optional[Any]
    enabled: bool = True  # Per-instance enable flag; substrate's execute_capability checks this
    last_executed: float = 0.0  # Unix timestamp of the most recent execute on this instance
    created_at: datetime = field(...)
    config_hash: str = ''
    max_concurrent_requests: Optional[int]
```

``` python
@dataclass
class CapabilityLoadSpec:
    """
    One entry in `CapabilityManager.load_capabilities_concurrent`'s batch input (CR-10).
    
    Mirrors the positional arguments of `load_capability` so the concurrent helper
    can fan out load calls without repeating the per-spec instance_id /
    new_instance plumbing.
    
    - `meta`: the discovered CapabilityMeta to load (must have a `.manifest` attached).
    - `config`: initial configuration; falls through to persisted-or-schema-defaults
      when None (default-instance only; multi-instance starts fresh).
    - `instance_id`: explicit instance_id (validated against [A-Za-z0-9_-]{1,64}).
      None defaults to capability_name (single-instance backward compat).
    - `new_instance`: when True with instance_id=None, auto-generate
      `{capability_name}-{6-hex}`.
    """
    
    meta: Any  # CapabilityMeta — typed as Any to avoid forward-reference quirk under nbdev's late binding
    config: Optional[Dict[str, Any]]
    instance_id: Optional[str]
    new_instance: bool = False
```
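The `instance_id` rules listed in the docstring can be sketched as a small resolver. `sketch_resolve_instance_id` is a hypothetical helper, and using `secrets.token_hex` for the six hex characters is an assumption about how the suffix might be generated.

```python
import re
import secrets
from typing import Optional

# Pattern taken from the docstring above: [A-Za-z0-9_-]{1,64}
_INSTANCE_ID = re.compile(r"[A-Za-z0-9_-]{1,64}")


def sketch_resolve_instance_id(capability_name: str,
                               instance_id: Optional[str] = None,
                               new_instance: bool = False) -> str:
    """Pick the effective instance id for one load call."""
    if instance_id is not None:
        if not _INSTANCE_ID.fullmatch(instance_id):
            raise ValueError(f"invalid instance_id: {instance_id!r}")
        return instance_id
    if new_instance:
        # token_hex(3) yields 3 random bytes, i.e. 6 hex characters.
        return f"{capability_name}-{secrets.token_hex(3)}"
    return capability_name  # single-instance default


default_id = sketch_resolve_instance_id("whisper")
fresh_id = sketch_resolve_instance_id("whisper", new_instance=True)
```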

### Platform Utilities (`platform.ipynb`)

> Cross-platform utilities for process management, path handling, and
> system detection

#### Import

``` python
from cjm_substrate.core.platform import (
    MICROMAMBA_URLS,
    is_windows,
    is_macos,
    is_linux,
    is_apple_silicon,
    get_current_platform,
    get_python_in_env,
    get_popen_isolation_kwargs,
    terminate_process,
    terminate_self,
    run_shell_command,
    conda_env_exists,
    get_micromamba_download_url,
    download_micromamba,
    get_conda_command,
    build_conda_command,
    get_micromamba_binary_path,
    ensure_runtime_available
)
```

#### Functions

``` python
def is_windows() -> bool:
    """Check if running on Windows."""
    return platform.system() == "Windows"
```

``` python
def is_macos() -> bool:
    """Check if running on macOS."""
    return platform.system() == "Darwin"
```

``` python
def is_linux() -> bool:
    """Check if running on Linux."""
    return platform.system() == "Linux"
```

``` python
def is_apple_silicon() -> bool
    "Check if running on Apple Silicon Mac (for MPS detection)."
```

``` python
def get_current_platform() -> str
    """
    Get current platform string for manifest filtering.
    
    Returns strings like 'linux-x64', 'darwin-arm64', 'win-x64'.
    """
```

``` python
def get_python_in_env(
    env_path: Path  # Path to conda environment root
) -> Path:  # Path to Python executable
    """
    Get the Python executable path for a conda environment.
    
    On Windows: env_path/python.exe
    On Unix: env_path/bin/python
    """
```
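The two layouts named in the docstring reduce to a one-line branch. In this sketch the platform check is passed in as a `windows` flag (an assumption made so both branches can be exercised on any machine); the real function presumably detects the platform itself.

```python
from pathlib import Path


def sketch_python_in_env(env_path: Path, windows: bool) -> Path:
    """Return the interpreter path inside a conda environment root."""
    if windows:
        return env_path / "python.exe"   # Windows: directly in the env root
    return env_path / "bin" / "python"   # Unix: under bin/


unix_python = sketch_python_in_env(Path("envs") / "demo", windows=False)
win_python = sketch_python_in_env(Path("envs") / "demo", windows=True)
```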

``` python
def get_popen_isolation_kwargs() -> Dict[str, Any]
    """
    Return kwargs for process isolation in subprocess.Popen.
    
    On Unix: Returns {'start_new_session': True}
    On Windows: Returns {'creationflags': CREATE_NEW_PROCESS_GROUP}
    
    Usage:
        process = subprocess.Popen(cmd, **get_popen_isolation_kwargs(), ...)
    """
```
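A self-contained sketch of the same idea, using only the standard library. `sketch_isolation_kwargs` is a hypothetical stand-in; `subprocess.CREATE_NEW_PROCESS_GROUP` exists only on Windows, so it is referenced only on that branch.

```python
import subprocess
import sys
from typing import Any, Dict


def sketch_isolation_kwargs() -> Dict[str, Any]:
    """Build Popen kwargs that detach the child from the parent's group."""
    if sys.platform == "win32":
        return {"creationflags": subprocess.CREATE_NEW_PROCESS_GROUP}
    # On Unix the child calls setsid() and becomes its own session leader.
    return {"start_new_session": True}


proc = subprocess.Popen(
    [sys.executable, "-c", "print('ok')"],
    stdout=subprocess.PIPE,
    **sketch_isolation_kwargs(),
)
out, _ = proc.communicate()
```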

``` python
def terminate_process(
    process: subprocess.Popen,  # Process to terminate (must be a session/group leader for subtree kill)
    timeout: float = 2.0  # Seconds to wait before force kill
) -> None
    """
    Terminate a subprocess + its entire process subtree (grandchildren, etc).
    
    Session A 2026-05-27: enhanced from worker-only termination to FULL subtree
    termination. Workers are spawned with `get_popen_isolation_kwargs()` which
    sets `start_new_session=True` on Unix → the worker is its own session leader
    and ALL of its descendants inherit the same process-group ID (unless they
    setsid themselves, which is rare). `os.killpg(worker_pid, SIGTERM/SIGKILL)`
    sends the signal to every process in that group atomically — closes the
    orphan-grandchild bug surfaced by Voxtral-vLLM (vLLM api_server spawned its
    own EngineCore subprocess; pre-fix, the worker terminated cleanly but vLLM
    + EngineCore kept running as orphans, eating GPU memory until manual kill).
    
    Strategy on Unix:
      1. SIGTERM the worker's process group via os.killpg (atomic).
      2. Wait up to `timeout` for the worker to exit.
      3. If anything still alive, SIGKILL the process group.
      4. psutil-based safety sweep for any process that setsid-ed away from the
         original group (rare but possible — e.g., a poorly-isolated subprocess).
    
    Strategy on Windows:
      1. process.terminate() + wait + kill (legacy path). True process-group
         signaling on Windows requires Job Objects which the substrate doesn't
         currently wire — Windows users are advised to avoid capabilities that
         spawn subprocesses until that's added. (TODO: track as substrate gap.)
    """
```
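Steps 1 to 3 of the Unix strategy can be sketched as follows. This is a simplified, Unix-only illustration under the assumption that the child was started with `start_new_session=True`; it omits the psutil safety sweep and the Windows path, and `sketch_terminate_group` is a hypothetical name.

```python
import os
import signal
import subprocess
import sys


def sketch_terminate_group(process: subprocess.Popen, timeout: float = 2.0) -> None:
    """SIGTERM the child's process group, escalating to SIGKILL on timeout."""
    try:
        # The session leader's pid doubles as its process-group id.
        os.killpg(process.pid, signal.SIGTERM)
    except ProcessLookupError:
        pass  # the group is already gone
    try:
        process.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        try:
            os.killpg(process.pid, signal.SIGKILL)
        except ProcessLookupError:
            pass
        process.wait()


worker = subprocess.Popen(
    [sys.executable, "-c", "import time; time.sleep(60)"],
    start_new_session=True,  # makes the child a session and group leader
)
sketch_terminate_group(worker)
```

Signalling the group rather than the single pid is what lets grandchildren receive the signal too, provided they have not moved to a session of their own.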

``` python
def terminate_self() -> None
    """
    Terminate the current process (for worker suicide pact).
    
    On Unix: Sends SIGTERM to self for graceful shutdown
    On Windows: Calls os._exit() since Windows lacks SIGTERM
    """
```

``` python
def run_shell_command(
    cmd: str,  # Shell command to execute
    check: bool = True,  # Whether to raise on non-zero exit
    capture_output: bool = False,  # Whether to capture stdout/stderr
    **kwargs  # Additional kwargs passed to subprocess.run
) -> subprocess.CompletedProcess
    """
    Run a shell command cross-platform.
    
    Unlike using shell=True with executable='/bin/bash', this function
    uses the platform's default shell:
    - Linux/macOS: /bin/sh (or $SHELL)
    - Windows: cmd.exe
    """
```

``` python
def conda_env_exists(
    env_name: str,  # Name of the conda environment
    conda_cmd: str = "conda"  # Conda command (conda, mamba, micromamba)
) -> bool
    """
    Check if a conda environment exists (cross-platform).
    
    Uses 'conda env list --json' instead of piping to grep,
    which doesn't work on Windows.
    """
```
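The JSON-based check might be sketched like this, with the subprocess call left out so the parsing can be shown in isolation. It assumes the listing has an `"envs"` key holding environment prefix paths and that a named environment matches the last path component; `sketch_env_in_listing` is a hypothetical helper.

```python
import json


def sketch_env_in_listing(env_name: str, listing_json: str) -> bool:
    """Check whether env_name appears in `conda env list --json` output."""
    envs = json.loads(listing_json).get("envs", [])
    for prefix in envs:
        # Normalize separators so Windows-style prefixes are handled too.
        last = prefix.replace("\\", "/").rstrip("/").split("/")[-1]
        if last == env_name:
            return True
    return False


listing = json.dumps({
    "envs": [
        "/home/user/miniforge3",
        "/home/user/miniforge3/envs/whisper-env",
        "C:\\Users\\user\\miniforge3\\envs\\voxtral-env",
    ]
})
print(sketch_env_in_listing("whisper-env", listing))
```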

``` python
def get_micromamba_download_url(
    platform_str: Optional[str] = None  # Platform string (e.g., 'linux-x64'), uses current if None
) -> str:  # Download URL for micromamba binary
    "Get the micromamba download URL for the specified or current platform."
```

``` python
def download_micromamba(
    dest_path: Path,  # Destination path for the micromamba binary
    platform_str: Optional[str] = None,  # Platform string, uses current if None
    show_progress: bool = True  # Whether to print progress messages
) -> bool:  # True if download succeeded
    "Download and extract micromamba binary to the specified path."
```

``` python
def get_conda_command(
    config: CJMConfig  # Configuration object with runtime settings
) -> List[str]:  # Base command with prefix args if needed
    "Get the conda/mamba/micromamba base command with prefix args for local mode."
```

``` python
def build_conda_command(
    config: CJMConfig,  # Configuration object with runtime settings
    *args: str  # Additional command arguments
) -> List[str]:  # Complete command ready for subprocess
    "Build a complete conda/mamba/micromamba command."
```

``` python
def get_micromamba_binary_path(
    config: CJMConfig  # Configuration object with runtime settings
) -> Optional[Path]:  # Path to micromamba binary or None
    "Get the configured micromamba binary path for the current platform."
```

``` python
def ensure_runtime_available(
    config: CJMConfig  # Configuration object with runtime settings
) -> bool:  # True if runtime is available
    "Check if the configured conda/micromamba runtime is available."
```

#### Variables

``` python
MICROMAMBA_URLS: Dict[str, str]
```

### Composition Ports (`ports.ipynb`)

> Capability compositions as DAGs of invocation nodes with typed
> input/output

#### Import

``` python
from cjm_substrate.core.ports import (
    NodeState,
    TERMINAL_NODE_STATES,
    OutputRef,
    CompositionNode,
    Composition,
    CompositionValidationError,
    CompositionBindingError,
    validate_composition,
    extract_output_field,
    resolve_node_kwargs,
    CompositionNodeRun,
    CompositionRun,
    new_composition_run
)
```

#### Functions

``` python
def _node_dependencies(
    node: CompositionNode,  # Node whose kwargs are scanned for OutputRef markers
) -> Set[str]:  # Producer node ids this node depends on
    """
    Derive a node's dependencies from the `OutputRef` markers in its kwargs.
    
    Top-level kwarg values only (see `CompositionNode` docstring); duplicate
    references to the same producer collapse into one dependency edge.
    """
```

``` python
def validate_composition(
    comp: Composition,  # Composition to validate
) -> Dict[str, Set[str]]:  # node_id -> set of upstream node ids (the derived DAG)
    """
    Validate a composition and return its derived dependency map.
    
    Raises `CompositionValidationError` on duplicate node ids, `OutputRef`
    targets that name no node in the composition, or dependency cycles.
    An empty composition is valid (returns `{}`) — the queue completes it
    at submit, mirroring the empty-sequence totality precedent.
    """
```
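The three validation rules (duplicate ids, unresolved targets, cycles) can be illustrated with stand-in types. `SketchRef`, `SketchNode`, and `sketch_validate` are hypothetical, the `node_id` attribute name is an assumption, and a plain `ValueError` stands in for `CompositionValidationError`.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Set


@dataclass(frozen=True)
class SketchRef:
    """Stand-in marker: the value comes from another node's result."""
    node_id: str
    field: Optional[str] = None


@dataclass
class SketchNode:
    id: str
    kwargs: Dict[str, Any]


def sketch_validate(nodes: List[SketchNode]) -> Dict[str, Set[str]]:
    """Return node_id -> upstream ids, or raise ValueError if invalid."""
    ids = [n.id for n in nodes]
    if len(ids) != len(set(ids)):
        raise ValueError("duplicate node ids")
    # Only top-level kwarg values are scanned for markers.
    deps = {n.id: {v.node_id for v in n.kwargs.values() if isinstance(v, SketchRef)}
            for n in nodes}
    for node_id, ups in deps.items():
        if ups - set(ids):
            raise ValueError(f"{node_id} references unknown nodes")
    # Peel off nodes with no unresolved dependencies; a stall means a cycle.
    remaining = {k: set(v) for k, v in deps.items()}
    while remaining:
        ready = [k for k, ups in remaining.items() if not ups]
        if not ready:
            raise ValueError("dependency cycle")
        for k in ready:
            del remaining[k]
        for ups in remaining.values():
            ups.difference_update(ready)
    return deps


dag = sketch_validate([
    SketchNode("transcribe", {"audio": "a.wav"}),
    SketchNode("summarize", {"text": SketchRef("transcribe", "text")}),
])
```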

``` python
def extract_output_field(
    """
    Extract a field from an upstream result for binding into a kwarg.
    
    The single substrate-owned successor of the retired `field_of` helpers:
    dicts resolve by KEY (intent for capability-side dict results), everything
    else by ATTRIBUTE (typed wire DTOs). Missing fields raise
    `CompositionBindingError` loudly — silent shape-shifting is what stage 2
    retired (F12 fail-loudly posture).
    """
```
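The key-versus-attribute rule might be sketched as below. The parameter names, `SketchBindingError`, and `SketchTranscript` are assumptions for illustration; the real signature is not shown in this listing.

```python
from dataclasses import dataclass
from typing import Any


class SketchBindingError(RuntimeError):
    """Stand-in for CompositionBindingError."""


def sketch_extract_field(result: Any, field_name: str) -> Any:
    """Dicts resolve by key, everything else by attribute; misses raise."""
    if isinstance(result, dict):
        if field_name not in result:
            raise SketchBindingError(f"missing key {field_name!r}")
        return result[field_name]
    try:
        return getattr(result, field_name)
    except AttributeError as exc:
        raise SketchBindingError(f"missing attribute {field_name!r}") from exc


@dataclass
class SketchTranscript:
    """Hypothetical typed result."""
    text: str


from_dict = sketch_extract_field({"text": "hello"}, "text")
from_dto = sketch_extract_field(SketchTranscript(text="hello"), "text")
```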

``` python
def resolve_node_kwargs(
    """
    Materialize a node's kwargs by resolving its `OutputRef` markers.
    
    Called by the executor at the moment a node becomes ready (all
    dependencies completed) — this is where execution-time binding actually
    happens. Static kwargs pass through untouched.
    """
```

``` python
def new_composition_run(
    comp: Composition,  # Composition to run (validated here)
    run_id: str,  # Run UUID (assigned by the queue)
) -> CompositionRun:  # Fresh run record with derived topology
    "Validate a composition and build its run record."
```

``` python
def ready_nodes(
    self: CompositionRun,
) -> List[str]:  # Node ids that are pending with all dependencies completed
    """
    Nodes whose member Jobs can be created right now.
    
    Scan order follows the composition's node order (readability +
    deterministic dispatch among equally-ready nodes).
    """
```
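Readiness as described above (pending, with every dependency completed, scanned in composition order) can be sketched over plain dicts. `sketch_ready_nodes` and the string state values are illustrative stand-ins for the `CompositionRun` method and `NodeState`.

```python
from typing import Dict, List, Set


def sketch_ready_nodes(order: List[str], deps: Dict[str, Set[str]],
                       states: Dict[str, str]) -> List[str]:
    """Pending nodes whose upstream nodes have all completed, in scan order."""
    return [
        node_id
        for node_id in order  # composition order gives deterministic dispatch
        if states[node_id] == "pending"
        and all(states[up] == "completed" for up in deps[node_id])
    ]


order = ["load", "transcribe", "summarize"]
deps = {"load": set(), "transcribe": {"load"}, "summarize": {"transcribe"}}
states = {"load": "completed", "transcribe": "pending", "summarize": "pending"}
ready = sketch_ready_nodes(order, deps, states)
```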

``` python
def record_started(
    "Mark a node running and bind it to its member Job."
```

``` python
def record_result(
    self: CompositionRun,
    node_id: str,  # Node whose member Job reached terminal status
    state: NodeState,  # completed / failed / cancelled
    result: Any = None,  # Member job result (if completed)
    error: Optional[JobError] = None,  # Structured failure (if failed/cancelled)
) -> None
    "Record a member job's terminal outcome on its node."
```

``` python
def skip_dependents(
    self: CompositionRun,
    node_id: str,  # Node whose failure/cancellation poisons its downstream
) -> List[str]:  # Node ids newly marked skipped (transitive)
    """
    Mark every still-pending transitive dependent of `node_id` skipped.
    
    Skipped nodes never get a Job — their inputs can never exist. Runs
    regardless of fail_fast (dependents are unrunnable either way; fail_fast
    only governs INDEPENDENT pending members, which the executor cancels).
    """
```
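The transitive skip can be pictured as a breadth-first walk over the reverse dependency map. This is a sketch over plain dicts with string states; `sketch_skip_dependents` is a hypothetical free function, not the `CompositionRun` method itself.

```python
from collections import deque
from typing import Dict, List, Set


def sketch_skip_dependents(node_id: str, dependents: Dict[str, Set[str]],
                           states: Dict[str, str]) -> List[str]:
    """Mark still-pending transitive dependents skipped; return new skips."""
    skipped: List[str] = []
    seen: Set[str] = set()
    queue = deque(sorted(dependents.get(node_id, ())))
    while queue:
        current = queue.popleft()
        if current in seen:
            continue
        seen.add(current)
        if states.get(current) == "pending":
            states[current] = "skipped"
            skipped.append(current)
        # Keep walking so deeper dependents are reached as well.
        queue.extend(sorted(dependents.get(current, ())))
    return skipped


dependents = {"a": {"b"}, "b": {"c"}, "c": set(), "x": set()}
states = {"a": "failed", "b": "pending", "c": "pending", "x": "pending"}
newly_skipped = sketch_skip_dependents("a", dependents, states)
```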

``` python
def all_terminal(
    self: CompositionRun,
) -> bool:  # True when every node is in a terminal state
    "Whether the composition has nothing left to run or wait for."
```

``` python
def derive_terminal_status(
    self: CompositionRun,
) -> NodeState:  # cancelled / failed / completed
    """
    Derive the composition-level terminal status from member outcomes.
    
    Precedence:
    1. USER cancel intent (`cancel_requested`) dominates everything.
    2. A member failure under fail_fast lands the run `failed` — the
       executor's housekeeping cancels of independent pending members do NOT
       flip it to cancelled (that's what `cancel_requested` distinguishes).
    3. A directly-cancelled member (job-level cancel, no failure, no
       composition intent) lands the run `cancelled`.
    4. Otherwise `completed` — including best-effort (fail_fast=False) runs
       with failed members: "we attempted everything", matching the sequence
       semantics this replaces. Per-node outcomes stay inspectable on
       `node_runs` either way. (`skipped` never appears without a failed or
       cancelled member upstream of it, so it needs no clause of its own.)
    """
```
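The precedence list above can be read as a short chain of checks. The sketch below is one interpretation over plain strings; in particular, treating a best-effort run that has both failed and cancelled members as `completed` is an assumption drawn from the "no failure" wording in rule 3, and `sketch_terminal_status` is a hypothetical name.

```python
from typing import List


def sketch_terminal_status(member_states: List[str], cancel_requested: bool,
                           fail_fast: bool) -> str:
    """Derive a run-level status from member states, in precedence order."""
    if cancel_requested:
        return "cancelled"        # 1. user cancel intent dominates
    has_failed = "failed" in member_states
    if fail_fast and has_failed:
        return "failed"           # 2. housekeeping cancels do not flip this
    if "cancelled" in member_states and not has_failed:
        return "cancelled"        # 3. direct job-level cancel, no failure
    return "completed"            # 4. includes best-effort runs with failures


status = sketch_terminal_status(["completed", "failed", "cancelled", "skipped"],
                                cancel_requested=False, fail_fast=True)
```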

``` python
def results_by_node(
    self: CompositionRun,
) -> Dict[str, Any]:  # node_id -> result, for completed nodes only
    """
    Completed members' results keyed by node id (what host folds consume,
    and what `resolve_node_kwargs` reads at advancement time).
    """
```

#### Classes

``` python
class NodeState(str, Enum):
    """
    State of one composition node (and, for the terminal subset, of a
    whole composition run).
    
    `skipped` is composition-specific: a node whose transitive dependencies
    failed/cancelled can never run (its inputs will never exist) and is
    recorded as skipped rather than getting a Job at all. Composition-level
    status uses the running/completed/failed/cancelled subset.
    """
```

``` python
class OutputRef:
    """
    Binding marker: this kwarg's value comes from an upstream node's result.
    
    Placed directly in a `CompositionNode.kwargs` value position. `field=None`
    binds the WHOLE result (fan-in folds); a field name extracts one field via
    `extract_output_field` (dict key or typed-result attribute). Frozen so
    markers are hashable + safely shareable across nodes.
    """
```

``` python
@dataclass
class CompositionNode:
    """
    One capability invocation in a composition.
    
    `kwargs` mixes static values with `OutputRef` markers; the markers are
    scanned (top-level values only — nested containers are not searched, by
    design: evidence needs single-position bindings, and a nested-marker
    grammar is seam-admitted later) to derive the node's dependencies.
    """
    
    id: str  # Unique node id within the composition
    capability_instance_id: str  # Target capability instance
    kwargs: Dict[str, Any] = field(...)  # Static values + OutputRef markers
    priority: int = 0  # Per-node priority override (0 = inherit composition priority)
    task_name: Optional[str]  # Task-channel address: adapter task (stage 4; None = execute channel)
    method: Optional[str]  # Task-channel address: adapter method (set with task_name)
    control: Dict[str, Any] = field(...)  # Per-call control flags (force/cache-bypass); threaded into the member Job's CallEnvelope.control
```

``` python
@dataclass
class Composition:
    """
    A static DAG of capability-invocation nodes, submitted as one unit.
    
    `fail_fast=True` (default, matching the audit-locked sequence default):
    on a member failure, pending independent members are cancelled, in-flight
    members run to completion, transitive dependents are skipped, and the
    composition lands `failed`. `fail_fast=False` is best-effort: independent
    members continue; only transitive dependents of the failure are skipped.
    
    `run_id` / `actor` (CR-14 follow-up) are host-tier correlation tags
    stamped onto every lazily-created member Job (the `submit(run_id=,
    actor=)` analog for compositions) — NOT the composition run's own id,
    which the queue assigns at submit.
    """
    
    nodes: List[CompositionNode]  # The invocation nodes (order = readability + ready-scan order)
    fail_fast: bool = True  # Halt independent pending members on first failure
    priority: int = 0  # Composition-level priority (per-node override possible)
    run_id: Optional[str]  # Host-tier run correlation for member Jobs (CR-14 follow-up)
    actor: Optional[str]  # Who/what initiated the work (CR-14 follow-up)
```

``` python
class CompositionValidationError(ValueError):
    """
    A composition failed submit-time validation (duplicate ids, unresolved
    `OutputRef` targets, or a dependency cycle).
    """
```

``` python
class CompositionBindingError(RuntimeError):
    """
    An `OutputRef` could not be resolved against the producer's recorded
    result at execution time (missing producer result, missing key/attribute).
    """
```

``` python
@dataclass
class CompositionNodeRun:
    "Live state of one node within a composition run."
    
    node_id: str  # The CompositionNode this tracks
    state: NodeState = NodeState.pending  # Current node state
    job_id: Optional[str]  # Member Job id (set when the node starts)
    result: Any  # Member job result (if completed)
    error: Optional[JobError]  # Structured failure summary (if failed/cancelled)
```

``` python
@dataclass
class CompositionRun:
    """
    Tracks a submitted composition through execution (lives in
    `JobQueue._compositions`).
    
    Carries the validated dependency map (and its reverse) so advancement
    decisions are O(edges) lookups for the rest of the run. Composition-level
    `status` reuses the NodeState terminal subset: starts `running`,
    transitions to completed / failed / cancelled via
    `derive_terminal_status` once `all_terminal()`.
    
    `cancel_requested` records USER cancel intent (`cancel_composition`),
    distinguishing it from the executor's fail-fast HOUSEKEEPING cancels of
    independent pending members after a failure — without the flag, a
    failure-driven run would derive `cancelled` instead of `failed` because
    its housekeeping cancels would dominate.
    """
    
    id: str  # Composition run UUID
    composition: Composition  # The submitted spec (immutable post-submit)
    deps: Dict[str, Set[str]]  # node_id -> upstream node ids (validated)
    dependents: Dict[str, Set[str]]  # node_id -> downstream node ids (reverse of deps)
    nodes_by_id: Dict[str, CompositionNode]  # Spec lookup for the executor
    node_runs: Dict[str, CompositionNodeRun]  # Per-node live state
    status: NodeState = NodeState.running  # Composition-level status
    cancel_requested: bool = False  # True once cancel_composition is called (user intent)
    submitted_at: datetime = field(...)
    completed_at: Optional[datetime]  # Set when the run reaches terminal status
```

#### Variables

``` python
TERMINAL_NODE_STATES
```

### Remote Capability Proxy (`proxy.ipynb`)

> Bridge between Host application and isolated Worker processes

#### Import

``` python
from cjm_substrate.core.proxy import (
    RemoteCapabilityProxy,
    execute_async,
    execute_stream_sync,
    execute_stream,
    execute_with_oom_check,
    execute_async_with_oom_check,
    execute_task,
    execute_task_async,
    get_stats,
    is_alive,
    get_structural_surface,
    cancel,
    cancel_async,
    get_progress,
    get_progress_async,
    on_disable,
    on_enable,
    get_system_status,
    get_system_status_async,
    list_processes,
    list_processes_async,
    prefetch,
    prefetch_async,
    reconfigure,
    reconfigure_async
)
```

#### Functions

``` python
@patch
def _bind_listen_socket(self:RemoteCapabilityProxy) -> Tuple[socket.socket, int]
    """
    Bind a listening socket on a kernel-chosen ephemeral port.
    The socket is kept open so its FD can be inherited by the worker
    subprocess (Unix). Returns (socket, port).
    """
```

``` python
def _pump_stream(
    """
    Pump a worker's raw output to the diagnostics store (CR-14).
    
    The zero-cooperation death-rattle floor: captures everything the worker
    process writes outside the structured handler — bare prints, native-lib
    output, tqdm, argparse/startup failures BEFORE logging exists, and the
    final traceback of a hard crash. Runs as a daemon thread; ends at EOF
    (worker exit). Attribution is the worker SESSION, never a job — raw
    streams cannot be job-attributed honestly under same-worker concurrency
    (the stage-3 lesson that killed the timestamp-window heuristic).
    
    tqdm CR-frames collapse to their final frame via `normalize_stream_line`
    (liveness telemetry is not durable). Failures degrade to dropping chunks
    (diagnostics are the disposable class) — never to breaking the worker.
    """
```

``` python
@patch
def _start_process(self:RemoteCapabilityProxy) -> None
    """
    Launch the worker subprocess (CR-14: PIPE-captured + journaled).
    
    Replaces the pre-CR-14 fd-inherited flat log file (`.cjm/logs/<name>.log`
    + ctime session markers): raw output goes through `_pump_stream` into the
    diagnostics store; structured worker logging writes the diagnostics store
    DIRECTLY via the env contract below; the spawn itself is a journal event.
    """
```

``` python
@patch
def _wait_for_ready(
    self:RemoteCapabilityProxy,
    timeout:float=30.0 # Max seconds to wait for worker startup
) -> None
    "Wait for worker to become responsive."
```

``` python
@patch
def config_options(self:RemoteCapabilityProxy) -> Dict[str, Any]: # CR-11: live config option domains
    """
    Get the capability's runtime config option providers (CR-11).
    
    Returns the worker's get_config_options() output (FieldOptions per
    dynamic field, JSON-serialized to dicts). Empty dict when the capability
    exposes no dynamic options.
    """
```

``` python
@patch
def cleanup(self:RemoteCapabilityProxy) -> None
    "Clean up capability resources and terminate worker process."
```

``` python
def _maybe_serialize_input(
    self,
    obj: Any # Object to potentially serialize
) -> Any: # Serialized form (path string or original object)
    "Convert FileBackedDTO objects to file paths for zero-copy transfer."
```

``` python
def _prepare_payload(
    self,
    args: tuple, # Positional arguments
    kwargs: dict # Keyword arguments
) -> Dict[str, Any]: # JSON-serializable payload
    """
    Prepare arguments for HTTP transmission.
    
    CR-14: attaches the current call envelope (set by the JobQueue around
    each job's execution via the `wire` contextvar) as a TOP-LEVEL body key.
    Never inside kwargs — capability signatures never see it; old workers ignore
    unknown top-level keys. Envelope-less calls (direct proxy use) simply
    produce unattributed worker records.
    """
```
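
The envelope placement described above can be sketched as follows. This is a minimal illustration, not the library's wire format: the helper `build_payload`, the body keys `args` / `kwargs` / `envelope`, and the envelope contents are all assumptions.

``` python
from typing import Any, Dict, Optional

def build_payload(
    args: tuple,
    kwargs: dict,
    envelope: Optional[Dict[str, Any]] = None,  # hypothetical call envelope
) -> Dict[str, Any]:
    """Hypothetical payload builder: the envelope rides beside kwargs, never inside."""
    payload: Dict[str, Any] = {"args": list(args), "kwargs": dict(kwargs)}
    if envelope is not None:
        # Top-level key: the capability only ever receives `kwargs`.
        payload["envelope"] = envelope
    return payload

with_env = build_payload(("a.wav",), {"lang": "en"}, {"job_id": "j1"})
without_env = build_payload(("a.wav",), {"lang": "en"})
```

A receiver that reads only the keys it knows would ignore `envelope`, which is the version-skew property the docstring relies on.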

``` python
def _harvest_worker_accounts(
    """
    CR-14 follow-up: journal in-worker accounts off the response header.
    
    The host-writes-the-row half of the account contract (`wire.
    record_account` / worker `_accounts_headers`): workers RECORD accounts
    during a call span; the proxy journals them ON RECEIPT with
    `worker_reported=True` + the receiving-side identity (the proxy-side
    call envelope + this spawn's worker session). Called on every unary
    response BEFORE the status checks so a `_job_error` 500's accounts
    (e.g. a save that succeeded before a later crash) are kept.
    
    Header absent (old workers, account-less calls) = no-op. A failure here
    logs at ERROR and never breaks the call — the result is the contract;
    and a wedged journal store also fails the queue's own emission for the
    same job, so the wedge gate still fires loudly.
    """
```

``` python
async def execute_async(
    self,
    *args,
    **kwargs
) -> Any: # Capability result
    """
    Execute the capability asynchronously.
    
    CR-4: HTTP 409 from the worker is mapped to a typed `CapabilityCancelledError`.
    Same 409/200/other semantics as the sync `execute()` variant.
    """
```

``` python
def _raise_from_job_error_chunk(
    """
    SG-52: convert a `_job_error` JobError-shaped dict into the right typed exception.
    
    Mapping rules:
    - `original_exc_repr` starts with `"CapabilityCancelledError"` → raise CapabilityCancelledError
      (preserves the non-retriable semantic that category alone doesn't capture).
    - category == "user_input" → CapabilityInputError (with fields_invalid).
    - category == "transient" → CapabilityTransientError (with retry_after_seconds).
    - category == "resource" → CapabilityResourceError (with reconstructed ResourceShortfall).
    - category == "fatal" → CapabilityFatalError.
    - Unknown category → RuntimeError carrying the chunk for forensic inspection.
    
    This is the streaming-side counterpart to /execute's 409 → CapabilityCancelledError
    detection. Same intent: the typed exception survives the HTTP wire boundary
    so substrate / JobQueue / consumer code can branch on category without
    parsing string messages.
    """
```
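
The mapping rules above can be sketched as a lookup with the cancellation check first. The exception classes below are local stand-ins that borrow the names from the docstring; their real constructors and extra fields (`fields_invalid`, `retry_after_seconds`, `ResourceShortfall`) are not modeled, and the `message` key is an assumption.

``` python
from typing import Any, Dict

# Local stand-ins for the typed exceptions named in the docstring.
class CapabilityCancelledError(Exception): ...
class CapabilityInputError(Exception): ...
class CapabilityTransientError(Exception): ...
class CapabilityResourceError(Exception): ...
class CapabilityFatalError(Exception): ...

_BY_CATEGORY = {
    "user_input": CapabilityInputError,
    "transient": CapabilityTransientError,
    "resource": CapabilityResourceError,
    "fatal": CapabilityFatalError,
}

def exception_for(chunk: Dict[str, Any]) -> Exception:
    """Pick an exception for a JobError-shaped dict (`message` key is assumed)."""
    message = str(chunk.get("message", ""))
    # Cancellation is checked first: the category alone does not capture it.
    if str(chunk.get("original_exc_repr", "")).startswith("CapabilityCancelledError"):
        return CapabilityCancelledError(message)
    exc_type = _BY_CATEGORY.get(chunk.get("category"))
    if exc_type is None:
        # Unknown category: keep the whole chunk for forensic inspection.
        return RuntimeError(f"unrecognized job error: {chunk!r}")
    return exc_type(message)
```

Callers would `raise exception_for(chunk)`; returning the exception instead of raising it keeps the sketch easy to test.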

``` python
def _raise_typed_execute_error(
    """
    SG-52 parity for the unary execute path (stage-3 ledger G7).
    
    If the worker's error body carries the `{"_job_error": <JobError dict>}`
    sentinel (post-fix workers), raise the corresponding TYPED exception via
    `_raise_from_job_error_chunk` — this is what lets the manager's CR-7
    reactive-retry path see `CapabilityResourceError` from plain `/execute`
    calls (the OOM-backstop stress test caught the unary channel collapsing
    every failure to RuntimeError, leaving the retry blind). Pre-fix workers
    return a bare-string detail → the legacy RuntimeError fallback (version-
    skew tolerance, F12 posture).
    """
```

``` python
def execute_stream_sync(self, *args, **kwargs) -> Generator[Any, None, None]
    "Synchronous wrapper for streaming (blocking)."
```

``` python
async def execute_stream(
    self,
    *args,
    **kwargs
) -> AsyncGenerator[Any, None]: # Yields parsed JSON chunks
    """
    Execute with streaming response (async generator).
    
    SG-52: detects the terminal `{"_job_error": <JobError dict>}` chunk and
    raises the corresponding typed exception client-side instead of yielding
    it to downstream consumers. Mirrors /execute's HTTP 409 → typed-exception
    behavior at the streaming wire boundary. Normal capability output chunks
    pass through unchanged (they never carry the `_job_error` key).
    """
```

``` python
def _check_worker_death(self) -> None:
    """
    CR-7 Track A: inspect subprocess state after an httpx fault on /execute.
    
    Raises `WorkerOOMError` if the worker died with SIGKILL (POSIX returncode -9 —
    kernel OOM-killer is the dominant cause). Raises `CapabilityTransientError` if
    it died with any other returncode. Returns None silently when the worker
    is still alive (caller re-raises the original httpx error).
    
    `signal.SIGKILL` doesn't exist on Windows, where worker-OOM looks different
    (STATUS_NO_MEMORY 0xC0000017 = -1073741799). Cross-platform OOM detection is
    a future enhancement when there's a concrete Windows substrate consumer.
    """
```
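
The return-code inspection described above can be sketched with the standard library alone. `classify_exit` and its string labels are hypothetical; the real method raises typed exceptions instead of returning labels.

``` python
import subprocess
import sys

def classify_exit(returncode):
    """Classify a `Popen.returncode` value along the lines the docstring describes."""
    if returncode is None:
        return "alive"          # still running: caller re-raises the transport error
    if returncode == -9:
        return "oom_suspected"  # POSIX reports death by signal N as -N; 9 is SIGKILL
    return "died"               # any other exit

# A short-lived child process that exits with status 3.
proc = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(3)"])
proc.wait()
outcome = classify_exit(proc.returncode)
```

A return code of -9 only shows that the process was killed with SIGKILL; attributing it to the kernel OOM-killer is a heuristic, as the docstring notes.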

``` python
def execute_with_oom_check(self, *args, **kwargs) -> Any:
    """
    CR-7 Track A wrapper around the sync execute path.
    
    Catches httpx connection / protocol faults, calls `_check_worker_death`,
    and either raises a typed `WorkerOOMError` / `CapabilityTransientError` or
    re-raises the original httpx error (worker still alive — caller treats
    as a generic transient network issue).
    """
```

``` python
async def execute_async_with_oom_check(self, *args, **kwargs) -> Any:
    "CR-7 Track A wrapper around the async execute path. Same semantics."
```

``` python
def _prepare_task_payload(
    self,
    task_name: str,  # Adapter task address
    method: str,  # Adapter method name
    kwargs: dict,  # Task method kwargs
) -> Dict[str, Any]:  # JSON-serializable /task body
    """
    Build the /task body (CR-17 pt 2) + the CR-14 call envelope rider.
    
    Same envelope semantics as `_prepare_payload`: top-level key, never
    inside kwargs.
    """
```

``` python
def execute_task(self, task_name: str, method: str, **kwargs) -> Any:
    """
    Invoke a typed task-adapter method in-worker (CR-17 pt 2; sync).
    
    The explicit task channel: `action=` stays the tool's native in-worker
    dispatch; this addresses the TASK contract (adapter + method). Kwargs-only
    by design. Built ONCE with the CR-7 Track-A worker-death check inside —
    no later wrapper supersedes it (the G7a last-assignment-wins lesson).
    """
```

``` python
async def execute_task_async(self, task_name: str, method: str, **kwargs) -> Any:
    "Invoke a typed task-adapter method in-worker (CR-17 pt 2; async). Same semantics."
```

``` python
def get_stats(self) -> Dict[str, Any]: # Process telemetry
    "Get worker process resource usage."
```

``` python
def is_alive(self) -> bool: # True if worker is responsive
    "Check if the worker process is still running and responsive."
```

``` python
def get_structural_surface(self) -> Optional[Dict[str, Any]]:  # Live-derived surface, or None
    """
    Pass-2 Thread 3 live companion: fetch the worker's runtime-derived
    structural surface (`GET /structural_surface`).
    
    Returns None when the worker predates the endpoint (a pre-fracture
    substrate in a snapshot env → 404) or on transport failure — callers
    treat None as "skip the drift check", never as an empty surface.
    """
```

``` python
def cancel(self) -> bool: # True if cancel request was sent
    "Request cancellation of running execution."
```

``` python
async def cancel_async(self) -> bool: # True if cancel request was sent
    "Request cancellation asynchronously."
```

``` python
def get_progress(self) -> Dict[str, Any]: # {progress: float, message: str}
    "Get current execution progress from worker."
```

``` python
async def get_progress_async(self) -> Dict[str, Any]: # {progress: float, message: str}
    "Get current execution progress asynchronously."
```

``` python
def on_disable(self) -> bool:  # True if hook signal accepted by worker
    """
    CR-2: forward the substrate's on_disable signal to the worker process.
    
    Capability can opt in via ToolCapability.on_disable(); default implementation
    is a no-op so silent-pass-through is the norm. Failures to reach the
    worker (already terminated, network blip) are logged-and-swallowed —
    the substrate-side enable/disable bookkeeping doesn't depend on the
    hook actually firing.
    """
```

``` python
def on_enable(self) -> bool:  # True if hook signal accepted by worker
    """
    CR-2: forward the substrate's on_enable signal to the worker process.
    
    Same delivery semantics as on_disable: best-effort, errors logged-and-
    swallowed. The capability's hook (default no-op) decides whether to eagerly
    re-acquire resources or rely on lazy re-load at next execute().
    """
```

``` python
def get_system_status(self) -> Optional[Dict[str, Any]]:  # SystemStats dict, or None on transport / config failure
    """
    CR-3: typed MonitorToolProtocol accessor. POSTs to worker's `/get_system_status`.
    
    Status code semantics (worker side raises HTTPException with these codes):
    - 200: SystemStats dict returned
    - 404: capability is not a monitor — logged at ERROR (configuration error;
          no amount of retry fixes it) and returns None. Loudly distinguished
          from the substrate's WARN-level transient-failure degradation.
    - 500: real capability failure; propagates as HTTPStatusError
    - ConnectError: worker may have died; returns None silently (substrate
          degrades to empty stats)
    """
```

``` python
async def get_system_status_async(self) -> Optional[Dict[str, Any]]:  # SystemStats dict, or None on transport / config failure
    "Async variant of `get_system_status`. Same 200/404/500/ConnectError semantics."
```

``` python
def list_processes(self) -> Optional[List[Dict[str, Any]]]:  # ProcessStats dict list, or None on transport / config failure
    """
    CR-3: typed MonitorToolProtocol accessor. POSTs to worker's `/list_processes`.
    
    Same 200/404/500/ConnectError semantics as `get_system_status`. Note that
    `MonitorToolProtocol.list_processes()` defaults to returning `[]`, so monitors without
    per-process visibility yield a 200 with an empty list.
    """
```

``` python
async def list_processes_async(self) -> Optional[List[Dict[str, Any]]]:  # ProcessStats dict list, or None on transport / config failure
    "Async variant of `list_processes`. Same semantics."
```

``` python
def prefetch(
    self,
    stall_threshold_seconds: Optional[float] = None,  # Override SubstrateConfig.prefetch_stall_threshold_seconds; None = use config
    poll_interval_seconds: float = 1.0,               # How often to poll /progress for stall detection
) -> bool:  # True if worker accepted the prefetch hook
    """
    CR-4 / Session A 2026-05-27: forward the substrate's prefetch signal with
    progress-based stall detection.
    
    Replaces wall-clock-timeout-based startup waiting (operators racing arbitrary
    timeouts vs. network speeds for model downloads). Approach:
    
      1. POST /prefetch fires in a background thread with httpx timeout=None.
      2. Main thread polls /progress every poll_interval_seconds.
      3. Each (progress, message) change resets the stall counter.
      4. If no change in stall_threshold_seconds AND POST still pending →
         SIGTERM the worker subprocess + raise CapabilityTimeoutError.
    
    Capabilities opt in to fine-grained stall defeat by calling
    self.report_progress(...) periodically during long lifecycle operations
    (model download, server startup, etc.). Capabilities that don't report progress
    are fine as long as the threshold accommodates their slowest plausible
    silent stretch.
    
    Errors raised by the capability (worker 500) propagate as RuntimeError; worker
    unreachable propagates as `False`; stall fires CapabilityTimeoutError.
    """
```
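
The four-step approach above can be sketched without any HTTP. In this illustration `work` stands in for the pending `POST /prefetch`, `read_progress` for the `/progress` poll, and `StallError` for `CapabilityTimeoutError`; all three names are assumptions. The sketch also skips the SIGTERM step and does not propagate exceptions raised by `work`.

``` python
import threading
import time
from typing import Callable, Tuple

class StallError(Exception):
    """Stand-in for the library's CapabilityTimeoutError."""

def run_with_stall_detection(
    work: Callable[[], None],                        # stands in for POST /prefetch
    read_progress: Callable[[], Tuple[float, str]],  # stands in for the /progress poll
    stall_threshold_seconds: float,
    poll_interval_seconds: float,
) -> bool:
    done = threading.Event()

    def _target() -> None:
        try:
            work()
        finally:
            done.set()

    threading.Thread(target=_target, daemon=True).start()
    last_seen = read_progress()
    last_change = time.monotonic()
    # Event.wait doubles as the poll sleep and returns True once the work finishes.
    while not done.wait(poll_interval_seconds):
        current = read_progress()
        if current != last_seen:
            # Any (progress, message) change resets the stall clock.
            last_seen, last_change = current, time.monotonic()
        elif time.monotonic() - last_change >= stall_threshold_seconds:
            raise StallError(f"no progress for {stall_threshold_seconds}s")
    return True

state = {"progress": 0.0}

def slow_but_reporting() -> None:
    for step in range(1, 4):
        time.sleep(0.05)
        state["progress"] = step / 3

ok = run_with_stall_detection(
    slow_but_reporting, lambda: (state["progress"], ""), 1.0, 0.01
)
```

The total runtime never enters the decision: only the time since the last observed change does, so a long download that keeps reporting is not interrupted.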

``` python
async def prefetch_async(
    self,
    stall_threshold_seconds: Optional[float] = None,
    poll_interval_seconds: float = 1.0,
) -> bool:  # True if worker accepted the prefetch hook
    "Async variant of `prefetch`. Same stall-detection semantics."
```

``` python
def _resolve_prefetch_stall_threshold() -> float:
    "Resolve the stall threshold from SubstrateConfig with a defensive fallback."
```

``` python
def _run_prefetch_with_stall_detection(
    proxy: 'RemoteCapabilityProxy',
    stall_threshold_seconds: float,
    poll_interval_seconds: float,
) -> bool
    """
    Sync stall-detecting prefetch implementation.
    
    Runs POST /prefetch in a daemon thread; main thread polls /progress for
    a (progress, message) advance every poll_interval_seconds. If no advance
    in stall_threshold_seconds AND the POST is still in-flight, SIGTERMs the
    worker subprocess (so its capability.cleanup() can run via the worker's
    shutdown handler — closes the orphan-subprocess-on-stall bug) and raises
    CapabilityTimeoutError client-side.
    """
```

``` python
async def _run_prefetch_with_stall_detection_async(
    proxy: 'RemoteCapabilityProxy',
    stall_threshold_seconds: float,
    poll_interval_seconds: float,
) -> bool
    """
    Async stall-detecting prefetch implementation. Mirrors the sync variant
    using asyncio.gather instead of a daemon thread.
    """
```

``` python
def reconfigure(
    self,
    old_config: Optional[Dict[str, Any]],  # Previous config snapshot
    new_config: Optional[Dict[str, Any]],  # Config being applied
) -> bool:  # True if worker accepted the reconfigure call
    """
    CR-4: forward a reconfigure(old, new) call to the worker process.
    
    The capability's default reconfigure() body delegates to
    reconfigure_with_triggers, which walks RELOAD_TRIGGER metadata on the
    capability's config_class to fire `_release_<trigger>` methods for fields
    whose values changed. Capabilities not opting into the declarative pattern
    land in a silent no-op; the substrate's CapabilityManager.update_capability_config
    then falls back to initialize(new_config) for the actual state change.
    """
```

``` python
async def reconfigure_async(
    self,
    old_config: Optional[Dict[str, Any]],  # Previous config snapshot
    new_config: Optional[Dict[str, Any]],  # Config being applied
) -> bool:  # True if worker accepted the reconfigure call
    "Async variant of `reconfigure`. Same semantics."
```

``` python
def __enter__(self):
    "Enter context manager."
```

``` python
def __exit__(self, exc_type, exc_val, exc_tb):
    "Exit context manager and cleanup."
```

``` python
async def __aenter__(self):
    "Enter async context manager."
```

``` python
async def __aexit__(self, exc_type, exc_val, exc_tb)
    "Exit async context manager and cleanup."
```

#### Classes

``` python
class RemoteCapabilityProxy:
    def __init__(
        self,
        manifest:Dict[str, Any], # Capability manifest with python_path, module, class, etc.
        extra_env:Optional[Dict[str, str]]=None, # CR-12: resolved worker-env overlay (secrets + visible overrides) injected at spawn
        adapter_specs:Optional[List[str]]=None, # CR-17 pt 2: host-matched adapter impl specs ("module:ClassName") bound in-worker at spawn
        journal:Optional[JournalStore]=None, # CR-14: journal sink for worker-lifecycle events; lazy LocalJournalStore at cfg.journal_db_path when None
        diagnostics:Optional[DiagnosticsStore]=None # CR-14: diagnostics sink (raw-stream pump + worker env contract); lazy LocalDiagnosticsStore when None
    )
    "Proxy that forwards capability calls to an isolated Worker subprocess."
    
    def __init__(
            self,
            manifest:Dict[str, Any], # Capability manifest with python_path, module, class, etc.
            extra_env:Optional[Dict[str, str]]=None, # CR-12: resolved worker-env overlay (secrets + visible overrides) injected at spawn
            adapter_specs:Optional[List[str]]=None, # CR-17 pt 2: host-matched adapter impl specs ("module:ClassName") bound in-worker at spawn
            journal:Optional[JournalStore]=None, # CR-14: journal sink for worker-lifecycle events; lazy LocalJournalStore at cfg.journal_db_path when None
            diagnostics:Optional[DiagnosticsStore]=None # CR-14: diagnostics sink (raw-stream pump + worker env contract); lazy LocalDiagnosticsStore when None
        )
        "Initialize proxy and start the worker process."
    
    def name(self) -> str: # Capability name from manifest
        "Capability name."
    
    def version(self) -> str: # Capability version from manifest
        "Capability version."
    
    def initialize(
            self,
            config:Optional[Dict[str, Any]]=None # Configuration dictionary
        ) -> None
        "Initialize or reconfigure the capability."
    
    def execute(
            self,
            *args,
            **kwargs
        ) -> Any: # Capability result
        """
        Execute the capability synchronously.
        
        CR-4: HTTP 409 from the worker is mapped to a typed
        `CapabilityCancelledError` raised in the host process, so substrate /
        JobQueue / consumer callers can distinguish cooperative cancellation
        from a real capability failure (500 → RuntimeError as before).
        """
    
    def get_config_schema(self) -> Dict[str, Any]: # JSON Schema
        "Get the capability's configuration schema."
    
    def get_current_config(self) -> Dict[str, Any]: # Current config values
        "Get the capability's current configuration."
```

### Job Queue (`queue.ipynb`)

> Resource-aware job queue for sequential capability execution with
> cancellation support

#### Import

``` python
from cjm_substrate.core.queue import (
    JobStatus,
    JobEventType,
    CancelPhase,
    Job,
    JobEvent,
    QueueStats,
    ResourceSnapshot,
    JobQueueDependencies,
    JobQueue
)
```

#### Functions

``` python
async def _enqueue_job(
    self,
    job: Job,  # Pre-constructed Job (caller fills composition fields if needed)
) -> str:  # job_id
    """
    Internal: enqueue a pre-constructed Job.
    
    Caller is responsible for validation (disabled-capability check, etc.).
    Used by `submit` and by the composition advancement path (stage 3) —
    the latter populates `composition_id` + `node_id` on the Job before
    enqueueing so the member job appears in composition-tagged event streams
    from its first STATE_TRANSITION.
    """
```

``` python
def _check_journal_wedge(self) -> None:
    """
    CR-14 wedge gate: refuse new work after a journal-append failure.
    
    The loud half of the named-tension resolution — a wedged journal must
    never silently drop the audit trail, and the refusal happens at the
    operational boundary (new submissions) rather than mid-finalization
    (which would leak lanes). Clears only by constructing a new queue /
    fixing the journal and resetting `_journal_wedged` deliberately.
    """
```

``` python
async def submit(
    self,
    capability_instance_id: str,  # Target capability instance (per CR-10)
    *args,
    priority: int = 0,  # Higher = more urgent
    task: Optional[str] = None,  # Task-channel address: adapter task name (stage 4)
    method: Optional[str] = None,  # Task-channel address: adapter method (set with task)
    run_id: Optional[str] = None,  # Host-tier run correlation (CR-14 follow-up; reserved name, never a capability kwarg)
    actor: Optional[str] = None,  # Who/what initiated (CR-14 follow-up; reserved name)
    control: Optional[Dict[str, Any]] = None,  # Per-call control flags (force/cache-bypass); reserved name, never a capability kwarg
    **kwargs
) -> str:  # Returns job_id
    """
    Submit a job to the queue.
    
    CR-2: rejects jobs for disabled capabilities at submit time (typed
    CapabilityDisabledError) so the failure surface matches CapabilityManager.
    execute_capability's disabled gate. Submitting to a disabled capability would
    otherwise sit in the queue until execution, then raise — moving the
    check earlier gives operators an actionable signal immediately.
    
    CR-6: no STATE_TRANSITION event fires at submit because the job is
    already in `pending` state at construction — there's no transition
    to publish. The first STATE_TRANSITION fires when the processor loop
    moves the job pending → running.
    
    CR-14: refuses loudly when the journal is wedged (see
    `_check_journal_wedge`). `run_id`/`actor` join `priority`/`task`/
    `method` as reserved keyword names (they never reach capability kwargs):
    cores pass their run-manifest id + initiating actor so every journal
    row for this job carries the host-tier correlation.
    """
```

``` python
async def cancel(
    self,
    job_id: str  # Job to cancel
) -> bool:  # True if cancelled
    """
    Cancel a pending or running job.
    
    CR-6: publishes STATE_TRANSITION when a pending job moves directly to
    cancelled (no transition through running). Running-job cancellation
    publishes CANCEL_PHASE_CHANGED events from `_execute_with_cancellation`.
    
    Stage 3: when the cancelled job is a composition member,
    `_advance_composition` runs after the lock is released so the cancelled
    status propagates to the composition run (dependents skip; the run
    finalizes when all nodes are terminal). Lock release is required because
    `_advance_composition` may need to enqueue downstream members in some
    flows; asyncio.Lock is not re-entrant.
    """
```
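
The lock-release requirement above follows from how `asyncio.Lock` behaves: it tracks no owner, so a task that already holds it blocks forever if it tries to acquire it again. A minimal sketch, with `enqueue_downstream` as a hypothetical stand-in for the advancement path:

``` python
import asyncio

async def demo() -> list:
    lock = asyncio.Lock()
    order = []

    async def enqueue_downstream() -> None:
        async with lock:  # needs the same lock the cancel path holds
            order.append("enqueued")

    async with lock:
        order.append("cancelled")
        # Awaiting enqueue_downstream() here would never return:
        # the lock is held and asyncio.Lock is not re-entrant.
    await enqueue_downstream()  # safe: the lock was released above
    return order

result = asyncio.run(demo())
```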

``` python
def reorder(
    self,
    job_id: str,  # Job to move
    new_priority: int  # New priority value
) -> bool:  # True if reordered
    "Change the priority of a pending job."
```

``` python
def get_job(
    self,
    job_id: str  # Job to retrieve
) -> Optional[Job]:  # Job or None
    "Get a job by ID."
```

``` python
async def wait_for_job(
    self,
    job_id: str,  # Job to wait for
    timeout: Optional[float] = None  # Max seconds to wait
) -> Job:  # Completed/failed/cancelled job
    """
    Wait for a job to complete.
    
    Independent of the CR-6 event bus — uses a per-job `asyncio.Event` for
    the simple block-until-done affordance. Streaming consumers should use
    `events(job_id)` instead.
    """
```
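
A per-job `asyncio.Event` wait can be sketched as below. The module-level dicts and the `wait_for` / `finish` helpers are assumptions for illustration; the real queue keeps this state on the `JobQueue` instance and returns a `Job`, not a status string.

``` python
import asyncio
from typing import Dict, Optional

done_events: Dict[str, asyncio.Event] = {}
statuses: Dict[str, str] = {}

async def wait_for(job_id: str, timeout: Optional[float] = None) -> str:
    event = done_events.setdefault(job_id, asyncio.Event())
    # Raises TimeoutError if the job is not done within `timeout` seconds.
    await asyncio.wait_for(event.wait(), timeout)
    return statuses[job_id]

async def finish(job_id: str, status: str) -> None:
    statuses[job_id] = status
    done_events.setdefault(job_id, asyncio.Event()).set()  # wakes every waiter

async def main() -> str:
    waiter = asyncio.create_task(wait_for("j1", timeout=1.0))
    await asyncio.sleep(0)  # let the waiter start and block on the event
    await finish("j1", "completed")
    return await waiter

final_status = asyncio.run(main())
```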

``` python
def get_pending(self) -> List[Job]:  # Pending jobs, priority-sorted
    "Get pending jobs, priority-sorted (higher priority first, then FIFO)."
```
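
The ordering rule (higher priority first, then FIFO) can be expressed as a two-part sort key. `PendingJob` and its `seq` field are hypothetical stand-ins; how the real `Job` records submission order is not shown in this listing.

``` python
from dataclasses import dataclass
from typing import List

@dataclass
class PendingJob:  # hypothetical stand-in for Job
    id: str
    priority: int
    seq: int  # submission order, assumed to be the FIFO tiebreaker

def priority_sorted(jobs: List[PendingJob]) -> List[PendingJob]:
    # Negate priority so larger values sort first; seq keeps ties in FIFO order.
    return sorted(jobs, key=lambda j: (-j.priority, j.seq))

jobs = [
    PendingJob("a", 0, 1),
    PendingJob("b", 5, 2),
    PendingJob("c", 5, 3),
    PendingJob("d", 1, 4),
]
order = [j.id for j in priority_sorted(jobs)]
```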

``` python
def get_running_jobs(self) -> List[Job]:  # All currently-executing jobs
    "All in-flight jobs (stage 3: the queue is multi-lane)."
```

``` python
def get_history(
    self,
    limit: Optional[int] = None,  # Max jobs to return (most recent N); None = all
) -> List[Job]:  # Completed/failed/cancelled jobs, most recent first
    """
    Get completed jobs, most recent first.
    
    If `limit` is provided, returns the most recent N. The internal history
    list grows append-only up to `max_history`, so older jobs are evicted
    in submission order (oldest first).
    """
```
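
The bounded-history behavior can be sketched with a `collections.deque`. Using a deque is an assumption made for brevity (the docstring describes a list capped at `max_history`), and the job ids are placeholders.

``` python
from collections import deque
from typing import List, Optional

max_history = 3  # small bound for illustration
history: deque = deque(maxlen=max_history)  # appending past maxlen drops the oldest

for job_id in ["j1", "j2", "j3", "j4"]:
    history.append(job_id)

def recent(limit: Optional[int] = None) -> List[str]:
    newest_first = list(reversed(history))
    return newest_first if limit is None else newest_first[:limit]
```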

``` python
def get_stats(self) -> QueueStats:  # Aggregate counts
    "Get aggregate queue stats — total counts by terminal status."
```

``` python
def get_job_diagnostics(
    self,
    job_id: str,  # Job whose diagnostic records to read
    limit: Optional[int] = 200,  # Max records (None = all)
    after_seq: Optional[int] = None,  # Tail cursor for follow-style reads
) -> List[DiagnosticRecord]:  # Job-stamped records, oldest first
    """
    EXACT per-job diagnostics (CR-14; replaces `get_job_logs`).
    
    Records were stamped with the job id IN THE WORKER via the call-envelope
    contextvar — no timestamp windows, no over-fetch, correct under stage-3
    same-worker concurrency and across multi-instance capabilities (both of which
    the deleted `_slice_log_by_job_window` heuristic got wrong). Follow-style
    consumers poll with `after_seq` (the LOG_APPENDED replacement).
    
    Returns [] when no diagnostics store is configured.
    """
```

``` python
def get_history_from_journal(
    self,
    limit: Optional[int] = None,  # Most recent N terminal jobs (None = all)
) -> List[Job]:  # Rehydrated job records, most recent first
    """
    Durable job history (the CR-14 `_history` migration rider).
    
    Rehydrates Job records from terminal STATE_TRANSITION journal rows —
    restart-surviving and unbounded, unlike the in-memory `get_history`
    working set (`max_history` eviction). Rehydrated Jobs are RECORDS:
    args/kwargs/result are not journaled (results live in capability DBs;
    parameters in run manifests) — identity, timing, status, error,
    composition/task fields are present.
    
    Falls back to the in-memory history when no journal is configured.
    """
```

``` python
def _subscriber_keys_for(event: JobEvent) -> List[str]:
    """
    Return the subscriber keys an event should fan out to (CR-6 / stage 3).
    
    Every event reaches "all" subscribers + the per-job subscribers.
    Composition-tagged events additionally reach per-composition subscribers.
    """
```
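
The fan-out rule can be sketched as a small pure function. `Event` is a minimal stand-in for `JobEvent`, and the `comp:<id>` key format is taken from the `_subscribe` parameter comment; treating it as the per-composition key here is an assumption.

``` python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Event:  # minimal stand-in for JobEvent
    job_id: str
    composition_id: Optional[str] = None

def subscriber_keys_for(event: Event) -> List[str]:
    # Every event reaches the global and the per-job subscribers.
    keys = ["all", f"job:{event.job_id}"]
    if event.composition_id is not None:
        # Composition-tagged events also reach per-composition subscribers.
        keys.append(f"comp:{event.composition_id}")
    return keys
```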

``` python
def _journal_append_guarded(
    self,
    event: JournalEvent,  # Pre-built journal event (caller fills identity/payload)
) -> None
    """
    Append to the journal under the wedge-gate failure contract (CR-14).
    
    The ONE place the named-tension resolution lives: an append failure logs
    at ERROR and wedges the queue (new submissions refuse via
    `_check_journal_wedge`) instead of raising into dispatch/finalization
    paths — raising there would leak lanes and corrupt in-flight state,
    while continuing silently would drop the audit trail. No-op without a
    configured journal. Used by `_publish_event` (job events) and by the
    direct substrate-event emissions (ADMISSION_DECIDED).
    """
```

``` python
def _publish_event(
    self,
    event: JobEvent,  # Event to emit
) -> None:
    """
    The SINGLE emission path (CR-14: journal-primary).
    
    Class routing at the one place every event passes through:
    - journal-class events (everything except `LIVENESS_EVENT_TYPES`)
      become durable journal rows FIRST, then fan out to live subscribers —
      emitting IS writing the record; the bus is a live tail of the journal.
    - liveness-class events (PROGRESS_CHANGED / RESOURCE_SNAPSHOT) fan out
      only; their final values ride the terminal STATE_TRANSITION row.
    
    Journal failures follow the wedge-gate contract — see
    `_journal_append_guarded`.
    
    Fan-out: slow subscribers backpressure themselves via `asyncio.QueueFull`
    drop — publisher never blocks. Each subscriber tracks `dropped_count` so
    operators / future telemetry can surface backpressure visibility.
    """
```
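
The fan-out half of this contract (drop on a full queue, never block the publisher) might look roughly like the sketch below. `DemoSubscription` and `fan_out` are hypothetical names; only `asyncio.Queue.put_nowait` and `asyncio.QueueFull` are real standard-library API. Journal writing is left out entirely.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class DemoSubscription:
    """Hypothetical stand-in for the internal per-subscriber record."""
    queue: "asyncio.Queue[Any]"
    dropped_count: int = 0


def fan_out(
    subscribers: Dict[str, List[DemoSubscription]],
    keys: List[str],
    event: Any,
) -> None:
    for key in keys:
        for sub in subscribers.get(key, []):
            try:
                # put_nowait never awaits, so the publisher cannot block here.
                sub.queue.put_nowait(event)
            except asyncio.QueueFull:
                # Slow subscriber: drop the event and count the drop.
                sub.dropped_count += 1


slow = DemoSubscription(queue=asyncio.Queue(maxsize=1))
subs = {"all": [slow]}
fan_out(subs, ["all"], "first")
fan_out(subs, ["all"], "second")  # the queue is already full, so this is dropped
```

The bounded queue is what turns a slow consumer into a counted drop rather than unbounded memory growth or a stalled dispatch path.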

``` python
async def _subscribe(
    self,
    key: str,  # Subscriber key ("all" | "job:<id>" | "comp:<id>")
) -> AsyncIterator[JobEvent]:
    """
    Internal: register a subscription; yield events until the consumer
    exits the async generator.
    
    Cleanup runs in `finally` so the subscriber is unregistered even if the
    consumer raises or is cancelled. Empty key lists are deleted to avoid
    memory accumulation across many short-lived subscriptions.
    """
```
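
A minimal sketch of the register / yield / clean-up-in-`finally` pattern described above, assuming a plain module-level `registry` dict (hypothetical; the real queue keeps its subscriber state on the instance):

```python
import asyncio
from typing import AsyncIterator, Dict, List, Tuple

# Hypothetical registry: subscriber key -> list of per-subscriber queues.
registry: Dict[str, List[asyncio.Queue]] = {}


async def subscribe(key: str) -> AsyncIterator[str]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=16)
    registry.setdefault(key, []).append(queue)
    try:
        while True:
            yield await queue.get()
    finally:
        # Runs on normal exit, consumer error, and cancellation alike.
        registry[key].remove(queue)
        if not registry[key]:
            del registry[key]  # drop empty key lists so they do not accumulate


async def demo() -> Tuple[str, bool, bool]:
    stream = subscribe("job:abc")

    async def first_event() -> str:
        async for event in stream:
            return event
        raise RuntimeError("stream ended without an event")

    task = asyncio.create_task(first_event())
    await asyncio.sleep(0)  # let the generator register and park on queue.get()
    registered = "job:abc" in registry
    registry["job:abc"][0].put_nowait("hello")
    event = await task
    await stream.aclose()  # consumer is done: the finally block unregisters
    return event, registered, "job:abc" in registry


result = asyncio.run(demo())
```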

``` python
async def events(
    self,
    job_id: str,  # Filter to events for this job
) -> AsyncIterator[JobEvent]:
    """
    Subscribe to events for a single job (async generator).
    
    Yields events as they fire. Multiple concurrent subscribers to the same
    job_id each get their own independent stream — useful for multi-tab UIs.
    Late subscribers catch up exactly via the journal: query
    `journal.query(job_id=..., after_seq=cursor)` then follow live — the
    bus is a tail of the journal, not the record itself (CR-14).
    """
```

``` python
async def events_for_composition(
    self,
    composition_id: str,  # Filter to events tagged with this composition
) -> AsyncIterator[JobEvent]:
    """
    Subscribe to events for all member jobs of a composition (async
    generator).
    
    Yields the unified per-composition narrative: member-job lifecycle events
    interleaved with `COMPOSITION_ADVANCED` aggregate signals (stage 3).
    """
```

``` python
async def all_events(self) -> AsyncIterator[JobEvent]:
    """
    Subscribe to all events (firehose; async generator).
    
    Useful for global dashboards, audit logs, and telemetry sinks that need
    the complete event stream rather than a filtered view.
    """
```

``` python
async def submit_composition(
    self,
    comp: Composition,  # Composition to run (validated at submit)
) -> str:  # composition run id
    """
    Submit a composition — a DAG of capability-invocation nodes with
    execution-time-bound inputs (stage 3; replaces `submit_sequence`).
    
    Validates upfront: structural validation via `new_composition_run`
    (duplicate ids / unknown refs / cycles → `CompositionValidationError`)
    and the disabled-capability gate across all nodes (`CapabilityDisabledError`),
    matching the sequence-era precedent. Member Jobs are created LAZILY —
    only dependency-free nodes have Jobs at submit; downstream nodes get
    their kwargs materialized from upstream results at advancement time.
    
    CR-14: refuses loudly when the journal is wedged (the same gate as
    `submit`).
    
    Consumers wait via `wait_for_composition`, observe via
    `events_for_composition`, inspect via `get_composition`.
    """
```
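
The structural checks named above (unknown references, cycles) and the "only dependency-free nodes start at submit" rule can be illustrated with the standard library's `graphlib`. This is a sketch under assumptions: the graph is modelled as a plain dict of node id to upstream ids, `DemoValidationError` is a hypothetical stand-in for `CompositionValidationError`, and duplicate-id detection is omitted because dict keys are unique by construction.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List


class DemoValidationError(ValueError):
    """Hypothetical stand-in for CompositionValidationError."""


def validate_dag(deps: Dict[str, List[str]]) -> List[str]:
    """Validate the graph and return the node ids that can start immediately."""
    for node, upstream in deps.items():
        for ref in upstream:
            if ref not in deps:
                raise DemoValidationError(
                    f"node {node!r} references unknown node {ref!r}"
                )
    try:
        # prepare() raises CycleError when the graph contains a cycle.
        TopologicalSorter(deps).prepare()
    except CycleError as exc:
        raise DemoValidationError(f"cycle detected: {exc.args[1]}") from exc
    # Dependency-free nodes are the only ones that get Jobs at submit time.
    return [node for node, upstream in deps.items() if not upstream]


initial_nodes = validate_dag({"a": [], "b": ["a"], "c": ["a", "b"]})
```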

``` python
async def wait_for_composition(
    self,
    composition_id: str,  # Composition to wait for
    timeout: Optional[float] = None,  # Max seconds to wait
) -> CompositionRun:  # Terminal run record
    """
    Block until a composition reaches terminal status (the
    `wait_for_job` analog for compositions).
    """
```

``` python
async def cancel_composition(
    self,
    composition_id: str  # Composition to cancel
) -> bool:  # True if cancellation was recorded
    """
    Cancel an in-flight composition (USER intent — the run lands
    `cancelled`).
    
    Records intent FIRST (`cancel_requested`) so member-cancel callbacks
    racing through `_advance_composition` derive the right terminal status;
    then marks never-started nodes cancelled and cancels every member whose
    Job is still pending or running (in-flight members resolve through the
    per-job cooperative-cancel machinery and finalize the run on the way
    out). Returns False if the composition is unknown or already terminal.
    """
```

``` python
def get_composition(
    self,
    composition_id: str  # Composition to retrieve
) -> Optional[CompositionRun]:  # CompositionRun or None
    "Get a composition run by id (read-only inspection)."
```

``` python
async def _start_ready_nodes(
    self,
    run: CompositionRun,  # Composition being advanced
) -> List[str]:  # Node ids whose member Jobs were created + enqueued
    """
    Create + enqueue member Jobs for every currently-ready node (stage 3).
    
    This is where execution-time binding happens: each ready node's kwargs
    are materialized from upstream results via `resolve_node_kwargs`. A
    binding failure is recorded as that NODE's failure (dependents skip,
    fail-fast housekeeping applies) rather than raising to the caller — by
    the time bindings resolve, the composition is mid-flight and the failure
    must flow through the same path as a member-job failure.
    
    CR-14 follow-up: member Jobs inherit the composition's `run_id`/`actor`
    correlation tags so every member's journal rows link to the host run.
    """
```

``` python
async def _advance_composition(
    self,
    completed_job: Job  # Member job that just reached terminal status
) -> None:
    """
    Advance a composition after a member job completes (stage 3).
    
    Records the member outcome; on success enqueues newly-ready downstream
    nodes (emitting COMPOSITION_ADVANCED); on failure/cancellation skips
    transitive dependents and (fail_fast) cancels independent pending
    members. Finalizes the run when every node is terminal. Always called
    OUTSIDE the queue's main lock — may take it via `_enqueue_job`.
    """
```
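
The "skip transitive dependents" step on failure can be sketched as a simple graph walk. The dict-of-upstream-ids shape and the node names are assumptions made for illustration; independent nodes (here `thumbnail`) are untouched by the walk and are handled by the separate fail-fast housekeeping.

```python
from typing import Dict, List, Set


def transitive_dependents(deps: Dict[str, List[str]], failed: str) -> Set[str]:
    """Collect every node downstream of `failed` (deps maps node -> upstream ids)."""
    skipped: Set[str] = set()
    frontier = [failed]
    while frontier:
        current = frontier.pop()
        for node, upstream in deps.items():
            if current in upstream and node not in skipped:
                skipped.add(node)
                frontier.append(node)
    return skipped


graph = {
    "load": [],
    "transcribe": ["load"],
    "align": ["transcribe"],
    "thumbnail": [],
}
skipped_after_load_failure = transitive_dependents(graph, "load")
```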

``` python
async def _cancel_pending_members(
    self,
    run: CompositionRun,  # Composition whose pending members should stop
) -> None:
    """
    Fail-fast housekeeping: stop members that have not actually started
    executing (stage 3, ratified failure semantics).
    
    IN-FLIGHT members run to completion — their results and caches are kept,
    and force-killing a half-done GPU job buys nothing. Nodes that never got
    a Job are marked cancelled directly; members whose Jobs still sit in the
    pending heap go through the per-job cancel path (which re-enters
    `_advance_composition` for each, transitioning them to terminal).
    """
```

``` python
def _maybe_finalize_composition(
    self,
    run: CompositionRun,  # Composition to check for terminal state
) -> None:
    "Finalize the run once every node is terminal (idempotent; stage 3)."
```

``` python
def _sample_resource_snapshot(
    self,
    job: Job  # Job to sample resources for
) -> Optional[ResourceSnapshot]:
    """
    Sample worker + sysmon stats for a job (CR-6 Stage 3 internal helper).
    
    Returns None if the worker proxy doesn't support `get_stats` or the call
    fails — substrate can't fabricate a snapshot. Sysmon enrichment is
    best-effort: if the named capability isn't loaded / errors / lacks the CR-3
    typed methods, GPU fields stay None and the worker-only snapshot is
    returned.
    
    Subprocess-spawning capabilities (e.g. Voxtral-vLLM's managed vLLM server)
    spawn grandchild PIDs that hold GPU memory the worker itself doesn't.
    GPU attribution delegates to `attribute_gpu_to_worker_subtree`, which
    intersects the worker-reported `subtree_pids` set with sysmon's per-PID
    GPU enumeration. The pre-fix path matched only `worker_pid` and reported
    `gpu_memory_mb=None` for any subprocess-spawning capability.
    """
```
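
The subtree attribution idea can be shown with plain sets. This sketch is not `attribute_gpu_to_worker_subtree` itself: the function name, the `{pid: MB}` input shape, and the choice to sum memory across matched PIDs are all assumptions made for illustration.

```python
from typing import Dict, Optional, Set


def gpu_memory_for_subtree(
    subtree_pids: Set[int],
    gpu_memory_by_pid: Dict[int, float],
) -> Optional[float]:
    """Attribute GPU memory to a worker by matching its whole process subtree."""
    matched = subtree_pids.intersection(gpu_memory_by_pid)
    if not matched:
        return None  # nothing in the subtree holds GPU memory
    return sum(gpu_memory_by_pid[pid] for pid in matched)


# Worker PID 100 spawned a server at PID 101 that holds the GPU memory.
per_pid_gpu = {101: 9000.0, 555: 1000.0}
subtree_match = gpu_memory_for_subtree({100, 101}, per_pid_gpu)
worker_only_match = gpu_memory_for_subtree({100}, per_pid_gpu)
```

Matching on the worker PID alone returns `None` here, which is the pre-fix behavior the docstring describes for subprocess-spawning capabilities.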

``` python
def get_resource_snapshot(
    self,
    job_id: str  # Job to sample resources for
) -> Optional[ResourceSnapshot]:
    """
    Get a point-in-time resource snapshot for a job (CR-6 Stage 3).
    
    Returns None if the job is unknown or the worker proxy doesn't expose
    `get_stats`. Composes worker stats with sysmon GPU stats when the queue
    is configured with a `sysmon_capability_name`.
    """
```

``` python
async def start(self) -> None:
    """
    Start the queue processor.
    
    CR-6 Stage 4: installs the substrate-side retry observer on `_deps`
    (typically a CapabilityManager). When CR-7's reactive-retry path fires for
    a running job, the observer updates `Job.retry_count` and publishes a
    RETRY_STARTED event tagged with the in-flight job. Previous observer
    value (if any) is saved + restored in stop() for cooperative coexistence.
    """
```

``` python
async def stop(self) -> None:
    """
    Stop the queue processor gracefully.
    
    Stage 3: in-flight job tasks are detached lanes now — drain them with
    the same 5s budget as the processor task; leftovers are cancelled.
    
    CR-6 Stage 4: restores the previous `_on_retry` observer on deps to
    leave the manager in the state we found it (cooperative with other
    queue instances, tests, etc.).
    """
```

``` python
def _on_manager_retry(
    """
    Substrate-side retry observer (CR-6 Stage 4; stage-3 multi-lane).
    
    Invoked synchronously by CapabilityManager's CR-7 retry loop just before
    each retry attempt. Updates `Job.retry_count` on the matching in-flight
    job + emits RETRY_STARTED. Best-effort: synchronous callback; emission
    failure shouldn't propagate back into the retry loop.
    
    `attempt` semantics: CapabilityManager's loop iterates
    `for attempt in range(max_retries + 1)`. The first iteration
    (`attempt=0`) is the original try and never invokes this callback —
    so the value PASSED here is already the 1-based retry number.
    
    Match logic (stage 3): scan the multi-lane `_running` dict for a job on
    the retrying instance. With the per-instance cap defaulting to 1, at
    most one in-flight job matches; if an instance opts into same-worker
    concurrency (SG-33 cap > 1), the first match is attributed — a known
    blunt edge recorded in the stage-3 ledger.
    """
```
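
The `attempt` semantics are easy to get off by one, so here is a small sketch of a retry loop shaped like the one the docstring describes. `run_with_retries`, `flaky`, and the `on_retry` parameter are hypothetical; only the `range(max_retries + 1)` iteration comes from the text above.

```python
from typing import Callable, List, Optional


def run_with_retries(
    fn: Callable[[], str],
    max_retries: int,
    on_retry: Optional[Callable[[int], None]] = None,
) -> str:
    last_error: Optional[Exception] = None
    for attempt in range(max_retries + 1):
        if attempt > 0 and on_retry is not None:
            # attempt 0 is the original try, so the value passed to the
            # observer is already the 1-based retry number.
            on_retry(attempt)
        try:
            return fn()
        except RuntimeError as exc:
            last_error = exc
    assert last_error is not None
    raise last_error


calls = {"n": 0}


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient")
    return "ok"


seen_attempts: List[int] = []
outcome = run_with_retries(flaky, max_retries=3, on_retry=seen_attempts.append)
```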

``` python
def _move_to_history(self, job: Job) -> None:
    "Move a job to history, maintaining max_history limit."
```

``` python
def _signal_job_completed(self, job_id: str) -> None:
    "Signal that a job has completed."
```

``` python
def _job_snapshot(job: Job) -> Dict[str, Any]:
    """
    Serialize a job's RECORD fields for the terminal journal row (CR-14).
    
    Deliberately excludes args/kwargs/result: results live in capability DBs,
    parameters in run manifests — the journal never duplicates what better
    homes already record (the attempted-vs-happened rule). The error rides as
    the JobError dict (failures are exactly what the journal exists to keep).
    """
```

``` python
def _job_from_snapshot(snap: Dict[str, Any]) -> Job:
    """
    Rehydrate a Job RECORD from a terminal-row snapshot (CR-14).
    
    Tolerant in both directions: unknown snapshot keys are ignored; a
    JobError dict that no longer matches the current JobError fields
    degrades to None rather than failing the history read.
    """
```
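
One way to get this kind of two-way tolerance is to filter the snapshot against the dataclass's own field names and to treat a mismatched error dict as absent. The sketch below uses hypothetical `DemoJob` / `DemoError` types with far fewer fields than the real `Job` / `JobError`.

```python
from dataclasses import dataclass, fields
from typing import Any, Dict, Optional


@dataclass
class DemoError:
    kind: str
    message: str


@dataclass
class DemoJob:
    id: str
    status: str = "completed"
    error: Optional[DemoError] = None


def job_from_snapshot(snap: Dict[str, Any]) -> DemoJob:
    known = {f.name for f in fields(DemoJob)}
    # Unknown snapshot keys (written by a newer version) are ignored.
    data = {k: v for k, v in snap.items() if k in known}
    raw_error = data.pop("error", None)
    job = DemoJob(**data)
    if isinstance(raw_error, dict):
        try:
            job.error = DemoError(**raw_error)
        except TypeError:
            # The error shape drifted: degrade to None instead of failing.
            job.error = None
    return job


current = job_from_snapshot(
    {"id": "j1", "status": "failed", "added_later": 1,
     "error": {"kind": "Timeout", "message": "worker stalled"}}
)
drifted = job_from_snapshot({"id": "j2", "status": "failed", "error": {"code": 5}})
```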

``` python
def _emit_state_transition(
    self,
    job: Job,
    prev_status: JobStatus,
) -> None:
    """
    Emit a STATE_TRANSITION event for `job`'s most recent status change.
    
    Centralized so every transition site (start, completed, failed, cancelled,
    or future cancel-phase-driven transitions) carries identical tag context.
    
    CR-14: TERMINAL transitions carry the job snapshot in the payload — the
    durable journal row becomes the job's record of existence (the `_history`
    migration rider: `get_history_from_journal` rehydrates from these).
    """
```

``` python
def _emit_cancel_phase(
    self,
    job: Job,
    new_phase: CancelPhase,
) -> None:
    """
    Emit a CANCEL_PHASE_CHANGED event (CR-6 Stage 4).
    
    Updates `job.cancel_phase` to the new value and publishes an event with
    the prior phase + new phase in the payload. Centralized so every phase
    transition site in `_execute_with_cancellation` produces identically-shaped
    events.
    """
```

``` python
def _emit_block_reason(
    self,
    job: Job,
    new_reason: Optional[str],
) -> None:
    """
    Emit a BLOCK_REASON_CHANGED event (CR-6 Stage 4 — reserved).
    
    Updates `job.block_reason` and publishes an event with the prior + new
    reason. Stage 4 ships this helper for future scheduler-integration use;
    the queue's current scheduling logic doesn't surface block reasons, so
    the helper is reserved for the eventual scheduler-coordination wiring.
    """
```

``` python
async def _fetch_admission_stats(self) -> Dict[str, Any]:
    """
    Fetch live system telemetry for admission decisions (stage 3).
    
    Defensive-getattr: a deps implementation without `get_global_stats`
    (older test doubles) yields `{}` — admission then runs GPU-profiled jobs
    exclusive (no live headroom to verify against) and CPU-profiled jobs on
    lanes + instance caps alone. Failures degrade the same way.
    """
```
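
The defensive-getattr behavior can be sketched against three hypothetical deps objects: one without the method, one that works, and one that raises. The function below is an illustration of the described degradation, not the method's actual body.

```python
import asyncio
from typing import Any, Dict


class MinimalDeps:
    """Hypothetical older test double: no `get_global_stats` at all."""


class StatsDeps:
    async def get_global_stats(self) -> Dict[str, Any]:
        return {"memory_available_mb": 8192}


class BrokenDeps:
    async def get_global_stats(self) -> Dict[str, Any]:
        raise RuntimeError("sysmon unavailable")


async def fetch_admission_stats(deps: Any) -> Dict[str, Any]:
    fn = getattr(deps, "get_global_stats", None)
    if not callable(fn):
        return {}  # no telemetry surface: admission falls back to exclusive runs
    try:
        return await fn()
    except Exception:
        return {}  # failures degrade the same way as a missing method


missing = asyncio.run(fetch_admission_stats(MinimalDeps()))
working = asyncio.run(fetch_admission_stats(StatsDeps()))
failing = asyncio.run(fetch_admission_stats(BrokenDeps()))
```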

``` python
def _pop_next_admissible(
    self,
    stats: Dict[str, Any],  # Live telemetry from _fetch_admission_stats (possibly {})
) -> Optional[Job]:  # The popped job, or None when nothing is dispatchable
    """
    Pop the highest-priority ADMISSIBLE pending job (stage 3).
    
    Scans pending jobs in priority order with SKIP-AHEAD: a blocked job
    (insufficient GPU headroom, instance cap reached) does not stall
    admissible jobs behind it. The admission ladder (stage-3 ledger,
    ratified 2026-06-10):
    
    1. **lanes** — at most `max_concurrent_lanes` in-flight jobs;
    2. **exclusivity** — a job with NO empirical profile runs ALONE: its
       first run IS its measurement run. The store's (instance_id,
       config_hash) keying graduates it automatically after one run and
       demotes it again whenever the config changes — staleness is dissolved
       by the keying, not solved by invalidation;
    3. **per-instance cap** — in-flight jobs per instance ≤ the instance's
       SG-33 `max_concurrent_requests`, DEFAULT 1 when unset (same-worker
       concurrency is opt-in per capability);
    4. **resources** — the empirical `gpu_memory_mb_peak_max` is admitted
       against BOTH a reservation ledger (sum of running GPU peaks ≤ total ×
       `gpu_headroom_fraction`; covers admitted-but-not-yet-loaded models)
       AND live free VRAM (covers resident idle models + external GPU
       users); `memory_mb_peak_max` against live `memory_available_mb`.
       Without sysmon stats, GPU-profiled jobs run exclusive.
    
    The manifest's `requires_gpu` is deliberately NOT consumed — whether a
    config uses the GPU is an empirical fact (`gpu_memory_mb_peak_max > 0`),
    not a declaration (ledger G2: the pre-overhaul scheduler quantity checks
    were dead code against v2 manifests). Worst case (no profiles, no
    sysmon) every job runs exclusive = exact pre-stage-3 single-lane
    behavior. CR-7 reactive retry is the documented backstop for admission
    misses.
    """
```
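
The four-rung ladder with skip-ahead can be condensed into a small sketch. Everything here is simplified and hypothetical: `Pending`, `pick_admissible`, and the parameter names are illustrative, host-memory checks are omitted, and the real method also pops the job and updates its ledgers.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Pending:
    job_id: str
    instance_id: str
    priority: int
    gpu_peak_mb: Optional[float]  # None = no empirical profile yet


def pick_admissible(
    pending: List[Pending],
    running: List[Pending],
    max_lanes: int,
    gpu_total_mb: float,
    gpu_free_mb: float,
    headroom_fraction: float,
    instance_caps: Optional[Dict[str, int]] = None,
) -> Optional[Pending]:
    caps = instance_caps or {}
    if len(running) >= max_lanes:  # 1. lanes
        return None
    if any(job.gpu_peak_mb is None for job in running):
        return None  # an unprofiled job is on its solo measurement run
    reserved = sum(job.gpu_peak_mb or 0.0 for job in running)
    for job in sorted(pending, key=lambda j: -j.priority):
        if job.gpu_peak_mb is None:  # 2. exclusivity
            if running:
                continue  # skip ahead: it must wait for an empty queue
            return job
        in_flight = sum(1 for r in running if r.instance_id == job.instance_id)
        if in_flight >= caps.get(job.instance_id, 1):  # 3. per-instance cap
            continue
        if reserved + job.gpu_peak_mb > gpu_total_mb * headroom_fraction:
            continue  # 4a. reservation ledger
        if job.gpu_peak_mb > gpu_free_mb:
            continue  # 4b. live free VRAM
        return job
    return None


running_now = [Pending("r1", "whisper", 0, 6000.0)]
waiting = [
    Pending("big", "llm", 10, 20000.0),
    Pending("new", "tts", 5, None),
    Pending("small", "ocr", 1, 2000.0),
]
chosen = pick_admissible(
    waiting, running_now, max_lanes=4,
    gpu_total_mb=24000.0, gpu_free_mb=17000.0, headroom_fraction=0.9,
)
```

In this run the two higher-priority jobs are blocked (one by the reservation ledger, one by exclusivity), and skip-ahead lets the lowest-priority job through instead of stalling the lane.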

``` python
async def _process_loop(self) -> None:
    """
    Main dispatch loop (stage 3: multi-lane ready-set dispatch).
    
    `_job_available` means "dispatch state may have changed" — set on submit
    AND on every job completion, cleared only after a scan that dispatched
    nothing. Each pass: fetch admission telemetry (outside the lock), pop
    the highest-priority admissible job under the lock, and launch it as an
    independent task tracked in `_running_tasks` (awaited by `stop`). The
    pre-stage-3 loop executed one job at a time inline.
    
    CR-14 follow-up: each ADMIT is journaled as an ADMISSION_DECIDED row —
    emitted AFTER the lock releases (sqlite I/O never rides the locked fast
    path; the decision detail is recovered from the admission ledgers the
    pop updated synchronously). Blocked jobs are NOT journaled per scan —
    the scan loop re-evaluates them on every pass and would spam rows; the
    reserved BLOCK_REASON_CHANGED transition channel is the place block
    visibility lands when the scheduler-coordination wiring happens.
    """
```

``` python
async def _execute_job(self, job: Job) -> None:
    "Execute a single job (runs as an independent task per lane; stage 3)."
```

``` python
async def _execute_with_cancellation(
    self,
    job: Job,
    capability: Any
) -> Any:
    """
    Execute job with cancellation monitoring.
    
    CR-6 Stage 4 wires CANCEL_PHASE_CHANGED events for the substrate's
    cooperative → force → reloading → completed state machine.
    
    Cooperative-success path (capability acknowledges cancel within timeout):
        COOPERATIVE → COMPLETED
    Force-kill path (cooperative timeout):
        COOPERATIVE → FORCE → RELOADING → COMPLETED
    """
```
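
The two paths through the cancel state machine can be written down as data. `DemoCancelPhase` mirrors the phase names used above, but its member values and the `cancel_phases` helper are assumptions; the real `CancelPhase` enum members are not listed in this section.

```python
from enum import Enum
from typing import List


class DemoCancelPhase(str, Enum):
    """Hypothetical mirror of CancelPhase; member values are assumed."""
    COOPERATIVE = "cooperative"
    FORCE = "force"
    RELOADING = "reloading"
    COMPLETED = "completed"


def cancel_phases(acknowledged_in_time: bool) -> List[DemoCancelPhase]:
    """Return the phase sequence a cancellation walks through."""
    phases = [DemoCancelPhase.COOPERATIVE]
    if not acknowledged_in_time:
        # Cooperative timeout: force-kill the worker, then reload it.
        phases += [DemoCancelPhase.FORCE, DemoCancelPhase.RELOADING]
    phases.append(DemoCancelPhase.COMPLETED)
    return phases


cooperative_path = cancel_phases(acknowledged_in_time=True)
force_path = cancel_phases(acknowledged_in_time=False)
```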

``` python
async def _poll_progress(
    self,
    job: Job,
    capability: Any
) -> None:
    """
    Poll progress + sample resources from the capability during execution.
    
    Emits PROGRESS_CHANGED events when progress or status_message changes
    from the previous poll (avoids spamming the bus between meaningful
    updates). CR-6 Stage 3: also emits RESOURCE_SNAPSHOT events every
    `resource_snapshot_cadence_polls` iterations; snapshot is also stored
    on `job.last_resource_snapshot` for synchronous inspection.
    
    Liveness-class events (never journaled) still carry run_id/actor so
    live-tail subscribers see the same tag shape as journal-class events.
    """
```

#### Classes

``` python
class JobStatus(str, Enum):
    "Status of a job in the queue."
```

``` python
class JobEventType(str, Enum):
    """
    Push-based job event types (CR-6; stage-3 composition rework; CR-14
    journal-primary emission).
    
    Emitted by JobQueue through the single emission path (`_publish_event`):
    journal-class events become durable journal rows AND fan out to live
    subscribers; liveness-class events (`LIVENESS_EVENT_TYPES` in
    `core.journal_store`) fan out only — their final values ride the
    terminal STATE_TRANSITION row. Consumers subscribe via
    `queue.events(job_id)` / `queue.events_for_composition(comp_id)` /
    `queue.all_events()` and receive `JobEvent` instances asynchronously.
    
    COMPOSITION_ADVANCED replaced the retired SEQUENCE_ADVANCED at execution
    stage 3 (CR-16: compositions replace sequences outright): it fires when a
    member job's completion unlocks downstream composition nodes — payload
    carries the completed node id + the newly enqueued node ids.
    
    The reserved-never-emitted LOG_APPENDED was RETIRED at stage 7 (CR-14):
    log-follow is a diagnostics-store cursor read (`get_job_diagnostics`),
    not a push event — there are no log files or byte offsets anymore.
    Non-job substrate events (worker lifecycle, config, runs) live in
    `core.journal_store.SubstrateEventType`.
    """
```

``` python
class CancelPhase(str, Enum):
    """
    Phase of a cancellation in progress (CR-6 + CR-4 pairing).
    
    Surfaces the substrate's cancel state machine. Stage 4 wires the
    transitions; Stage 1 reserves the enum so `Job.cancel_phase` can be
    typed correctly without dangling forward references.
    """
```

``` python
@runtime_checkable
class JobQueueDependencies(Protocol):
    """
    Substrate dependencies the JobQueue requires (CR-6 + stage 3).
    
    CapabilityManager satisfies this structurally; the Protocol exists so JobQueue
    can be tested in isolation (with a lightweight test double) and so a future
    extraction into a separate library has no API constraint locked in.
    
    The first 4 methods are the CR-6 execute-path surface (CR-14 retired the
    original flat-log accessor — log retrieval is a diagnostics-store query now). The
    stage-3 additions (CR-16 multi-lane admission) are consumed DEFENSIVELY
    via getattr — a deps implementation without them yields no admission
    evidence, so every job runs exclusive = exact pre-stage-3 single-lane
    behavior. Older test doubles keep working unchanged. CR-14 also reads
    `journal_store` / `diagnostics_store` ATTRIBUTES via getattr when the
    queue isn't constructed with explicit stores — a deps without them
    (test doubles) simply yields no journaling.
    """
    
    def get_capability_meta(self, name_or_id: str) -> Optional[Any]: ...
    
    def get_capability(self, name_or_id: str) -> Optional[Any]: ...
    
    async def execute_capability_async(self, name_or_id: str, *args: Any, **kwargs: Any) -> Any: ...
    
    def reload_capability(self, name_or_id: str) -> Any: ...
    
    # Stage 3 (CR-16) admission surface
    def get_admission_profile(self, name_or_id: str) -> Optional[Dict[str, Any]]: ...
    
    def get_instance_concurrency_cap(self, name_or_id: str) -> Optional[int]: ...
    
    async def get_global_stats(self) -> Dict[str, Any]: ...
    
    # Stage 4 (CR-17 pt 2) task channel — invoked only for task-addressed jobs
    # (Job.task_name set); execute-channel jobs never touch it, so older test
    # doubles keep working unchanged.
    async def execute_capability_task_async(self, name_or_id: str, task_name: str, method: str, **kwargs: Any) -> Any: ...
```

``` python
@dataclass
class Job:
    """
    A queued capability execution request (CR-6 reshape; stage-3 composition
    rework renamed the sequence tags to composition tags).
    
    `composition_id` / `node_id` are set when the job is a lazily-created
    member of a composition (CR-16) — they ride every JobEvent so
    `events_for_composition` subscribers see member lifecycle events.
    
    `run_id` / `actor` (CR-14 follow-up) are host-tier correlation tags:
    cores pass their run-manifest id + initiating actor at submit so every
    journal row for this job links back to the run record (run manifest ↔
    journal linkage) and carries who/what initiated it.
    """
    
    id: str  # Unique job identifier (UUID)
    capability_instance_id: str  # Target capability instance (per CR-10)
    args: Tuple[Any, ...]  # Positional arguments for execute()
    kwargs: Dict[str, Any]  # Keyword arguments for execute()
    status: JobStatus = JobStatus.pending  # Current job status
    priority: int = 0  # Higher = more urgent
    submitted_at: datetime = field(...)  # When submitted
    started_at: Optional[datetime]  # When execution started
    completed_at: Optional[datetime]  # When execution finished
    progress: float = 0.0  # 0.0 to 1.0, or -1.0 for indeterminate
    status_message: str = ''  # Descriptive status message
    result: Any  # Execution result (if completed)
    error: Optional[JobError]  # Structured failure summary (CR-5)
    composition_id: Optional[str]  # Set when part of a composition (stage 3)
    node_id: Optional[str]  # Composition node this job executes (stage 3)
    task_name: Optional[str]  # Task-channel address: adapter task (stage 4, CR-17 pt 2)
    method: Optional[str]  # Task-channel address: adapter method (stage 4)
    run_id: Optional[str]  # Host-tier run correlation (CR-14 follow-up; core run manifests)
    actor: Optional[str]  # Who/what initiated the work (CR-14 follow-up)
    control: Dict[str, Any] = field(...)  # Per-call control flags (force/cache-bypass — CR-15 cat 4); rides CallEnvelope.control
    cancel_requested_at: Optional[datetime]  # When cancel was requested (Stage 4)
    cancel_phase: Optional[CancelPhase]  # Active cancel phase (Stage 4)
    block_reason: Optional[str]  # Why the scheduler is blocking (Stage 4)
    retry_count: int = 0  # Reactive retries attempted (CR-7 + Stage 4)
    last_resource_snapshot: Optional[Any]  # Stage 3 wires this (ResourceSnapshot)
    
```

``` python
@dataclass
class JobEvent:
    """
    A push-based job event (CR-6; stage-3 composition tags).
    
    Carries full tag context so a subscriber to `all_events()`, `events(job_id)`,
    or `events_for_composition(comp_id)` receives identically-shaped instances.
    `payload` is a per-event-type structured dict (e.g., STATE_TRANSITION carries
    `{"from": "pending", "to": "running"}`; COMPOSITION_ADVANCED carries
    `{"completed_node": ..., "enqueued_nodes": [...]}`).
    
    `run_id` / `actor` (CR-14 follow-up) ride from the Job so journal rows
    written by the single emission path carry the host-tier correlation.
    """
    
    type: JobEventType
    job_id: str
    capability_instance_id: str
    composition_id: Optional[str]
    node_id: Optional[str]
    run_id: Optional[str]  # Host-tier run correlation (CR-14 follow-up)
    actor: Optional[str]  # Who/what initiated (CR-14 follow-up)
    timestamp: datetime = field(...)
    payload: Dict[str, Any] = field(...)
```

``` python
@dataclass
class QueueStats:
    "Aggregate counts returned by `JobQueue.get_stats()` (CR-6)."
    
    total_pending: int
    total_completed: int
    total_failed: int
    total_cancelled: int
```

``` python
@dataclass
class _Subscription:
    """
    Internal: per-subscriber bounded queue + drop counter (CR-6 event bus).
    
    Slow subscribers backpressure themselves via `asyncio.QueueFull` drop;
    the publisher never blocks. `dropped_count` surfaces visibility for
    operators / future telemetry.
    """
    
    queue: 'asyncio.Queue[JobEvent]'
    dropped_count: int = 0
```

``` python
@dataclass
class ResourceSnapshot:
    """
    Point-in-time resource usage for one job (CR-6 Stage 3).
    
    Worker stats (cpu_percent, memory_rss_mb) come from the capability proxy's
    `get_stats()`. GPU fields come from the configured system-monitor capability
    (when set on JobQueue) via CR-3's typed `list_processes()` (per-PID
    matching) and `get_system_status()` (global GPU stats). All GPU fields
    are Optional — None if no sysmon configured or the worker isn't running
    on a GPU.
    
    Distinct from CR-7's `EmpiricalResourceRecord` (aggregated profile
    across runs); this is "what's happening right now" for one job.
    """
    
    timestamp: datetime  # When the sample was taken
    worker_pid: int = 0  # OS PID of the capability worker subprocess
    cpu_percent: float = 0.0  # Worker process CPU%
    memory_rss_mb: float = 0.0  # Worker process resident memory (MB)
    gpu_index: Optional[int]  # GPU index the worker is on (None if not GPU-bound)
    gpu_memory_mb: Optional[float]  # Worker's GPU memory usage (MB)
    gpu_type: Optional[str]  # GPU vendor (NVIDIA / AMD / Intel / None)
    gpu_total_mb: Optional[float]  # Total GPU memory available globally (MB)
    gpu_load_percent: Optional[float]  # GPU compute utilization (global)
```

``` python
class JobQueue:
    """
    Resource-aware multi-lane job queue with journal-primary observability
    (CR-6 + CR-14; stage-3 CR-16 rework: ready-set dispatch + resource-derived
    admission + composition execution).
    """
    
    def __init__(...)
        """
        Initialize the job queue.
        
        Stage 3 (CR-16): the queue dispatches multiple admissible jobs
        concurrently (`max_concurrent_lanes` is the operator safety valve);
        per-job admission derives from empirical resource records + live
        sysmon telemetry; see `_pop_next_admissible`. Worst case (no
        records, no sysmon) every job runs exclusive, which is exactly the
        pre-stage-3 single-lane behavior.
        
        CR-14 (stage 7): emission is journal-primary: `_publish_event`
        writes journal-class events as durable rows before fanning out to
        live subscribers. The stores default to the deps' (CapabilityManager's)
        stores via `getattr`, so the cores gain journaling with zero host
        changes; a deps object without them (test doubles) yields no journaling.
        """
    
    def set_run_context(
            self,
            run_id: Optional[str] = None,  # Host-tier run correlation for subsequent submits
            actor: Optional[str] = None,   # Who/what initiated the run
        ) -> None
        """
        Set the queue-scoped default run context (CR-14 follow-up).
        
        Every subsequent submit without explicit `run_id`/`actor` inherits
        these: the one-queue-per-run CLI cores call this once after
        generating their run-manifest id, and every journal row for the run
        links back to it. Call again (or with None) to change/clear.
        """
```

### Scheduling (`scheduling.ipynb`)

> Resource scheduling policies for capability execution

#### Import

``` python
from cjm_substrate.core.scheduling import (
    ResourceScheduler,
    PermissiveScheduler,
    SafetyScheduler,
    QueueScheduler
)
```

#### Functions

``` python
@patch
def _check_resources(
    self:QueueScheduler,
    capability_meta: CapabilityMeta,  # Capability metadata with manifest
    stats: Dict[str, Any]  # Current system stats
) -> bool:  # True if resources available
    "Check if system has sufficient resources for the capability."
```

``` python
@patch
def get_active_capabilities(self:QueueScheduler) -> Set[str]:  # Set of currently executing capability names
    "Get the set of capabilities with active executions."
```

#### Classes

``` python
class ResourceScheduler(ABC):
    "Abstract base class for resource allocation policies."
    
    def allocate(
            self,
            capability_meta: CapabilityMeta,  # Metadata of the capability requesting resources
            stats_provider: Callable[[], Dict[str, Any]]  # Function that returns fresh stats
        ) -> bool:  # True if execution is allowed
        "Decide if a capability can start based on its requirements and system state."
    
    async def allocate_async(
            self,
            capability_meta: CapabilityMeta,  # Metadata of the capability requesting resources
            stats_provider: Callable[[], Awaitable[Dict[str, Any]]]  # Async function returning stats
        ) -> bool:  # True if execution is allowed
        "Async allocation decision. Default delegates to sync allocate after fetching stats once."
    
    def on_execution_start(
            self,
            capability_name: str  # Name of the capability starting execution
        ) -> None
        "Notify scheduler that a task started (to reserve resources)."
    
    def on_execution_finish(
            self,
            capability_name: str  # Name of the capability finishing execution
        ) -> None
        "Notify scheduler that a task finished (to release resources)."
```

``` python
class PermissiveScheduler(ResourceScheduler):
    "Scheduler that allows all executions (Default / Dev Mode)."
    
    def allocate(
            self,
            capability_meta: CapabilityMeta,  # Metadata of the capability requesting resources
            stats_provider: Callable[[], Dict[str, Any]]  # Stats provider (ignored)
        ) -> bool:  # Always returns True
        "Allow all capability executions without checking resources."
    
    def on_execution_start(
            self,
            capability_name: str  # Name of the capability starting execution
        ) -> None
        "No-op for permissive scheduler."
    
    def on_execution_finish(
            self,
            capability_name: str  # Name of the capability finishing execution
        ) -> None
        "No-op for permissive scheduler."
```

``` python
class SafetyScheduler(ResourceScheduler):
    "Scheduler that prevents execution if resources are insufficient."
    
    def allocate(
            self,
            capability_meta: CapabilityMeta,  # Metadata of the capability requesting resources
            stats_provider: Callable[[], Dict[str, Any]]  # Function returning current stats
        ) -> bool:  # True if resources are available
        "Check resource requirements against system state."
    
    def on_execution_start(
            self,
            capability_name: str  # Name of the capability starting execution
        ) -> None
        "Called when execution starts (for future resource reservation)."
    
    def on_execution_finish(
            self,
            capability_name: str  # Name of the capability finishing execution
        ) -> None
        "Called when execution finishes (for future resource release)."
```

``` python
class QueueScheduler:
    "Scheduler that waits for resources to become available."
    
    def __init__(
            self,
            timeout: float = 300.0,  # Max seconds to wait for resources
            poll_interval: float = 2.0  # Seconds between resource checks
        )
        "Initialize queue scheduler with timeout and polling settings."
    
    def allocate(
            self,
            capability_meta: CapabilityMeta,  # Metadata of the capability requesting resources
            stats_provider: Callable[[], Dict[str, Any]]  # Function returning current stats
        ) -> bool:  # True if resources become available before timeout
        "Wait for resources using blocking sleep."
    
    async def allocate_async(
            self,
            capability_meta: CapabilityMeta,  # Metadata of the capability requesting resources
            stats_provider: Callable[[], Awaitable[Dict[str, Any]]]  # Async stats function
        ) -> bool:  # True if resources become available before timeout
        "Wait for resources using non-blocking async sleep."
    
    def on_execution_start(
            self,
            capability_name: str  # Name of the capability starting execution
        ) -> None
        "Track that a capability has started executing."
    
    def on_execution_finish(
            self,
            capability_name: str  # Name of the capability finishing execution
        ) -> None
        "Track that a capability has finished executing."
```
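The wait-until-available policy of `QueueScheduler.allocate` can be pictured as a polling loop bounded by `timeout`. The sketch below is an assumption about the general shape, not the library's code: `wait_for_resources` and the `has_resources` predicate are hypothetical names, and the stats keys are invented for the demonstration.

```python
import time
from typing import Any, Callable, Dict


def wait_for_resources(
    has_resources: Callable[[Dict[str, Any]], bool],  # Hypothetical resource check
    stats_provider: Callable[[], Dict[str, Any]],     # Returns fresh stats on each call
    timeout: float = 300.0,                           # Max seconds to wait
    poll_interval: float = 2.0,                       # Seconds between checks
) -> bool:
    """Poll until the check passes or the deadline expires."""
    deadline = time.monotonic() + timeout
    while True:
        if has_resources(stats_provider()):
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Blocking sleep, capped so the loop never overshoots the deadline.
        time.sleep(min(poll_interval, remaining))


# Free memory grows across successive samples until the check passes.
samples = iter([{"free_mb": 100}, {"free_mb": 200}, {"free_mb": 900}])
granted = wait_for_resources(
    lambda s: s["free_mb"] >= 512, lambda: next(samples),
    timeout=1.0, poll_interval=0.01,
)
timed_out = wait_for_resources(
    lambda s: False, lambda: {}, timeout=0.05, poll_interval=0.01,
)
```

An async variant would follow the same structure with `await asyncio.sleep(...)` in place of `time.sleep(...)` and an awaited stats provider.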

### Capability Secret Store (`secret_store.ipynb`)

> CR-12: project-local secret storage for API-based capabilities
> (file-backed, 0600)

#### Import

``` python
from cjm_substrate.core.secret_store import (
    SecretStore,
    LocalSecretStore
)
```

#### Functions

``` python
def _default_secrets_dir() -> Path:
    """Default secrets directory: `~/.cjm/secrets` (bootstrap fallback)."""
    return Path.home() / ".cjm" / "secrets"
```

``` python
@patch
def _load(self:LocalSecretStore) -> Dict[str, Dict[str, Dict[str, str]]]
```

``` python
@patch
def _save(self:LocalSecretStore, data: Dict[str, Dict[str, Dict[str, str]]]) -> None
```

``` python
@patch
def get_secret(
    self:LocalSecretStore,
    capability_name: str,  # Capability the secret belongs to
    key: str,          # Secret key (typically the env-var name, e.g. GEMINI_API_KEY)
    *,
    scope: Optional[str] = None  # Reserved multi-user seam; ignored by the local store
) -> Optional[str]:  # The secret value, or None if absent
    "Resolve a secret value."
```

``` python
@patch
def set_secret(
    self:LocalSecretStore,
    capability_name: str,  # Capability the secret belongs to
    key: str,          # Secret key
    value: str,        # Secret value (stored plaintext at 0600)
    *,
    scope: Optional[str] = None  # Reserved multi-user seam
) -> None
    "Persist a secret value."
```

``` python
@patch
def delete_secret(
    self:LocalSecretStore,
    capability_name: str,  # Capability the secret belongs to
    key: str,          # Secret key
    *,
    scope: Optional[str] = None  # Reserved multi-user seam
) -> bool:  # True if a secret was removed
    "Remove a secret, pruning now-empty capability/scope containers."
```

``` python
@patch
def list_keys(
    self:LocalSecretStore,
    capability_name: str,  # Capability to list secrets for
    *,
    scope: Optional[str] = None  # Reserved multi-user seam
) -> List[str]:  # Secret key NAMES (never values)
    "Return the names of secrets stored for a capability (never the values)."
```

#### Classes

``` python
@runtime_checkable
class SecretStore(Protocol):
    "Protocol for resolving per-capability secrets (API keys, tokens)."
    
    def get_secret(self, capability_name: str, key: str, *, scope: Optional[str] = None) -> Optional[str]
        "Return the secret value for (capability, key) under `scope`, or None."
    
    def set_secret(self, capability_name: str, key: str, value: str, *, scope: Optional[str] = None) -> None
        "Persist a secret value for (capability, key) under `scope`."
    
    def delete_secret(self, capability_name: str, key: str, *, scope: Optional[str] = None) -> bool
        "Remove (capability, key) under `scope`. Returns True if a secret was deleted."
    
    def list_keys(self, capability_name: str, *, scope: Optional[str] = None) -> List[str]
        "Return the NAMES of secrets stored for a capability under `scope`, never values."
```
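Because `SecretStore` is a `runtime_checkable` protocol, any object exposing the four methods satisfies it structurally. The sketch below shows that idea with a dict-backed test double; `SecretStoreLike` and `InMemorySecretStore` are hypothetical names that mirror the signatures listed above, and nothing here is imported from the library.

```python
from typing import Dict, List, Optional, Protocol, runtime_checkable


@runtime_checkable
class SecretStoreLike(Protocol):
    """Hypothetical mirror of the four-method protocol surface."""

    def get_secret(self, capability_name: str, key: str, *, scope: Optional[str] = None) -> Optional[str]: ...
    def set_secret(self, capability_name: str, key: str, value: str, *, scope: Optional[str] = None) -> None: ...
    def delete_secret(self, capability_name: str, key: str, *, scope: Optional[str] = None) -> bool: ...
    def list_keys(self, capability_name: str, *, scope: Optional[str] = None) -> List[str]: ...


class InMemorySecretStore:
    """Dict-backed store for tests; `scope` is accepted and ignored."""

    def __init__(self) -> None:
        self._data: Dict[str, Dict[str, str]] = {}

    def get_secret(self, capability_name, key, *, scope=None):
        return self._data.get(capability_name, {}).get(key)

    def set_secret(self, capability_name, key, value, *, scope=None):
        self._data.setdefault(capability_name, {})[key] = value

    def delete_secret(self, capability_name, key, *, scope=None):
        secrets = self._data.get(capability_name, {})
        if key not in secrets:
            return False
        del secrets[key]
        if not secrets:
            # Prune the now-empty capability container.
            self._data.pop(capability_name, None)
        return True

    def list_keys(self, capability_name, *, scope=None):
        # Names only; values never leave the store through this method.
        return sorted(self._data.get(capability_name, {}))


store = InMemorySecretStore()
store.set_secret("demo_capability", "DEMO_API_KEY", "s3cret")
```

Note that `isinstance` against a runtime-checkable protocol only verifies that the methods exist, not that their signatures match.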

``` python
class LocalSecretStore:
    "File-backed default `SecretStore` (0600 JSON under `secrets_dir`)."
    
    def __init__(
            self,
            secrets_dir: Optional[Path] = None  # Directory for secrets.json; None -> ~/.cjm/secrets
        )
        "Initialize the store. `secrets_dir=None` uses `~/.cjm/secrets`."
```
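One common way to write an owner-only (0600) JSON file on POSIX systems is sketched below. It is an illustration of the general technique, not the code behind `LocalSecretStore`: `write_private_json` is a hypothetical helper, and the scope -> capability -> key nesting in the sample data is an assumption inferred from the three-level `Dict` type used by `_load` and `_save`.

```python
import json
import os
import stat
import tempfile
from pathlib import Path


def write_private_json(path: Path, data: dict) -> None:
    """Write `data` as JSON readable and writable by the owner only."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # Create with mode 0600 so the file is never briefly world-readable.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        json.dump(data, fh)
    # Enforce the mode even when the file already existed with looser bits.
    os.chmod(path, 0o600)


demo_dir = Path(tempfile.mkdtemp())
secrets_path = demo_dir / "secrets" / "secrets.json"
# Assumed layout: scope -> capability -> key -> value.
write_private_json(
    secrets_path,
    {"__default__": {"demo_capability": {"DEMO_API_KEY": "s3cret"}}},
)
mode = stat.S_IMODE(os.stat(secrets_path).st_mode)
```

Values are still stored in plaintext; the file mode only limits which local users can read them.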

#### Variables

``` python
_SECRETS_FILENAME = 'secrets.json'
_DEFAULT_SCOPE = '__default__'
```

### Substrate Telemetry Helpers (`telemetry.ipynb`)

> Shared GPU/CPU attribution helpers used by both
> `JobQueue._sample_resource_snapshot` (CR-6 Stage 3) and
> `CapabilityManager._record_sample_safe` (CR-7).

#### Import

``` python
# No corresponding Python module found for core.telemetry
```

#### Functions

``` python
def _proc_field(proc: Any, key: str, default: Any = None) -> Any
    """
    Read a field from a sysmon process record, accepting dict or dataclass.
    
    `MonitorToolProtocol.list_processes()` returns `ProcessStats` dataclasses (CR-3),
    but proxy round-trips frequently coerce to dicts. Accept both so the
    helper works against either form without the caller pre-normalizing.
    """
```

``` python
def _worker_subtree_pids(stats: Dict[str, Any]) -> set
    """
    Build the worker subtree PID set from a `/stats` dict.
    
    Falls back to a single-pid set when `subtree_pids` is absent (pre-fix
    workers, mock test fixtures). The worker pid itself is always included.
    """
```

``` python
def attribute_gpu_to_worker_subtree(
    stats: Dict[str, Any],  # Worker `/stats` payload (must include 'pid'; uses 'subtree_pids' if present)
    sysmon: Any,            # The configured monitor capability (or None)
) -> Optional[Dict[str, Any]]
    """
    Attribute GPU memory across the worker's process subtree.
    
    Returns `{'gpu_memory_mb': float, 'gpu_index': Optional[int]}` when sysmon
    is reachable, or `None` when sysmon isn't configured / doesn't expose
    `list_processes()` / errors out. Callers treat `None` as "sysmon
    unavailable" and leave GPU snapshot fields as their defaults; a 0.0 sum
    means sysmon worked but no subtree PID holds GPU memory (CPU-only capability
    on a GPU box).
    """
```
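The three helpers above combine into a simple attribution pass: build the subtree PID set, then sum GPU memory over the process records whose PID falls inside it. The sketch below is a simplified illustration under stated assumptions: the record field names `pid` and `gpu_memory_mb`, the `ProcRecord` dataclass, and all function names are hypothetical, and the sysmon call itself is replaced by a plain list.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, Set


@dataclass
class ProcRecord:
    """Hypothetical stand-in for a typed sysmon process record."""
    pid: int
    gpu_memory_mb: float


def proc_field(proc: Any, key: str, default: Any = None) -> Any:
    """Read a field from either a dict or an attribute-style record."""
    if isinstance(proc, dict):
        return proc.get(key, default)
    return getattr(proc, key, default)


def subtree_pids(stats: Dict[str, Any]) -> Set[int]:
    """Worker PID plus any reported descendants."""
    tree = set(stats.get("subtree_pids") or [])
    tree.add(stats["pid"])
    return tree


def sum_gpu_memory(stats: Dict[str, Any], processes: Iterable[Any]) -> float:
    """Total GPU memory (MB) held by processes inside the worker subtree."""
    tree = subtree_pids(stats)
    return sum(
        (float(proc_field(p, "gpu_memory_mb", 0.0) or 0.0)
         for p in processes if proc_field(p, "pid") in tree),
        0.0,
    )


processes = [
    ProcRecord(pid=101, gpu_memory_mb=2048.0),  # Child of the worker
    {"pid": 100, "gpu_memory_mb": 512.0},       # Worker itself, dict form
    {"pid": 999, "gpu_memory_mb": 4096.0},      # Unrelated process
]
total = sum_gpu_memory({"pid": 100, "subtree_pids": [100, 101]}, processes)
fallback = sum_gpu_memory({"pid": 100}, processes)
```

A result of `0.0` here corresponds to the "sysmon worked but nothing in the subtree holds GPU memory" case, which is distinct from the `None` returned when sysmon is unavailable.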

### Configuration Validation (`validation.ipynb`)

> Validation helpers for capability configuration dataclasses

#### Import

``` python
from cjm_substrate.utils.validation import (
    T,
    SCHEMA_TITLE,
    SCHEMA_DESC,
    SCHEMA_MIN,
    SCHEMA_MAX,
    SCHEMA_ENUM,
    SCHEMA_MIN_LEN,
    SCHEMA_MAX_LEN,
    SCHEMA_PATTERN,
    SCHEMA_FORMAT,
    validate_field_value,
    validate_config,
    config_to_dict,
    dict_to_config,
    extract_defaults,
    dataclass_to_jsonschema
)
```

#### Functions

``` python
def validate_field_value(
    value:Any, # Value to validate
    metadata:Dict[str, Any], # Field metadata containing constraints
    field_name:str="" # Field name for error messages
) -> Tuple[bool, Optional[str]]: # (is_valid, error_message)
    "Validate a value against field metadata constraints."
```

``` python
def validate_config(
    config:Any # Configuration dataclass instance to validate
) -> Tuple[bool, Optional[str]]: # (is_valid, error_message)
    "Validate all fields in a configuration dataclass against their metadata constraints."
```

``` python
def config_to_dict(
    config:Any # Configuration dataclass instance
) -> Dict[str, Any]: # Dictionary representation of the configuration
    "Convert a configuration dataclass instance to a dictionary."
```

``` python
def dict_to_config(
    config_class:Type[T], # Configuration dataclass type
    data:Optional[Dict[str, Any]]=None, # Dictionary with configuration values
    validate:bool=False, # Whether to validate against metadata constraints
    strict:bool=True # SG-8: reject unknown keys (default); set False to log+filter for forward-compat
) -> T: # Instance of the configuration dataclass
    """
    Create a configuration dataclass instance from a dictionary.
    
    SG-8: by default, unknown keys raise `CapabilityConfigError`. The previous
    behavior (silently filter unknowns) is available via `strict=False`,
    which logs a warning so the drift is still visible in operator logs.
    """
```
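The strict/lenient split described for `dict_to_config` can be sketched with `dataclasses.fields`. This is an illustration of the idea only: `build_config`, `UnknownKeyError`, and `DemoConfig` are hypothetical names (the library raises `CapabilityConfigError`), and the sketch omits the optional validation step.

```python
import logging
from dataclasses import dataclass, fields
from typing import Any, Dict, Type, TypeVar

T = TypeVar("T")
log = logging.getLogger(__name__)


class UnknownKeyError(ValueError):
    """Hypothetical stand-in for the library's config error type."""


def build_config(config_class: Type[T], data: Dict[str, Any], strict: bool = True) -> T:
    """Build a dataclass instance, rejecting or filtering unknown keys."""
    known = {f.name for f in fields(config_class)}
    unknown = sorted(set(data) - known)
    if unknown:
        if strict:
            raise UnknownKeyError(f"unknown config keys: {unknown}")
        # Lenient mode: keep going, but leave a trace in the logs.
        log.warning("ignoring unknown config keys: %s", unknown)
    return config_class(**{k: v for k, v in data.items() if k in known})


@dataclass
class DemoConfig:
    model: str = "base"
    beam_size: int = 5


lenient = build_config(DemoConfig, {"model": "large", "typo_key": 1}, strict=False)
try:
    build_config(DemoConfig, {"model": "large", "typo_key": 1})
    strict_error = None
except UnknownKeyError as exc:
    strict_error = str(exc)
```

Failing by default surfaces configuration drift at load time instead of letting a misspelled key silently fall back to its default.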

``` python
def extract_defaults(
    config_class:Type # Configuration dataclass type
) -> Dict[str, Any]: # Default values from the dataclass
    "Extract default values from a configuration dataclass type."
```

``` python
def _python_type_to_json_type(
    python_type:type # Python type annotation to convert
) -> Dict[str, Any]: # JSON schema type definition
    "Convert Python type to JSON schema type."
```

``` python
def dataclass_to_jsonschema(
    cls:type # Dataclass with field metadata
) -> Dict[str, Any]: # JSON schema dictionary
    "Convert a dataclass to a JSON schema for form generation."
```

#### Variables

``` python
T
SCHEMA_TITLE = 'title'  # Display title for the field
SCHEMA_DESC = 'description'  # Help text description
SCHEMA_MIN = 'minimum'  # Minimum value for numbers
SCHEMA_MAX = 'maximum'  # Maximum value for numbers
SCHEMA_ENUM = 'enum'  # Allowed values for dropdowns
SCHEMA_MIN_LEN = 'minLength'  # Minimum string length
SCHEMA_MAX_LEN = 'maxLength'  # Maximum string length
SCHEMA_PATTERN = 'pattern'  # Regex pattern for strings
SCHEMA_FORMAT = 'format'  # String format (email, uri, date, etc.)
```
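The `SCHEMA_*` keys are meant to be used inside dataclass field metadata. The sketch below shows how such metadata could drive a constraint check; it re-declares three of the constants locally and uses hypothetical `check_value` / `check_config` helpers rather than the library's `validate_field_value` / `validate_config`, whose exact error messages are not shown in this document.

```python
from dataclasses import dataclass, field, fields
from typing import Any, Mapping, Optional, Tuple

# Local copies of three metadata keys listed above.
SCHEMA_MIN = "minimum"
SCHEMA_MAX = "maximum"
SCHEMA_ENUM = "enum"


def check_value(value: Any, metadata: Mapping[str, Any], field_name: str = "") -> Tuple[bool, Optional[str]]:
    """Check one value against minimum / maximum / enum constraints."""
    if SCHEMA_MIN in metadata and value < metadata[SCHEMA_MIN]:
        return False, f"{field_name}: {value} is below minimum {metadata[SCHEMA_MIN]}"
    if SCHEMA_MAX in metadata and value > metadata[SCHEMA_MAX]:
        return False, f"{field_name}: {value} is above maximum {metadata[SCHEMA_MAX]}"
    if SCHEMA_ENUM in metadata and value not in metadata[SCHEMA_ENUM]:
        return False, f"{field_name}: {value!r} is not one of {metadata[SCHEMA_ENUM]}"
    return True, None


def check_config(config: Any) -> Tuple[bool, Optional[str]]:
    """Check every field of a dataclass instance; stop at the first failure."""
    for f in fields(config):
        ok, err = check_value(getattr(config, f.name), f.metadata, f.name)
        if not ok:
            return False, err
    return True, None


@dataclass
class DemoConfig:
    temperature: float = field(default=0.7, metadata={SCHEMA_MIN: 0.0, SCHEMA_MAX: 1.0})
    device: str = field(default="cpu", metadata={SCHEMA_ENUM: ["cpu", "cuda"]})


valid = check_config(DemoConfig())
too_hot = check_config(DemoConfig(temperature=1.5))
bad_device = check_config(DemoConfig(device="tpu"))
```

Since the keys match JSON Schema keyword names, the same metadata can be copied into a generated schema without translation.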

### Typed Wire Layer (`wire.ipynb`)

> Typed data transfer at the worker boundary — the zero-copy
> `FileBackedDTO`

#### Import

``` python
from cjm_substrate.core.wire import (
    WIRE_KIND_KEY,
    WIRE_DATA_KEY,
    ENVELOPE_BODY_KEY,
    ACCOUNTS_HEADER,
    FileBackedDTO,
    flat_from_dict,
    wire_type,
    wire_encode,
    wire_decode,
    CallEnvelope,
    set_call_envelope,
    reset_call_envelope,
    get_call_envelope,
    begin_account_capture,
    record_account,
    drain_accounts
)
```

#### Functions

``` python
def flat_from_dict(
    """
    Default reconstruction for FLAT wire DTOs (no nested-DTO fields).
    
    Filters the payload to the dataclass's declared fields (unknown extras
    are dropped with a debug log — transport-terminus tolerance, see the
    envelope design note) and lets the constructor enforce required fields
    (a missing required field raises TypeError loudly). DTOs with nested
    DTO fields (e.g. a result carrying a list of typed items) must define
    their own `from_dict` classmethod; `@wire_type` only attaches this
    default when the class has none.
    """
```

``` python
def wire_type(
    kind: str  # Stable wire discriminator, e.g. "transcription.result"
) -> Callable[[type], type]:  # Class decorator
    """
    Register a dataclass as a typed wire DTO under `kind`.
    
    - The class must be a dataclass (encode falls back to
      `dataclasses.asdict` when it defines no `to_dict`).
    - If the class defines no `from_dict`, the flat default
      (`flat_from_dict`) is attached; nested DTOs define their own.
    - Re-registering the same LOGICAL class (qualname match; the module is
      ignored because nbdev's literate workflow defines each class twice —
      in-notebook `__main__` + the exported module) replaces the decode
      entry; a DIFFERENT class claiming an already-registered kind raises
      ValueError.
    """
```

``` python
def wire_encode(
    obj: Any  # A task result (any shape)
) -> Any:     # Tagged envelope dict for registered DTOs; `obj` unchanged otherwise
    """
    Wrap a registered wire DTO in its tagged envelope (worker side).
    
    Exact-type lookup: subclasses are NOT encoded under the parent's kind
    (they pass through unregistered, preserving today's behavior).
    Payload preference: the DTO's own `to_dict()` when defined, else
    `dataclasses.asdict` (recursive — nested dataclasses flatten).
    """
```

``` python
def wire_decode(
    obj: Any  # A JSON-decoded response body (any shape)
) -> Any:     # The typed DTO for known kinds; `obj` unchanged otherwise
    """
    Reconstruct a typed result from its tagged envelope (host side).
    
    Known kind -> the registered class's `from_dict` (strict: a missing
    required field raises). Unknown kind -> the dict passes through
    UNCHANGED with the envelope intact (tolerant degradation for hosts
    without the result's interface library). Untagged values pass through.
    """
```
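The encode/decode pair can be pictured as a two-way registry keyed by `kind`. The sketch below is a simplified model, not the library's code: the envelope shape `{"__wire__": kind, "data": payload}` is an assumption inferred from `WIRE_KIND_KEY` and `WIRE_DATA_KEY`, and `register`, `encode`, `decode`, and `DemoResult` are hypothetical names. It skips the re-registration rules and the `to_dict` / `from_dict` hooks.

```python
from dataclasses import asdict, dataclass
from typing import Any, Callable, Dict

KIND_KEY = "__wire__"  # Assumed to match WIRE_KIND_KEY
DATA_KEY = "data"      # Assumed to match WIRE_DATA_KEY

_types: Dict[str, type] = {}  # kind -> class (decode side)
_kinds: Dict[type, str] = {}  # class -> kind (encode side)


def register(kind: str) -> Callable[[type], type]:
    """Class decorator: register a dataclass under a wire discriminator."""
    def deco(cls: type) -> type:
        _types[kind] = cls
        _kinds[cls] = kind
        return cls
    return deco


def encode(obj: Any) -> Any:
    """Wrap registered DTOs in a tagged envelope; pass everything else through."""
    kind = _kinds.get(type(obj))  # Exact-type lookup, subclasses do not match
    if kind is None:
        return obj
    return {KIND_KEY: kind, DATA_KEY: asdict(obj)}


def decode(obj: Any) -> Any:
    """Rebuild a typed DTO for known kinds; pass everything else through."""
    if not isinstance(obj, dict) or KIND_KEY not in obj:
        return obj
    cls = _types.get(obj[KIND_KEY])
    if cls is None:
        return obj  # Unknown kind: leave the envelope intact
    return cls(**obj[DATA_KEY])


@register("demo.result")
@dataclass
class DemoResult:
    text: str
    score: float


envelope = encode(DemoResult("hi", 0.5))
round_trip = decode(envelope)
unknown = {KIND_KEY: "other.kind", DATA_KEY: {"x": 1}}
```

Passing unknown kinds through unchanged lets a host without the result's class still forward or store the payload.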

``` python
def set_call_envelope(env: Optional[CallEnvelope]) -> contextvars.Token
    "Set the current call envelope; returns the token for `reset_call_envelope`."
```

``` python
def reset_call_envelope(token: contextvars.Token) -> None
    "Restore the prior envelope (always pair with `set_call_envelope` in finally)."
```

``` python
def get_call_envelope() -> Optional[CallEnvelope]
    "The current call envelope, or None outside any call span."
```
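The set/reset pairing recommended above follows the standard `contextvars` token pattern. The sketch below demonstrates it with a plain dict in place of `CallEnvelope`; `_envelope` and `call_with_envelope` are hypothetical names used only for illustration.

```python
import contextvars
from typing import Any, Callable, Optional

_envelope: contextvars.ContextVar[Optional[dict]] = contextvars.ContextVar(
    "envelope", default=None
)


def call_with_envelope(env: Optional[dict], fn: Callable[[], Any]) -> Any:
    """Run `fn` with `env` visible, restoring the prior value afterwards."""
    token = _envelope.set(env)
    try:
        return fn()
    finally:
        # Reset in `finally` so an exception cannot leak the envelope.
        _envelope.reset(token)


seen_inside = call_with_envelope({"job_id": "job-1"}, _envelope.get)
seen_after = _envelope.get()
```

Resetting with the token restores whatever value was present before the call, so nested spans unwind correctly.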

``` python
def begin_account_capture() -> None
    """
    Start a fresh account list for the current call span (worker endpoint
    entry; same no-reset semantics as the envelope: the ASGI request task's
    context dies with the request).
    """
```

``` python
def record_account(
    event_type: str,  # SubstrateEventType value (task_account / result_saved / cache_hit / ...)
    payload: Optional[dict] = None,  # Structured detail (references + hashes, never content)
) -> None
    """
    Record one substrate-family account for the current call span.
    
    Called by the worker itself and by interface-lib storage helpers.
    Silent no-op outside a capture span (standalone runs, host imports) —
    the envelope-less-call posture applied to accounts.
    """
```

``` python
def drain_accounts() -> list
    """
    Return + clear the current span's recorded accounts ([] outside a span
    or when nothing was recorded). The worker response path calls this once
    when building the response headers.
    """
```
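The begin / record / drain cycle can be sketched with a context variable holding a list. This is a simplified model under stated assumptions: the record shape `{"event_type": ..., "payload": ...}` and the function names are hypothetical; only the header name `X-CJM-Accounts` comes from `ACCOUNTS_HEADER`.

```python
import contextvars
import json
from typing import Dict, Optional

_accounts: contextvars.ContextVar[Optional[list]] = contextvars.ContextVar(
    "accounts", default=None
)


def begin_capture() -> None:
    """Open a fresh capture span for the current context."""
    _accounts.set([])


def record(event_type: str, payload: Optional[dict] = None) -> None:
    """Append one account; silently do nothing outside a capture span."""
    lst = _accounts.get()
    if lst is None:
        return
    lst.append({"event_type": event_type, "payload": payload or {}})


def drain() -> list:
    """Return and clear whatever the current span has recorded."""
    lst = _accounts.get()
    if not lst:
        return []
    out = list(lst)
    lst.clear()
    return out


def accounts_header() -> Dict[str, str]:
    """Serialize drained accounts into a response-header dict."""
    accounts = drain()
    if not accounts:
        return {}
    # json.dumps escapes non-ASCII by default, keeping the header ASCII-only.
    return {"X-CJM-Accounts": json.dumps(accounts)}


record("cache_hit")          # Outside a span: ignored
begin_capture()
record("result_saved", {"sha256": "abc"})
first = accounts_header()
second = accounts_header()   # Already drained
```

Draining clears the list, so a second call within the same span yields no header.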

#### Classes

``` python
@runtime_checkable
class FileBackedDTO(Protocol):
    "Protocol for Data Transfer Objects that serialize to disk for zero-copy transfer."
    
    def to_temp_file(self) -> str: # Absolute path to the temporary file
        "Save the data to a temporary file and return the absolute path."
```

``` python
@dataclass
class CallEnvelope:
    """
    Substrate-owned per-call identity + control block (CR-14 / CR-15).
    
    Travels as a top-level `"envelope"` key on every proxy→worker call body.
    All fields optional — an envelope-less call (direct proxy use, old hosts)
    simply yields unattributed records, never a failure.
    """
    
    job_id: Optional[str]  # Queue job identity
    run_id: Optional[str]  # Host-tier run correlation (core run manifests)
    composition_id: Optional[str]  # Stage-3 composition correlation
    node_id: Optional[str]  # Composition node correlation
    actor: Optional[str]  # Who/what initiated (operator / agent / host id)
    control: dict = field(...)  # Per-call flags (force/cache-bypass — the 4th CR-15 category)
    
    def to_wire(self) -> dict
        "Compact wire form: None fields dropped; empty control dropped."
    
    @classmethod
    def from_wire(cls, d: dict) -> "CallEnvelope"
        "Tolerant decode: unknown keys ignored (forward compat)."
```

#### Variables

``` python
WIRE_KIND_KEY = '__wire__'
WIRE_DATA_KEY = 'data'
_WIRE_TYPES: Dict[str, type]
_WIRE_KINDS: Dict[type, str]
ENVELOPE_BODY_KEY = 'envelope'  # Top-level request-body key (never inside kwargs)
_CALL_ENVELOPE: contextvars.ContextVar[Optional[CallEnvelope]]
ACCOUNTS_HEADER = 'X-CJM-Accounts'  # Response header carrying recorded accounts (ASCII JSON)
_CALL_ACCOUNTS: contextvars.ContextVar[Optional[list]]
```

### Universal Worker (`worker.ipynb`)

> FastAPI server that runs inside isolated capability environments

#### Import

``` python
from cjm_substrate.core.worker import (
    EnhancedJSONEncoder,
    parent_monitor,
    create_app,
    run_worker
)
```

#### Functions

``` python
def parent_monitor(
    ppid: int # Parent process ID to monitor
) -> None
    """
    Monitor parent process and terminate self if parent dies.
    
    This implements the "Suicide Pact" pattern: if the Host process dies,
    the Worker must terminate itself to prevent zombie processes.
    """
```

``` python
def _load_capability_instance(
    module_name: str, # Python module path (e.g., "my_capability.capability")
    class_name: str   # Capability class name (e.g., "WhisperCapability")
):                    # Instantiated capability object
    """
    Dynamically load + instantiate the capability class.
    
    Runs synchronously before app construction so a load failure terminates
    the worker process with exit code 1 (matches pre-lifespan behavior;
    loading must succeed for the worker to be useful at all).
    """
```

``` python
def _make_lifespan(
    capability_instance  # The loaded capability object (closure-captured for shutdown cleanup)
):                   # FastAPI lifespan async context manager
    "Build the FastAPI lifespan that invokes capability.cleanup() on shutdown."
```

``` python
def _register_identity_endpoints(
    app,             # FastAPI app under construction
    capability_instance, # The loaded capability object
) -> None
    "/health + /stats: worker identity + process-subtree telemetry."
```

``` python
def _register_lifecycle_endpoints(
    app,             # FastAPI app under construction
    capability_instance, # The loaded capability object
) -> None
    """
    /initialize /prefetch /reconfigure /on_disable /on_enable /cleanup:
    the tool-capability lifecycle surface.
    """
```

``` python
def _register_config_endpoints(
    app,             # FastAPI app under construction
    capability_instance, # The loaded capability object
) -> None
    "/config_schema /config /config_options: the config surface."
```

``` python
def _load_adapters(
    capability_instance,  # The loaded tool-capability instance
    adapter_specs,    # List of "module:ClassName" impl specs (host-matched)
) -> Dict[str, Any]:  # task_name -> bound adapter instance
    """
    Instantiate task-adapter impls bound to this worker's tool (CR-17 pt 2).
    
    Each spec was matched HOST-side (adapter-manifest protocol members vs the
    capability's recorded structural surface) before reaching the worker, so a
    spec failing HERE is an INSTALL gap (interface lib missing from this env),
    not a compatibility miss — log loudly, skip, keep serving /execute.
    Binding convention: `AdapterClass(capability_instance)`; keyed by the class's
    `task_name` ClassVar.
    """
```

``` python
def _apply_call_envelope(
    data: Dict[str, Any],  # The decoded request body
) -> None
    """
    CR-14: decode the wire envelope into the worker-side contextvar.
    
    Set WITHOUT reset: ASGI handles each request in its own asyncio task, so
    the context (and the var) dies with the request — and for
    /execute_stream the response iteration runs in the SAME request task
    AFTER the endpoint returns, which is exactly why a reset-before-return
    would lose the identity (Starlette's iterate_in_threadpool copies the
    request task's context per chunk). An absent envelope leaves the var
    None — records stay honestly unattributed.
    
    CR-14 follow-up: also begins a fresh account-capture span (same no-reset
    semantics) so in-worker substrate-family accounts (TASK_ACCOUNT /
    RESULT_SAVED / CACHE_HIT) accumulate per call and ride back on the
    response header — see `_accounts_headers`.
    """
```

``` python
def _accounts_headers() -> Dict[str, str]
    """
    Drain the call span's recorded accounts into the response header dict.
    
    Empty dict when nothing was recorded (header absent, so old hosts and
    account-less calls see byte-identical responses). ASCII JSON
    (`ensure_ascii` default) keeps the header latin-1-safe.
    """
```

``` python
def _register_task_endpoints(
    app,             # FastAPI app under construction
    capability_instance, # The loaded capability object
    adapters=None,   # task_name -> bound adapter instance (CR-17 pt 2; stage 4)
) -> None
    """
    /execute /execute_stream /cancel /progress /task: the task channel.
    
    Stage 2 (typed wire layer): both result-serialization sites pass
    through `wire_encode`, so results whose DTO classes are registered
    via `@wire_type` cross the boundary in the tagged envelope and arrive
    typed at the proxy; unregistered results serialize exactly as before.
    
    CR-14 (stage 7): each call decodes the per-call envelope into the
    contextvar and carries it into the executor thread via
    `contextvars.copy_context()` (run_in_executor does NOT copy context by
    itself) — the diagnostics handler stamps every capability log record with
    exact job identity, replacing the timestamp-window heuristic.
    
    CR-14 follow-up: the unary paths (/execute, /task) return recorded
    accounts on the `X-CJM-Accounts` response header (success AND the
    `_job_error` 500 — a failed call still reports the accounts recorded
    before the failure). The host journals them with `worker_reported=True`.
    """
```
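The note that `run_in_executor` does not copy context by itself can be demonstrated in isolation. The sketch below uses a hypothetical `current_job` variable and `handle_call` coroutine; it shows the general `contextvars.copy_context()` technique, not the worker's endpoint code.

```python
import asyncio
import contextvars
from typing import Optional, Tuple

current_job: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    "current_job", default=None
)


def read_job() -> Optional[str]:
    """Runs in a worker thread and reports the job it can see."""
    return current_job.get()


async def handle_call(job_id: str) -> Tuple[Optional[str], Optional[str]]:
    current_job.set(job_id)
    loop = asyncio.get_running_loop()
    # Plain submission: the pool thread runs in its own context, so the
    # value set above is not guaranteed to be visible there.
    bare = await loop.run_in_executor(None, read_job)
    # Snapshot the current context and run the function inside it.
    ctx = contextvars.copy_context()
    carried = await loop.run_in_executor(None, ctx.run, read_job)
    return bare, carried


bare, carried = asyncio.run(handle_call("job-42"))
```

`asyncio.to_thread` performs the same context copy internally, which makes it a convenient alternative when the default executor is acceptable.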

``` python
def _register_monitor_endpoints(
    app,             # FastAPI app under construction
    capability_instance, # The loaded capability object
    class_name: str, # Capability class name (for 404 detail messages)
) -> None
    "/get_system_status /list_processes: CR-3 typed MonitorToolProtocol accessors."
```

``` python
def create_app(
    module_name: str, # Python module path (e.g., "my_capability.capability")
    class_name: str,  # Capability class name (e.g., "WhisperCapability")
    adapter_specs=None # CR-17 pt 2: "module:ClassName" adapter impl specs to bind in-worker
) -> FastAPI: # Configured FastAPI application
    """
    Create FastAPI app that hosts the specified capability.
    
    NB-2 reshape (stage 2): a thin assembler — load the capability, build the
    lifespan, register the endpoint groups. Endpoint behavior lives in the
    module-level `_register_*` helpers above.
    """
```

``` python
def run_worker() -> None:
    """CLI entry point for running the worker."""
    parser = argparse.ArgumentParser(description="Universal Capability Worker")
    parser.add_argument("--module", required=True, help="Capability module path")
    parser.add_argument("--class", dest="class_name", required=True, help="Capability class name")
    parser.add_argument("--adapters", required=False, default="",
                        help="Comma-separated adapter impl specs 'module:ClassName' (CR-17 pt 2)")
```

#### Classes

``` python
class EnhancedJSONEncoder(JSONEncoder):
    """
    JSON encoder that handles dataclasses and other common types.
    
    SG-52: datetime support added so JobError.occurred_at serializes cleanly
    when emitted via the typed `_job_error` terminal chunk in /execute_stream.
    """
    
    def default(
            self,
            o: Any # Object to encode
        ) -> Any: # JSON-serializable representation
        "Convert non-serializable objects to serializable form."
```
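A `JSONEncoder.default` override of this kind typically looks like the sketch below. It is an illustration only: `SketchEncoder` and `DemoError` are hypothetical, and serializing `datetime` via `isoformat()` is an assumption about the format rather than a statement of what `EnhancedJSONEncoder` emits.

```python
import json
from dataclasses import asdict, dataclass, is_dataclass
from datetime import datetime, timezone
from json import JSONEncoder
from typing import Any


class SketchEncoder(JSONEncoder):
    """Encode dataclass instances and datetimes; defer everything else."""

    def default(self, o: Any) -> Any:
        # is_dataclass is also true for dataclass types, so exclude classes.
        if is_dataclass(o) and not isinstance(o, type):
            return asdict(o)
        if isinstance(o, datetime):
            return o.isoformat()
        # Let the base class raise TypeError for unsupported objects.
        return super().default(o)


@dataclass
class DemoError:
    message: str
    occurred_at: datetime


encoded = json.dumps(
    DemoError("boom", datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc)),
    cls=SketchEncoder,
)
```

The datetime nested inside the dataclass is handled by a second pass through `default`, since `asdict` leaves it as a `datetime` object.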
