Metadata-Version: 2.4
Name: cjm-fasthtml-sse
Version: 0.0.24
Summary: Real-time Server-Sent Events (SSE) and HTMX integration library for FastHTML with cross-tab synchronization support.
Home-page: https://github.com/cj-mills/cjm-fasthtml-sse
Author: Christian J. Mills
Author-email: 9126128+cj-mills@users.noreply.github.com
License: Apache Software License 2.0
Keywords: nbdev jupyter notebook python
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Natural Language :: English
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: License :: OSI Approved :: Apache Software License
Requires-Python: >=3.11
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: python-fasthtml
Provides-Extra: dev
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: home-page
Dynamic: keywords
Dynamic: license
Dynamic: license-file
Dynamic: provides-extra
Dynamic: requires-dist
Dynamic: requires-python
Dynamic: summary

# cjm-fasthtml-sse


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

## Install

``` bash
pip install cjm_fasthtml_sse
```

## Project Structure

    nbs/
    ├── core.ipynb       # Core SSE broadcast management system for FastHTML applications. Provides connection pooling, message distribution, and lifecycle hooks without UI dependencies.
    ├── dispatcher.ipynb # Event routing system with namespace support, pattern matching, and middleware pipeline. Enables decoupled event handling with priority-based execution and wildcard routing.
    ├── helpers.ipynb    # Utility functions and decorators for common SSE patterns in FastHTML. Includes the @sse_element decorator.
    ├── htmx.ipynb       # HTMX-specific SSE integration helpers for FastHTML. Simplifies adding SSE attributes, creating SSE-enabled elements, and managing HTMX SSE connections.
    ├── monitoring.ipynb # Connection monitoring and debugging tools for SSE applications. Provides configurable status indicators, automatic reconnection, and visibility change handling.
    └── updater.ipynb    # Flexible element update system for building out-of-band (OOB) swap elements. Register handlers by event type and compose updates without coupling to specific UI components.

Total: 6 notebooks

## Module Dependencies

``` mermaid
graph LR
    core[core<br/>Core SSEBroadcastManager]
    dispatcher[dispatcher<br/>SSEEventDispatcher]
    helpers[helpers<br/>UI helpers & utilities]
    htmx[htmx<br/>HTMXSSEConnector]
    monitoring[monitoring<br/>Connection monitoring & config]
    updater[updater<br/>SSEElementUpdater]

    helpers --> htmx
    monitoring --> htmx
```

*2 cross-module dependencies detected*

## CLI Reference

No CLI commands found in this project.

## Module Overview

Detailed documentation for each module in the project:

### Core SSEBroadcastManager (`core.ipynb`)

> Core SSE broadcast management system for FastHTML applications.
> Provides connection pooling, message distribution, and lifecycle hooks
> without UI dependencies.

#### Import

``` python
from cjm_fasthtml_sse.core import (
    SSEBroadcastManager
)
```

#### Classes

``` python
class SSEBroadcastManager:
    "Manages SSE connections and broadcasting without UI dependencies."
    
    def __init__(self,
                 max_queue_size: int = 100,    # Maximum number of messages per connection queue
                 history_size: int = 50,    # Number of broadcast messages to keep in history
                 default_timeout: float = 0.1   # Default timeout in seconds for queue operations
                )
        "Initialize the broadcast manager with connection pooling and message history."
    
    async def register_connection(
            self,
            queue: Optional[asyncio.Queue] = None  # Optional pre-existing queue, creates new one if not provided
        ) -> asyncio.Queue:  # The queue associated with this connection
        "Register a new SSE connection."
    
    async def unregister_connection(
            self,
            queue: asyncio.Queue  # The queue to unregister
        )
        "Unregister an SSE connection."
    
    async def broadcast(self, 
                           event_type: str,   # Type of event being broadcast
                           data: Dict[str, Any], # Data to broadcast
                           timeout: Optional[float] = None # Optional timeout override for this broadcast
                           ) -> int: # Number of successfully notified connections
        "Broadcast a message to all connected clients."
    
    def on_connect(
            self,
            callback: Callable  # Function to call when a new connection is registered
        )
        "Register a callback for new connections."
    
    def on_disconnect(
            self,
            callback: Callable  # Function to call when a connection is unregistered
        )
        "Register a callback for disconnections."
    
    def on_broadcast(
            self,
            callback: Callable  # Function to call before broadcasting (can modify messages)
        )
        "Register a callback for broadcasts (can modify messages)."
    
    def connection_count(
            self
        ) -> int:  # Number of active connections
        "Get the current number of active connections."
    
    def get_history(
            self,
            limit: Optional[int] = None  # Optional limit on number of messages to return
        ) -> list[Dict[str, Any]]:  # List of historical broadcast messages
        "Get broadcast history."
```
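
The following is a minimal, standard-library-only sketch of the fan-out idea behind `broadcast`: one `asyncio.Queue` per connection, with a per-put timeout so a full queue does not stall the other clients. `MiniBroadcaster` and `demo` are hypothetical names used for illustration; this is not the library's implementation, and the timeout handling shown is an assumption.

``` python
import asyncio
from typing import Any, Dict, Set


class MiniBroadcaster:
    """Hypothetical stand-in: one asyncio.Queue per connected client."""

    def __init__(self, max_queue_size: int = 100):
        self.max_queue_size = max_queue_size
        self.connections: Set[asyncio.Queue] = set()

    async def register_connection(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue(maxsize=self.max_queue_size)
        self.connections.add(queue)
        return queue

    async def unregister_connection(self, queue: asyncio.Queue) -> None:
        self.connections.discard(queue)

    async def broadcast(self, event_type: str, data: Dict[str, Any],
                        timeout: float = 0.1) -> int:
        """Put the message on every queue; return how many accepted it."""
        message = {"type": event_type, "data": data}
        delivered = 0
        for queue in list(self.connections):
            try:
                await asyncio.wait_for(queue.put(message), timeout=timeout)
                delivered += 1
            except asyncio.TimeoutError:
                pass  # Queue stayed full: skip this client (assumed policy)
        return delivered


async def demo() -> int:
    manager = MiniBroadcaster(max_queue_size=1)
    await manager.register_connection()            # Client with room in its queue
    slow = await manager.register_connection()
    await slow.put({"type": "stale", "data": {}})  # Fill the second queue
    count = await manager.broadcast("job:progress", {"pct": 50}, timeout=0.01)
    await manager.unregister_connection(slow)
    return count


delivered = asyncio.run(demo())
print(delivered)  # 1: only the client with free queue space was notified
```

In an SSE endpoint, each request would typically register a queue, yield messages from it, and unregister in a `finally` block when the client disconnects.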

### SSEEventDispatcher (`dispatcher.ipynb`)

> Event routing system with namespace support, pattern matching, and
> middleware pipeline. Enables decoupled event handling with
> priority-based execution and wildcard routing.

#### Import

``` python
from cjm_fasthtml_sse.dispatcher import (
    SSEEvent,
    SSEEventDispatcher
)
```

#### Classes

``` python
@dataclass
class SSEEvent:
    "Represents an SSE event with metadata."
    
    type: str
    data: Dict[str, Any]
    namespace: Optional[str]
    priority: int = 0
    timestamp: Optional[str]
    
    def full_type(self):
        "Get the full event type including namespace."
```

``` python
class SSEEventDispatcher:
    "Decoupled event routing system with namespace support, middleware, filtering, and priority-based handling."
    
    def __init__(self):
        "Initialize the event dispatcher with empty handler registry."
    
    def register_namespace(
            self,
            namespace: str  # Namespace name to register for event organization
        )
        "Register a namespace for event organization."
    
    def on(
            self,
            event_pattern: str,  # Event pattern (supports wildcards: *, **)
            priority: int = 0  # Handler priority (higher runs first)
        )
        "Decorator to register an event handler with pattern matching."
    
    def add_handler(
            self,
            event_pattern: str,  # Event pattern (e.g., "job:*", "**:completed")
            handler: Callable,  # Handler function
            priority: int = 0  # Handler priority
        )
        "Add an event handler with pattern matching support."
    
    def add_middleware(
            self,
            middleware: Callable  # Function that takes (event, next) and calls next(event)
        )
        "Add middleware that processes events before handlers."
    
    def add_filter(
            self,
            filter_func: Callable[[SSEEvent], bool]  # Function that returns True to process event
        )
        "Add a filter to control which events are processed."
    
    def add_transformer(
            self,
            transformer: Callable[[SSEEvent], SSEEvent]  # Function that transforms an event
        )
        "Add a transformer to modify events before processing."
    
    async def dispatch(
            self,
            event: Union[SSEEvent, Dict[str, Any]]  # Event to dispatch (SSEEvent or dict)
        ) -> List[Any]:  # List of handler results
        "Dispatch an event through the processing pipeline."
    
    def clear_handlers(
            self,
            pattern: Optional[str] = None  # Specific pattern to clear, or None for all
        )
        "Clear handlers for a specific pattern or all handlers."
```
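
To illustrate pattern-based routing with priorities, here is a small standalone sketch. It assumes shell-style wildcard matching via the standard library's `fnmatchcase`; the library's own `*` and `**` semantics may differ, its `dispatch` is asynchronous, and the module-level `handlers`, `add_handler`, and `dispatch` below are hypothetical stand-ins.

``` python
from fnmatch import fnmatchcase
from typing import Any, Callable, Dict, List, Tuple

# pattern -> list of (priority, handler); hypothetical registry layout
handlers: Dict[str, List[Tuple[int, Callable[[Dict[str, Any]], Any]]]] = {}


def add_handler(pattern: str, handler: Callable, priority: int = 0) -> None:
    handlers.setdefault(pattern, []).append((priority, handler))


def dispatch(event_type: str, data: Dict[str, Any]) -> List[Any]:
    """Run every handler whose pattern matches, highest priority first."""
    matched = [
        (priority, handler)
        for pattern, entries in handlers.items()
        if fnmatchcase(event_type, pattern)
        for priority, handler in entries
    ]
    matched.sort(key=lambda entry: entry[0], reverse=True)
    return [handler(data) for _, handler in matched]


add_handler("job:*", lambda d: f"any job event: {d['id']}", priority=0)
add_handler("job:completed", lambda d: f"completed: {d['id']}", priority=10)
add_handler("user:*", lambda d: "not called for job events")

results = dispatch("job:completed", {"id": 7})
print(results)  # ['completed: 7', 'any job event: 7']
```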

### UI helpers & utilities (`helpers.ipynb`)

> Utility functions and decorators for common SSE patterns in FastHTML.
> Includes the `@sse_element` decorator.

#### Import

``` python
from cjm_fasthtml_sse.helpers import (
    oob_swap,
    oob_element,
    sse_element,
    oob_update,
    cleanup_sse_on_unload,
    get_htmx_idx,
    insert_htmx_sse_ext
)
```

#### Functions

``` python
def oob_swap(
    "Add OOB swap attributes to an element."
```

``` python
def oob_element(
    element_id: str,  # ID of the target element
    content: Any,    # Content to swap
    swap_type: str = "innerHTML"  # Type of swap
)
    "Create a wrapper element for OOB swap."
```

``` python
def sse_element(
    htmx_sse: HTMXSSEConnector,
    endpoint: str, 
    events: Optional[Union[str, List[str]]] = None, # Event name(s) to listen for from SSE stream
    auto_close: bool = True,  # Whether to auto-close on completion
    swap_type: str = "message" # How to swap content
)
    "Decorator to add SSE capabilities to any element."
```

``` python
def oob_update(
    element_id: str,  # Target element ID
    content: Any,  # Content to swap
    swap_type: str = "innerHTML"  # Type of swap (innerHTML, outerHTML, etc.)
)
    "Create an out-of-band update element."
```

``` python
def cleanup_sse_on_unload(
) -> FT:  # FastHTML element (Script) for cleanup
    "Add script to cleanup SSE connections on page unload."
```

``` python
def get_htmx_idx(
    hdrs: List  # List of header elements to search
) -> int:  # Index of HTMX script or -1 if not found
    "Find the index of HTMX script in headers list."
```

``` python
def insert_htmx_sse_ext(
    hdrs: List  # List of header elements to modify
)
    "Add HTMX SSE extension after HTMX script"
```
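
For readers unfamiliar with out-of-band swaps, the sketch below builds the kind of HTML fragment that HTMX swaps by `id` when it carries an `hx-swap-oob` attribute. It uses plain strings so that it runs without FastHTML. `oob_fragment` is a hypothetical helper, not one of the functions listed above, and the markup those functions actually emit may differ.

``` python
from html import escape


def oob_fragment(element_id: str, content: str, swap_type: str = "innerHTML") -> str:
    """Build a <div> that HTMX swaps into the element with the same id."""
    return (
        f'<div id="{escape(element_id, quote=True)}" '
        f'hx-swap-oob="{escape(swap_type, quote=True)}">'
        f"{escape(content)}</div>"
    )


fragment = oob_fragment("job-42-status", "Done <100%>")
print(fragment)
# <div id="job-42-status" hx-swap-oob="innerHTML">Done &lt;100%&gt;</div>
```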

### HTMXSSEConnector (`htmx.ipynb`)

> HTMX-specific SSE integration helpers for FastHTML. Simplifies adding
> SSE attributes, creating SSE-enabled elements, and managing HTMX SSE
> connections.

#### Import

``` python
from cjm_fasthtml_sse.htmx import (
    HTMXSSEConnector
)
```

#### Classes

``` python
class HTMXSSEConnector:
    "Provides helper functions for setting up HTMX SSE connections without hardcoding specific implementations."
    
    def add_sse_attrs(element,
                          endpoint: str,  # SSE endpoint URL to connect to
                          events: Optional[Union[str, List[str]]] = None,
                          swap_type: str = "message",  # How to swap content (message, innerHTML, outerHTML, etc.)
                          auto_reconnect: bool = True)
        "Add SSE connection attributes to an element."
    
    def create_sse_element(element_type=Div,
                              endpoint: str = None,  # SSE endpoint URL to connect to
                              element_id: str = None,  # Optional ID for the element
                              events: Optional[Union[str, List[str]]] = None,
                              swap_type: str = "message",  # How to swap content when messages are received
                              hidden: bool = False,  # Whether to hide the element initially
                              **kwargs)
        "Create an element with SSE connection configured."
    
    def sse_progress_element(job_id: str,
                                endpoint_template: str = "/stream_job_progress?job_id={job_id}",  # URL template for the SSE endpoint
                                element_id_template: str = "progress-span-{job_id}",  # Template for generating element ID
                                initial_content=None)
        "Create an SSE-enabled progress element."
    
    def sse_status_element(job_id: str,
                              endpoint_template: str = "/stream_job_status?job_id={job_id}",  # URL template for the SSE endpoint
                              element_id_template: str = "status-span-{job_id}",  # Template for generating element ID
                              initial_content=None)
        "Create an SSE-enabled status element."
    
    def create_sse_monitor_script(
            config: Dict[str, Any]  # Configuration dictionary for monitoring setup
        ) -> FT:  # Script element with monitoring code
        "Create a monitoring script for SSE connections."
```
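
As a rough sketch of what an SSE-enabled element involves, the code below builds the attributes read by the HTMX SSE extension (`hx-ext`, `sse-connect`, `sse-swap`) and encodes one event in the `text/event-stream` wire format. `sse_attrs` and `format_sse` are hypothetical helpers written for this example; how `HTMXSSEConnector` maps its `events` and `swap_type` parameters onto these attributes is not shown in this README and is assumed here.

``` python
from typing import Dict, List, Optional, Union


def sse_attrs(endpoint: str,
              events: Optional[Union[str, List[str]]] = None) -> Dict[str, str]:
    """Return the attributes the HTMX SSE extension reads from an element."""
    if events is None:
        events = "message"  # Unnamed SSE events are delivered as "message"
    if isinstance(events, list):
        events = ",".join(events)
    return {"hx-ext": "sse", "sse-connect": endpoint, "sse-swap": events}


def format_sse(data: str, event: Optional[str] = None) -> str:
    """Encode one event in the text/event-stream wire format."""
    lines = [f"event: {event}"] if event else []
    # Each line of the payload needs its own "data:" field
    lines.extend(f"data: {line}" for line in data.split("\n"))
    return "\n".join(lines) + "\n\n"  # A blank line terminates the event


attrs = sse_attrs("/stream_job_progress?job_id=42", ["progress", "status"])
wire = format_sse("<span>50%</span>", event="progress")
print(attrs)
print(repr(wire))
```

The element listens for the event names in `sse-swap`, and the server side emits matching `event:` fields, so the two halves have to agree on names.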

### Connection monitoring & config (`monitoring.ipynb`)

> Connection monitoring and debugging tools for SSE applications.
> Provides configurable status indicators, automatic reconnection, and
> visibility change handling.

#### Import

``` python
from cjm_fasthtml_sse.monitoring import (
    SSEMonitorConfig,
    create_sse_monitor
)
```

#### Functions

``` python
def create_sse_monitor(
    htmx_sse: HTMXSSEConnector,
    config: SSEMonitorConfig  # SSEMonitorConfig instance
) -> FT:  # Script element with monitoring code
    "Create a connection monitor with the specified configuration."
```

#### Classes

``` python
@dataclass
class SSEMonitorConfig:
    "Configuration for SSE connection monitoring."
    
    sse_element_id: str = 'sse-connection'
    status_element_id: str = 'connection-status'
    auto_reconnect: bool = True
    reconnect_delay: int = 3000
    debug: bool = False
    heartbeat_timeout: int = 30000
    status_indicators: Optional[Dict[str, str]]
```
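
One plausible way to hand a configuration dataclass to browser-side monitoring code is to serialize it to JSON inside a script tag. The sketch below shows that idea only; `MonitorSettings` and `monitor_script` are hypothetical, they mirror just three of the fields above, and the script that `create_sse_monitor` actually generates is not documented here.

``` python
import json
from dataclasses import asdict, dataclass


@dataclass
class MonitorSettings:
    """Hypothetical subset of the monitoring options."""
    sse_element_id: str = "sse-connection"
    auto_reconnect: bool = True
    reconnect_delay: int = 3000  # Assumed to be milliseconds


def monitor_script(settings: MonitorSettings) -> str:
    """Embed the settings as a JSON object for client-side code to read."""
    config_json = json.dumps(asdict(settings))
    return f"<script>const sseMonitorConfig = {config_json};</script>"


script = monitor_script(MonitorSettings(reconnect_delay=5000))
print(script)
```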

### SSEElementUpdater (`updater.ipynb`)

> Flexible element update system for building out-of-band (OOB) swap
> elements. Register handlers by event type and compose updates without
> coupling to specific UI components.

#### Import

``` python
from cjm_fasthtml_sse.updater import (
    SSEElementUpdater
)
```

#### Classes

``` python
class SSEElementUpdater:
    "Builds OOB swap elements without hardcoding UI components."
    
    def __init__(self):
        "Initialize the updater with empty handler registry."
    
    def register(
            self,
            event_type: str,  # The event type to handle
            priority: int = 0  # Handler priority (higher numbers run first)
        ): # Decorator function
        "Decorator to register an update handler for a specific event type."
    
    def register_handler(
            self,
            event_type: str,  # The event type to handle
            handler: Callable,  # The handler function
            priority: int = 0  # Handler priority (higher numbers run first)
        )
        "Register an update handler programmatically."
    
    def set_default_handler(
            self,
            handler: Callable  # The default handler function
        )
        "Set a default handler for unregistered event types."
    
    def add_preprocessor(
            self,
            processor: Callable  # Function that processes (event_type, data) and returns modified data
        )
        "Add a preprocessor that runs before handlers."
    
    def add_postprocessor(
            self,
            processor: Callable  # Function that processes elements list and returns modified elements
        )
        "Add a postprocessor that runs after handlers."
    
    def create_elements(
            self,
            event_type: str,  # The type of event
            data: Dict[str, Any]  # Event data
        ) -> List[Any]:  # List of elements to be sent via SSE
        "Create elements for a given event type and data."
    
    def clear_handlers(
            self,
            event_type: Optional[str] = None  # Optional specific event type to clear
        )
        "Clear handlers for a specific event type or all handlers."
    
    def get_registered_events(
            self
        ) -> List[str]:  # List of event types with registered handlers
        "Get list of registered event types."
```
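
The register-then-compose flow described above can be sketched without the library as a registry keyed by event type. Everything below (`registry`, `register`, `create_elements`, and the two handlers) is a hypothetical standalone illustration that returns HTML strings; the real class may order, merge, or post-process handler results differently.

``` python
from typing import Any, Callable, Dict, List, Tuple

# event type -> list of (priority, handler), kept sorted high to low
registry: Dict[str, List[Tuple[int, Callable[[Dict[str, Any]], Any]]]] = {}


def register(event_type: str, priority: int = 0):
    """Decorator that files a handler under an event type."""
    def decorator(handler):
        entries = registry.setdefault(event_type, [])
        entries.append((priority, handler))
        entries.sort(key=lambda entry: entry[0], reverse=True)
        return handler
    return decorator


def create_elements(event_type: str, data: Dict[str, Any]) -> List[Any]:
    """Collect the output of every handler registered for the event type."""
    elements: List[Any] = []
    for _, handler in registry.get(event_type, []):
        result = handler(data)
        if isinstance(result, list):
            elements.extend(result)
        elif result is not None:
            elements.append(result)
    return elements


@register("job:progress", priority=10)
def progress_bar(data):
    return f'<div id="bar-{data["id"]}" hx-swap-oob="true">{data["pct"]}%</div>'


@register("job:progress")
def progress_label(data):
    return f'<span id="label-{data["id"]}" hx-swap-oob="true">running</span>'


elements = create_elements("job:progress", {"id": 3, "pct": 40})
print(elements)
```

Because handlers only receive event data and return elements, the UI for a given event can change without touching the code that emits the event.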
