Metadata-Version: 2.4
Name: cjm-media-plugin-system
Version: 0.0.14
Summary: Defines standardized interfaces and data structures for media analysis (VAD, Scene Detection) and processing (FFmpeg, Conversion) plugins within the cjm-plugin-system ecosystem.
Author-email: "Christian J. Mills" <9126128+cj-mills@users.noreply.github.com>
License: Apache-2.0
Project-URL: Repository, https://github.com/cj-mills/cjm-media-plugin-system
Project-URL: Documentation, https://cj-mills.github.io/cjm-media-plugin-system
Keywords: nbdev,jupyter,notebook,python
Classifier: Natural Language :: English
Classifier: Intended Audience :: Developers
Classifier: Development Status :: 3 - Alpha
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3 :: Only
Requires-Python: >=3.12
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: cjm_plugin_system>=0.0.39
Dynamic: license-file

# cjm-media-plugin-system


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

## Install

``` bash
pip install cjm_media_plugin_system
```

## Project Structure

    nbs/
    ├── analysis_interface.ipynb   # Domain-specific plugin interface for media analysis (read-only / signal extraction)
    ├── core.ipynb                 # DTOs for media analysis and processing with FileBackedDTO support for zero-copy transfer
    ├── processing_interface.ipynb # Domain-specific plugin interface for media processing (write / file manipulation)
    └── storage.ipynb              # Standardized SQLite storage for media analysis and processing results with content hashing

Total: 4 notebooks

## Module Dependencies

``` mermaid
graph LR
    analysis_interface["analysis_interface<br/>Media Analysis Plugin Interface"]
    core["core<br/>Core Data Structures"]
    processing_interface["processing_interface<br/>Media Processing Plugin Interface"]
    storage["storage<br/>Media Storage"]

    analysis_interface --> core
    processing_interface --> core
```

*2 cross-module dependencies detected*

## CLI Reference

No CLI commands found in this project.

## Module Overview

Detailed documentation for each module in the project:

### Media Analysis Plugin Interface (`analysis_interface.ipynb`)

> Domain-specific plugin interface for media analysis (read-only /
> signal extraction)

#### Import

``` python
from cjm_media_plugin_system.analysis_interface import (
    MediaAnalysisPlugin
)
```

#### Classes

``` python
class MediaAnalysisPlugin(PluginInterface):
    """
    Abstract base class for plugins that analyze media files.
    
    Analysis plugins perform read-only operations that extract temporal segments
    from media files (VAD, scene detection, beat detection, etc.).
    """
    
    def execute(
            self,
            media_path: Union[str, Path],  # Path to media file to analyze
            **kwargs
        ) -> MediaAnalysisResult:  # Analysis result with detected TimeRanges
        "Analyze the media file and return detected temporal segments."
```

### Core Data Structures (`core.ipynb`)

> DTOs for media analysis and processing with FileBackedDTO support for
> zero-copy transfer

#### Import

``` python
from cjm_media_plugin_system.core import (
    TimeRange,
    MediaMetadata,
    MediaAnalysisResult
)
```

#### Classes

``` python
@dataclass
class TimeRange:
    "Represents a temporal segment within a media file."
    
    start: float  # Start time in seconds
    end: float  # End time in seconds
    label: str = 'segment'  # Segment type (e.g., 'speech', 'silence', 'scene')
    confidence: Optional[float]  # Detection confidence (0.0 to 1.0)
    payload: Dict[str, Any] = field(...)  # Extra data (e.g., speaker embedding)
    
    def to_dict(self) -> Dict[str, Any]:  # Serialized representation
        "Convert to dictionary for JSON serialization."
```

``` python
@dataclass
class MediaMetadata:
    "Container for media file metadata."
    
    path: str  # File path
    duration: float  # Duration in seconds
    format: str  # Container format (e.g., 'mp4', 'mkv')
    size_bytes: int  # File size in bytes
    video_streams: List[Dict[str, Any]] = field(...)  # Video stream info
    audio_streams: List[Dict[str, Any]] = field(...)  # Audio stream info
    
    def to_dict(self) -> Dict[str, Any]:  # Serialized representation
        "Convert to dictionary for JSON serialization."
```

``` python
@dataclass
class MediaAnalysisResult:
    "Standard output for media analysis plugins."
    
    ranges: List[TimeRange]  # Detected temporal segments
    metadata: Dict[str, Any] = field(...)  # Global analysis stats
    
    def to_temp_file(self) -> str:  # Absolute path to temporary JSON file
            """Save results to a temp JSON file for zero-copy transfer."""
            tmp = tempfile.NamedTemporaryFile(suffix=".json", delete=False, mode='w')
            
            data = {
                "ranges": [r.to_dict() for r in self.ranges],
        "Save results to a temp JSON file for zero-copy transfer."
    
    def from_file(
            cls,
            filepath: str  # Path to JSON file
        ) -> "MediaAnalysisResult":  # Loaded result instance
        "Load results from a JSON file."
```

### Media Processing Plugin Interface (`processing_interface.ipynb`)

> Domain-specific plugin interface for media processing (write / file
> manipulation)

#### Import

``` python
from cjm_media_plugin_system.processing_interface import (
    MediaProcessingPlugin
)
```

#### Classes

``` python
class MediaProcessingPlugin(PluginInterface):
    """
    Abstract base class for plugins that modify, convert, or extract media.
    
    Processing plugins perform write operations that produce new files
    (format conversion, segment extraction, source separation, speech
    enhancement, etc.). They are **dispatcher-style** (SG-44): callers invoke
    ``execute(action=..., **kwargs)`` and introspect ``supported_actions`` to
    discover which operations a given plugin implements. Only ``execute`` and
    the universal ``get_info`` read are mandated by this interface; each plugin
    declares its own write actions with the substrate's ``@plugin_action``
    decorator and sets ``supported_actions = collect_plugin_actions(cls)``.
    """
    
    def execute(
            self,
            action: str = "get_info",  # Operation: 'get_info' plus plugin-specific actions
            **kwargs
        ) -> Dict[str, Any]:  # JSON-serializable result (usually containing 'output_path')
        "Execute a media processing operation by dispatching on `action`."
    
    def get_info(
            self,
            file_path: Union[str, Path]  # Path to media file
        ) -> MediaMetadata:  # File metadata (duration, codec, streams)
        "Get metadata for a media file. Universal across media-processing plugins."
```

### Media Storage (`storage.ipynb`)

> Standardized SQLite storage for media analysis and processing results
> with content hashing

#### Import

``` python
from cjm_media_plugin_system.storage import (
    MediaAnalysisRow,
    MediaAnalysisStorage,
    MediaProcessingRow,
    MediaProcessingStorage
)
```

#### Classes

``` python
@dataclass
class MediaAnalysisRow:
    "A single row from the analysis_jobs table."
    
    file_path: str  # Path to the analyzed media file
    file_hash: str  # Hash of source file in "algo:hexdigest" format
    config_hash: str  # Hash of the analysis config used
    ranges: Optional[List[Dict[str, Any]]]  # Detected temporal segments
    metadata: Optional[Dict[str, Any]]  # Analysis metadata
    created_at: Optional[float]  # Unix timestamp
```

``` python
class MediaAnalysisStorage:
    def __init__(
        self,
        db_path: str  # Absolute path to the SQLite database file
    )
    "Standardized SQLite storage for media analysis results."
    
    def __init__(
            self,
            db_path: str  # Absolute path to the SQLite database file
        )
        "Initialize storage and create table if needed."
    
    def save(
            self,
            file_path: str,     # Path to the analyzed media file
            file_hash: str,     # Hash of source file in "algo:hexdigest" format
            config_hash: str,   # Hash of the analysis config
            ranges: Optional[List[Dict[str, Any]]] = None,  # Detected temporal segments
            metadata: Optional[Dict[str, Any]] = None        # Analysis metadata
        ) -> None
        "Save or replace an analysis result (upsert by file_path + config_hash)."
    
    def save_with_logging(
            self,
            *,
            file_path: str,     # Path to the analyzed media file
            file_hash: str,     # Hash of source file in "algo:hexdigest" format
            config_hash: str,   # Hash of the analysis config
            ranges: Optional[List[Dict[str, Any]]] = None,  # Detected temporal segments
            metadata: Optional[Dict[str, Any]] = None,       # Analysis metadata
            logger: Optional[logging.Logger] = None           # Optional logger for success/failure messages
        ) -> bool:  # True if saved; False if the save failed (error logged, not raised)
        "Save a result, logging success/failure. Failures are logged and swallowed (returns False).

Centralizes the try/save/log/except block media-analysis plugins reimplement.
Returns True on success so callers can gate post-save side effects on the result."
    
    def get_cached(
            self,
            file_path: str,   # Path to the media file
            file_hash: str,   # Content hash of the file (cache miss if the file changed)
            config_hash: str  # Config hash to match
        ) -> Optional[MediaAnalysisRow]:  # Cached row or None
        "Retrieve a content-correct cached analysis result.

Matches on file_path + file_hash + config_hash, so a changed file (new
file_hash) misses the cache even though a stale row may still exist at the
same (file_path, config_hash) — the next save() replaces it."
    
    def list_jobs(
            self,
            limit: int = 100  # Maximum number of rows to return
        ) -> List[MediaAnalysisRow]:  # List of analysis rows
        "List analysis jobs ordered by creation time (newest first)."
    
    def verify_file(
            self,
            file_path: str,   # Path to the media file
            config_hash: str  # Config hash to look up
        ) -> Optional[bool]:  # True if file matches, False if changed, None if not found
        "Verify the source media file still matches the hash stored for (file_path, config_hash)."
```

``` python
@dataclass
class MediaProcessingRow:
    "A single row from the processing_jobs table."
    
    job_id: str  # Unique job identifier
    action: str  # Operation performed: 'convert', 'extract_segment', etc.
    input_path: str  # Path to the source media file
    input_hash: str  # Hash of source file in "algo:hexdigest" format
    config_hash: str  # Hash of the action parameters / effective config used
    output_path: str  # Path to the produced output file
    output_hash: str  # Hash of output file in "algo:hexdigest" format
    parameters: Optional[Dict[str, Any]]  # Action-specific parameters
    metadata: Optional[Dict[str, Any]]  # Processing metadata
    created_at: Optional[float]  # Unix timestamp
```

``` python
class MediaProcessingStorage:
    def __init__(
        self,
        db_path: str  # Absolute path to the SQLite database file
    )
    "Standardized SQLite storage for media processing results."
    
    def __init__(
            self,
            db_path: str  # Absolute path to the SQLite database file
        )
        "Initialize storage, create table, run migrations, and build indexes."
    
    def save(
            self,
            job_id: str,        # Unique job identifier
            action: str,        # Operation performed: 'convert', 'extract_segment', etc.
            input_path: str,    # Path to the source media file
            input_hash: str,    # Hash of source file in "algo:hexdigest" format
            config_hash: str,   # Hash of the action parameters / effective config
            output_path: str,   # Path to the produced output file
            output_hash: str,   # Hash of output file in "algo:hexdigest" format
            parameters: Optional[Dict[str, Any]] = None,  # Action-specific parameters
            metadata: Optional[Dict[str, Any]] = None       # Processing metadata
        ) -> None
        "Save or replace a processing result (upsert by action + input_path + config_hash)."
    
    def save_with_logging(
            self,
            *,
            job_id: str,        # Unique job identifier
            action: str,        # Operation performed
            input_path: str,    # Path to the source media file
            input_hash: str,    # Hash of source file in "algo:hexdigest" format
            config_hash: str,   # Hash of the action parameters / effective config
            output_path: str,   # Path to the produced output file
            output_hash: str,   # Hash of output file in "algo:hexdigest" format
            parameters: Optional[Dict[str, Any]] = None,  # Action-specific parameters
            metadata: Optional[Dict[str, Any]] = None,      # Processing metadata
            logger: Optional[logging.Logger] = None          # Optional logger for success/failure messages
        ) -> bool:  # True if saved; False if the save failed (error logged, not raised)
        "Save a result, logging success/failure. Failures are logged and swallowed (returns False).

Centralizes the try/save/log/except block media-processing plugins reimplement.
Returns True on success so callers can gate post-save side effects on the result."
    
    def get_cached(
            self,
            action: str,       # Operation performed
            input_path: str,   # Path to the source media file
            input_hash: str,   # Content hash of the input (cache miss if the file changed)
            config_hash: str   # Hash of the action parameters / effective config
        ) -> Optional[MediaProcessingRow]:  # Cached row or None
        "Retrieve a content-correct cached processing result.

Matches on action + input_path + input_hash + config_hash. A changed input
file (new input_hash) misses even if a stale row exists at the same
(action, input_path, config_hash) — the next save() replaces it."
    
    def get_by_job_id(
            self,
            job_id: str  # Job identifier to look up
        ) -> Optional[MediaProcessingRow]:  # Row or None if not found
        "Retrieve a processing result by job ID."
    
    def list_jobs(
            self,
            limit: int = 100  # Maximum number of rows to return
        ) -> List[MediaProcessingRow]:  # List of processing rows
        "List processing jobs ordered by creation time (newest first)."
    
    def verify_input(
            self,
            job_id: str  # Job identifier to verify
        ) -> Optional[bool]:  # True if input matches, False if changed, None if not found
        "Verify the source media file still matches its stored hash."
    
    def verify_output(
            self,
            job_id: str  # Job identifier to verify
        ) -> Optional[bool]:  # True if output matches, False if changed, None if not found
        "Verify the output media file still matches its stored hash."
```
