Metadata-Version: 2.4
Name: cjm-tqdm-capture
Version: 0.0.4
Summary: Intercept and capture progress information from tqdm progress bars via callbacks without modifying existing code.
Home-page: https://github.com/cj-mills/cjm-tqdm-capture
Author: Christian J. Mills
Author-email: 9126128+cj-mills@users.noreply.github.com
License: Apache Software License 2.0
Keywords: nbdev jupyter notebook python
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Natural Language :: English
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: License :: OSI Approved :: Apache Software License
Requires-Python: >=3.11
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: fastcore
Requires-Dist: tqdm
Provides-Extra: dev
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: home-page
Dynamic: keywords
Dynamic: license
Dynamic: license-file
Dynamic: provides-extra
Dynamic: requires-dist
Dynamic: requires-python
Dynamic: summary

# cjm-tqdm-capture


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

## Install

``` bash
pip install cjm-tqdm-capture
```

## Project Structure

    nbs/
    ├── job_runner.ipynb       # Executes functions in background threads with automatic tqdm progress capture.
    ├── patch_tqdm.ipynb       # Provides the patching mechanism to intercept tqdm and emit callbacks with that progress information
    ├── progress_info.ipynb    # Defines the data structure used to represent progress state
    ├── progress_monitor.ipynb # Thread-safe monitor for tracking and aggregating progress from multiple concurrent jobs.
    └── streaming.ipynb        # Server-Sent Events (SSE) generator for real-time progress streaming to web clients.

Total: 5 notebooks

## Module Dependencies

``` mermaid
graph LR
    job_runner[job_runner<br/>job runner]
    patch_tqdm[patch_tqdm<br/>patch tqdm]
    progress_info[progress_info<br/>progress info]
    progress_monitor[progress_monitor<br/>progress monitor]
    streaming[streaming<br/>streaming]

    job_runner --> patch_tqdm
    job_runner --> progress_monitor
    job_runner --> progress_info
    patch_tqdm --> progress_info
    progress_monitor --> patch_tqdm
    progress_monitor --> progress_info
    streaming --> progress_monitor
    streaming --> job_runner
```

*8 cross-module dependencies detected*

## CLI Reference

No CLI commands found in this project.

## Module Overview

Detailed documentation for each module in the project:

### job runner (`job_runner.ipynb`)

> Executes functions in background threads with automatic tqdm progress
> capture.

#### Import

``` python
from cjm_tqdm_capture.job_runner import (
    JobRunner
)
```

#### Classes

``` python
class JobRunner:
    def __init__(
        self,
        monitor: ProgressMonitor  # Progress monitor instance to receive updates
    )
    """
    Runs a callable in a background thread, patches tqdm inside the job,
    and forwards ProgressInfo updates to a ProgressMonitor under job_id.
    """
    
    def __init__(
            self,
            monitor: ProgressMonitor  # Progress monitor instance to receive updates
        )
        "Initialize a job runner with a progress monitor"
    
    def start(
            self,
            job_id: str,  # Unique identifier for this job
            fn: Callable[..., Any],
            *args,
            patch_kwargs: Optional[Dict[str, Any]] = None,
            **kwargs
        ) -> threading.Thread:  # The thread running the job
        "Start a job in a background thread with automatic tqdm patching"
    
    def is_alive(
            self,
            job_id: str  # Unique identifier of the job to check
        ) -> bool:  # True if the job thread is still running
        "Check if a job's thread is still running"
    
    def join(
            self,
            job_id: str,  # Unique identifier of the job to wait for
            timeout: Optional[float] = None  # Maximum seconds to wait (None for indefinite)
        ) -> None:  # Returns when thread completes or timeout expires
        "Wait for a job's thread to complete"
```

### patch tqdm (`patch_tqdm.ipynb`)

> Provides the patching mechanism to intercept tqdm and emit callbacks
> with that progress information

#### Import

``` python
from cjm_tqdm_capture.patch_tqdm import (
    patch_tqdm
)
```

#### Functions

``` python
def _make_callback_class(
    BaseTqdm: type,  # Base tqdm class to extend with callback functionality
    default_cb: Optional[Callable[[ProgressInfo], None]],
    min_update_interval: float = 0.1,  # Minimum time between callback invocations (seconds)
    min_delta_pct: float = 1.0,      # emit only if pct moves by >= this
    emit_initial: bool = False       # whether to emit at 0%
)
    "Create a tqdm subclass that emits progress callbacks during iteration"
```

``` python
@contextmanager
def patch_tqdm(
    progress_callback: Optional[Callable[[ProgressInfo], None]],  # Function to call with progress updates
    min_update_interval: float = 0.1,  # Minimum time between callback invocations (seconds)
    min_delta_pct: float = 10.0,   # e.g., only every ~10%
    emit_initial: bool = False  # Whether to emit callback at 0% progress
)
    "Context manager that patches tqdm to emit progress callbacks"
```

#### Variables

``` python
_BAR_COUNTER
```

### progress info (`progress_info.ipynb`)

> Defines the data structure used to represent progress state

#### Import

``` python
from cjm_tqdm_capture.progress_info import (
    ProgressInfo,
    serialize_job_snapshot,
    serialize_all_jobs
)
```

#### Functions

``` python
def serialize_job_snapshot(
    snapshot: Optional[Dict[str, Any]]  # Job snapshot dictionary from ProgressMonitor
) -> Optional[Dict[str, Any]]:  # JSON-serializable dictionary or None if input is None
    "Convert a job snapshot with ProgressInfo objects to a JSON-serializable format."
```

``` python
def serialize_all_jobs(
    jobs: Dict[str, Dict[str, Any]]  # Dictionary mapping job IDs to job snapshots
) -> Dict[str, Optional[Dict[str, Any]]]:  # Dictionary mapping job IDs to serialized snapshots
    "Convert all jobs from monitor.all() to JSON-serializable format."
```

#### Classes

``` python
@dataclass
class ProgressInfo:
    "Structured progress information"
    
    progress: float  # Percentage completion (0-100)
    current: Optional[int]  # Current iteration count
    total: Optional[int]  # Total iterations expected
    rate: Optional[str]  # Processing rate (e.g., "50.5 it/s")
    elapsed: Optional[str]  # Time elapsed since start
    remaining: Optional[str]  # Estimated time remaining
    description: Optional[str]  # Progress bar description/label
    raw_output: str = ''  # Raw output string (if any)
    timestamp: float  # Unix timestamp when created
    bar_id: Optional[str]  # Unique identifier for this progress bar
    position: Optional[int]  # Display position for multi-bar scenarios
    
    def to_dict(self):
            """Convert to dictionary for JSON serialization"""
            return {
                'progress': self.progress,
        "Convert to dictionary for JSON serialization"
```

### progress monitor (`progress_monitor.ipynb`)

> Thread-safe monitor for tracking and aggregating progress from
> multiple concurrent jobs.

#### Import

``` python
from cjm_tqdm_capture.progress_monitor import (
    ProgressMonitor
)
```

#### Classes

``` python
class ProgressMonitor:
    def __init__(
        self,
        keep_history: bool = False,  # Whether to maintain a history of progress updates
        history_limit: int = 500  # Maximum number of historical updates to keep per job
    )
    "Thread-safe monitor for tracking progress of multiple concurrent jobs"
    
    def __init__(
            self,
            keep_history: bool = False,  # Whether to maintain a history of progress updates
            history_limit: int = 500  # Maximum number of historical updates to keep per job
        )
        "Initialize a new progress monitor with optional history tracking"
    
    def update(
            self,
            job_id: str,  # Unique identifier for the job being tracked
            info: ProgressInfo  # Progress information update for the job
        )
        "TODO: Add function description"
    
    def snapshot(
            self,
            job_id: str  # Unique identifier of the job to snapshot
        ) -> Optional[Dict[str, Any]]:  # Job state dictionary or None if job not found
        "Get a point-in-time snapshot of a specific job's progress state"
    
    def all(
            self
        ) -> Dict[str, Dict[str, Any]]:  # Dictionary mapping job IDs to their state snapshots
        "Get snapshots of all tracked jobs"
    
    def clear_completed(
            self,
            older_than_seconds: float = 3600  # Age threshold in seconds for removing completed jobs
        )
        "Remove completed jobs that finished more than the specified seconds ago"
```

### streaming (`streaming.ipynb`)

> Server-Sent Events (SSE) generator for real-time progress streaming to
> web clients.

#### Import

``` python
from cjm_tqdm_capture.streaming import (
    sse_stream
)
```

#### Functions

``` python
def sse_stream(
    monitor: ProgressMonitor,  # Progress monitor instance to read job updates from
    job_id: str,  # Unique identifier of the job to stream
    interval: float = 0.25,  # Polling interval in seconds for checking progress updates
    heartbeat: float = 15.0,  # Seconds between keep-alive messages when no updates
    wait_for_start: bool = True,  # Whether to wait for job to start before ending stream
    start_timeout: float = 5.0,  # Maximum seconds to wait for job to start if wait_for_start is True
) -> Iterator[str]:  # SSE-formatted strings ready to send to client
    """
    Framework-agnostic SSE generator.
    - Yields 'data: {json}\n\n' when progress changes.
    - Sends ': keep-alive' comments every `heartbeat` seconds when idle.
    - If `wait_for_start` is True, it will wait up to `start_timeout` for
      the first snapshot before ending (avoids race at job startup).
    """
```
