Metadata-Version: 2.4
Name: cjm-media-processing-adapter-interface
Version: 0.0.2
Summary: Typed media-processing task-adapter interface — the multi-method MediaProcessingAdapter ABC + GenericMediaProcessingAdapter (cache/persist bookends around a pure-compute tool: convert, segment_audio, extract_audio, plus an uncached get_info probe) + the MediaProcessingToolProtocol. Result DTOs live in cjm-capability-primitives.
Author-email: "Christian J. Mills" <9126128+cj-mills@users.noreply.github.com>
License: Apache-2.0
Project-URL: Repository, https://github.com/cj-mills/cjm-media-processing-adapter-interface
Project-URL: Documentation, https://cj-mills.github.io/cjm-media-processing-adapter-interface/
Keywords: nbdev
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3 :: Only
Requires-Python: >=3.12
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: cjm_plugin_system>=0.0.46
Requires-Dist: cjm_capability_primitives>=0.0.6
Dynamic: license-file

# cjm-media-processing-adapter-interface


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

## Install

``` bash
pip install cjm_media_processing_adapter_interface
```

## Project Structure

    nbs/
    ├── adapter.ipynb # The typed media-processing task contract — the multi-method `MediaProcessingAdapter` ABC + the `MediaProcessingToolProtocol` structural contract (capability-unit Option C, pass-2 Thread 3). ONE multi-method adapter for the media-processing tool's whole repertoire (the graph-storage precedent), NOT per-action adapters.
    ├── generic.ipynb # The generic (tool-agnostic) multi-method media-processing adapter — cache-bookended artifact ops (`convert` / `segment_audio` / `extract_audio`) + an uncached `get_info` probe. Reused across every tool capability satisfying `MediaProcessingToolProtocol` (ffmpeg today).
    └── storage.ipynb # Standardized SQLite storage for media-processing results (produced-artifact pointers) with content hashing — born-final, action-aware, no `job_id`.

Total: 3 notebooks

## Module Dependencies

``` mermaid
graph LR
    adapter["adapter<br/>Media Processing Adapter"]
    generic["generic<br/>Generic Media Processing Adapter"]
    storage["storage<br/>Media Processing Storage"]

    generic --> adapter
    generic --> storage
```

*2 cross-module dependencies detected*

## CLI Reference

No CLI commands found in this project.

## Module Overview

Detailed documentation for each module in the project:

### Media Processing Adapter (`adapter.ipynb`)

> The typed media-processing task contract — the multi-method
> `MediaProcessingAdapter` ABC + the `MediaProcessingToolProtocol`
> structural contract (capability-unit Option C, pass-2 Thread 3). ONE
> multi-method adapter for the media-processing tool’s whole repertoire
> (the graph-storage precedent), NOT per-action adapters.

#### Import

``` python
from cjm_media_processing_adapter_interface.adapter import (
    MediaProcessingToolProtocol,
    MediaProcessingAdapter
)
```

#### Classes

``` python
@runtime_checkable
class MediaProcessingToolProtocol(Protocol):
    """
    Structural contract for media-processing tool capabilities (ffmpeg today;
    born-final at stage 8 — derived from the native tool surface).
    
    Artifact-producing ops (`convert`, `segment_audio`, `extract_audio`) are pure
    compute that WRITE audio file(s) into the adapter-chosen `output_dir` (the
    artifact-write seam) and return a typed pointer. `get_info` is a pure-query
    probe returning inline `MediaMetadata` (no artifact). `get_current_config`
    supplies the effective config the generic adapter hashes (together with the
    per-call request params) for its cache key.
    
    The `output_dir` parameter on the artifact ops is the adapter telling the
    tool WHERE to persist (the `db_path`-off-the-tool rule); encoding the audio
    stays tool-side compute. Per-call request params (`output_format`,
    `sample_rate`, `channels`, `boundaries`, `filename_template`) define WHAT
    artifact to produce and are hashed into the cache key.
    """
    
    def convert(self, input_path: Union[str, Path], output_dir: str,
                    output_format: str, sample_rate: Optional[int] = None,
                    channels: Optional[int] = None, **kwargs) -> MediaArtifactResult: ...
    
    def segment_audio(self, input_path: Union[str, Path], output_dir: str,
                          boundaries: List[Dict[str, float]], output_format: Optional[str] = None,
                          filename_template: str = "segment_{index:03d}", **kwargs) -> MediaSegmentationResult: ...
    
    def extract_audio(self, input_path: Union[str, Path], output_dir: str,
                          output_format: Optional[str] = None, **kwargs) -> MediaArtifactResult: ...
    
    def get_info(self, file_path: Union[str, Path]) -> MediaMetadata: ...
        def get_current_config(self) -> Dict[str, Any]: ...
    
    def get_current_config(self) -> Dict[str, Any]: ...
```

``` python
class MediaProcessingAdapter:
    def __init__(
        self,
        tool: MediaProcessingToolProtocol,  # The bound tool capability instance (worker-side binding)
    )
    """
    Typed media-processing task adapter — ONE multi-method adapter for the
    media-processing tool's whole repertoire (the graph-storage multi-method
    precedent, NOT per-action adapters: the `task_name` key answers "which tools
    do media-processing", and there is one such tool — ffmpeg — that will gain
    MORE ops over time, e.g. building video from images + audio, so per-action
    would only multiply task families + libs without aiding discovery).
    
    Native-surface model (stage 8 / PILLAR 1c): the TOOL is pure compute; the
    ADAPTER owns the cache + persistence bookends (see
    `GenericMediaProcessingAdapter`) + the per-call `force` control. The
    artifact ops write audio file(s) under the substrate-injected
    `PLUGIN_DATA_DIR`; the cache row maps (action, input, config) -> output(s).
    The adapter chooses the output location and passes it to the tool; `db_path`
    is not on the tool protocol. `get_info` is an UNCACHED pure-query
    pass-through (a probe; no artifact).
    
    Input contract: `convert`/`segment_audio` receive a source (or an upstream
    artifact, e.g. a vocals stem) path; the produced WAVs ARE the model-ready
    audio downstream consumers (VAD/transcription) use. `extract_audio` pulls an
    audio stream from a video container (the non-minimal video-input path).
    Implementations run in-worker beside their tool, constructed
    `AdapterClass(tool)` (mirrors `GraphStorageAdapter`). Result DTOs are
    wire-registered (`media_processing.{artifact,segmentation,metadata}`) so
    returned values cross the worker boundary typed.
    """
    
    def __init__(
            self,
            tool: MediaProcessingToolProtocol,  # The bound tool capability instance (worker-side binding)
        )
    
    def convert(
            self,
            input_path: Union[str, Path],     # Source (or upstream artifact) audio/video to convert
            output_format: str,               # Target audio format (e.g. 'wav', 'mp3')
            sample_rate: Optional[int] = None,  # Target sample rate (e.g. 16000 for model input); None = source rate
            channels: Optional[int] = None,     # Target channel count (e.g. 1 = mono); None = source channels
            **kwargs,                         # Provenance + tool options
        ) -> MediaArtifactResult:  # The produced converted-audio artifact (cached)
        "Convert media to target/model-ready audio (cached artifact)."
    
    def segment_audio(
            self,
            input_path: Union[str, Path],         # Source audio to cut
            boundaries: List[Dict[str, float]],   # [{start, end}, ...] cut points (seconds)
            output_format: Optional[str] = None,  # Output format; None = same as input
            filename_template: str = "segment_{index:03d}",  # Per-segment filename pattern
            **kwargs,                             # Provenance + tool options
        ) -> MediaSegmentationResult:  # The produced batch of segment files (cached at batch level)
        "Cut a source into a batch of segment files at the given boundaries (cached batch)."
    
    def extract_audio(
            self,
            input_path: Union[str, Path],         # Source video container
            output_format: Optional[str] = None,  # Audio format; None = codec-derived
            **kwargs,                             # Provenance + tool options
        ) -> MediaArtifactResult:  # The produced extracted-audio artifact (cached)
        "Extract the audio stream from a video container (cached artifact)."
    
    def get_info(
            self,
            file_path: Union[str, Path],  # Media file to probe
        ) -> MediaMetadata:  # Probed metadata (uncached)
        "Probe media metadata (uncached pure-query)."
```

### Generic Media Processing Adapter (`generic.ipynb`)

> The generic (tool-agnostic) multi-method media-processing adapter —
> cache-bookended artifact ops (`convert` / `segment_audio` /
> `extract_audio`) + an uncached `get_info` probe. Reused across every
> tool capability satisfying `MediaProcessingToolProtocol` (ffmpeg
> today).

#### Import

``` python
from cjm_media_processing_adapter_interface.generic import (
    GenericMediaProcessingAdapter
)
```

#### Classes

``` python
class GenericMediaProcessingAdapter(MediaProcessingAdapter):
    """
    Generic multi-method media-processing adapter, reused across any tool
    satisfying `MediaProcessingToolProtocol`.
    
    Artifact ops (`convert`/`extract_audio`/`segment_audio`) share the
    source-separation bookend (cache-check + artifact-existence recheck ->
    adapter-chosen `output_dir` -> tool -> persist). `get_info` is an uncached
    pass-through. The cache key hashes `get_current_config()` together with the
    per-call request params, and `force` rides `CallEnvelope.control` (keeping
    the task methods pure). Storage + artifacts live under the substrate-injected
    `PLUGIN_DATA_DIR` (`media_processing.db` + `cache_dir_for_config` dirs).
    """
    
    def convert(self, input_path, output_format, sample_rate=None, channels=None, **kwargs) -> MediaArtifactResult:
            """Convert media to target/model-ready audio (cached artifact)."""
            request_params = {"output_format": output_format, "sample_rate": sample_rate, "channels": channels}
        "Convert media to target/model-ready audio (cached artifact)."
    
    def extract_audio(self, input_path, output_format=None, **kwargs) -> MediaArtifactResult:
            """Extract the audio stream from a video container (cached artifact)."""
            request_params = {"output_format": output_format}
        "Extract the audio stream from a video container (cached artifact)."
    
    def segment_audio(self, input_path, boundaries, output_format=None,
                          filename_template="segment_{index:03d}", **kwargs) -> MediaSegmentationResult
        "Cut a source into a batch of segment files (cached at BATCH level).

One cache row per (segment_audio, input, config): output_path = the batch
dir, metadata = the whole typed result (segments + batch_key + counts), so
a hit reconstructs the `MediaSegmentationResult` — but only if EVERY
produced segment file still exists (the batch analog of the single-artifact
existence recheck). The per-segment resume-skip the fused tool did is
intentionally NOT ported (born-final simplicity); watch the journal for
batch re-compute cost before reinstating it."
    
    def get_info(self, file_path) -> MediaMetadata
        "Probe media metadata — UNCACHED pure-query pass-through (no artifact)."
```

### Media Processing Storage (`storage.ipynb`)

> Standardized SQLite storage for media-processing results
> (produced-artifact pointers) with content hashing — born-final,
> action-aware, no `job_id`.

#### Import

``` python
from cjm_media_processing_adapter_interface.storage import (
    MediaProcessingRow,
    MediaProcessingStorage
)
```

#### Classes

``` python
@dataclass
class MediaProcessingRow:
    "A single row from the media_processing_results table."
    
    action: str  # The media-processing op: 'convert', 'segment_audio', 'extract_audio'
    input_path: str  # Path to the source media file
    input_hash: str  # Hash of the input file in "algo:hexdigest" format
    config_hash: str  # Hash of the effective config + per-call request params
    output_path: str  # Produced artifact path (a file for convert/extract_audio; the batch dir for segment_audio)
    output_hash: Optional[str]  # Hash of the produced artifact (None for a batch)
    metadata: Optional[Dict[str, Any]]  # Processing metadata (for segment_audio: the full typed batch result)
    created_at: Optional[float]  # Unix timestamp
```

``` python
class MediaProcessingStorage:
    def __init__(
        self,
        db_path: str  # Absolute path to the SQLite database file
    )
    "Standardized SQLite storage for media-processing results (artifact pointers), born-final."
    
    def __init__(
            self,
            db_path: str  # Absolute path to the SQLite database file
        )
        "Initialize storage and create table if needed."
    
    def save(
            self,
            action: str,        # Media-processing op: 'convert', 'segment_audio', 'extract_audio'
            input_path: str,    # Path to the source media file
            input_hash: str,    # Hash of the input file in "algo:hexdigest" format
            config_hash: str,   # Hash of the effective config + per-call request params
            output_path: str,   # Produced artifact path (file, or batch dir for segment_audio)
            output_hash: Optional[str] = None,        # Hash of the produced artifact (None for a batch)
            metadata: Optional[Dict[str, Any]] = None  # Processing metadata (batch result for segment_audio)
        ) -> None
        "Save or replace a media-processing result (upsert by action + input_path + config_hash)."
    
    def save_with_logging(
            self,
            *,
            action: str,        # Media-processing op
            input_path: str,    # Path to the source media file
            input_hash: str,    # Hash of the input file in "algo:hexdigest" format
            config_hash: str,   # Hash of the effective config + per-call request params
            output_path: str,   # Produced artifact path
            output_hash: Optional[str] = None,         # Hash of the produced artifact
            metadata: Optional[Dict[str, Any]] = None,  # Processing metadata
            logger: Optional[logging.Logger] = None      # Optional logger for success/failure messages
        ) -> bool:  # True if saved; False if the save failed (error logged, not raised)
        "Save a result, logging success/failure. Failures are logged and swallowed (returns False).

CR-14 follow-up: records a RESULT_SAVED account either way (ok flag +
action/input/output/config references — the journal never carries
content) so saves AND swallowed save-failures become auditable journal rows."
    
    def get_cached(
            self,
            action: str,       # Media-processing op
            input_path: str,   # Path to the source media file
            input_hash: str,   # Content hash of the input (cache miss if the file changed)
            config_hash: str   # Config hash to match
        ) -> Optional[MediaProcessingRow]:  # Cached row or None
        "Retrieve a content-correct cached media-processing result.

Matches on action + input_path + input_hash + config_hash, so a changed
input (new input_hash) misses even though a stale row may still exist at
the same (action, input_path, config_hash) — the next save() replaces it.
The CALLER must still confirm the artifact(s) at `output_path` exist
before serving them (they live outside the DB).

CR-14 follow-up: a hit records a CACHE_HIT account (the cache-serving
decision is an account-of-action)."
    
    def list_jobs(
            self,
            limit: int = 100  # Maximum number of rows to return
        ) -> List[MediaProcessingRow]:  # List of media-processing rows
        "List media-processing results ordered by creation time (newest first)."
    
    def verify_input(
            self,
            action: str,      # Media-processing op
            input_path: str,  # Path to the source media file
            config_hash: str  # Config hash to look up
        ) -> Optional[bool]:  # True if input matches, False if changed, None if not found
        "Verify the source file still matches the hash stored for (action, input_path, config_hash)."
```
