Metadata-Version: 2.4
Name: cjm-fasthtml-sse
Version: 0.0.14
Summary: Real-time Server-Sent Events (SSE) and HTMX integration library for FastHTML with cross-tab synchronization support.
Home-page: https://github.com/cj-mills/cjm-fasthtml-sse
Author: Christian J. Mills
Author-email: 9126128+cj-mills@users.noreply.github.com
License: Apache Software License 2.0
Keywords: nbdev jupyter notebook python
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Natural Language :: English
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: License :: OSI Approved :: Apache Software License
Requires-Python: >=3.11
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: python-fasthtml
Provides-Extra: dev
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: home-page
Dynamic: keywords
Dynamic: license
Dynamic: license-file
Dynamic: provides-extra
Dynamic: requires-dist
Dynamic: requires-python
Dynamic: summary

# cjm-fasthtml-sse


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

## Install

``` bash
pip install cjm_fasthtml_sse
```

## Project Structure

    nbs/
    └── core/ (6)
        ├── broadcast.ipynb     # Broadcasting infrastructure for SSE cross-tab synchronization
        ├── connections.ipynb   # Connection management for SSE clients
        ├── multi_stream.ipynb  # Manage multiple related SSE streams for a single entity
        ├── response.ipynb      # SSE response builders for complex UI updates
        ├── routes.ipynb        # SSE route helpers and decorators for FastHTML
        └── streaming.ipynb     # SSE streaming utilities and helpers

Total: 6 notebooks across 4 directories

## Module Dependencies

``` mermaid
graph LR
    core_broadcast[core.broadcast<br/>Broadcast]
    core_connections[core.connections<br/>Connections]
    core_multi_stream[core.multi_stream<br/>Multi Stream]
    core_response[core.response<br/>Response]
    core_routes[core.routes<br/>Routes]
    core_streaming[core.streaming<br/>Streaming]

    core_multi_stream --> core_connections
    core_multi_stream --> core_streaming
    core_response --> core_streaming
    core_routes --> core_connections
    core_routes --> core_streaming
```

*5 cross-module dependencies detected*

## CLI Reference

No CLI commands found in this project.

## Module Overview

Detailed documentation for each module in the project:

### Broadcast (`broadcast.ipynb`)

> Broadcasting infrastructure for SSE cross-tab synchronization

#### Import

``` python
from cjm_fasthtml_sse.core.broadcast import (
    BroadcastMessage,
    BroadcastManager,
    create_broadcast_handler,
    setup_broadcast_routes
)
```

#### Functions

``` python
def create_broadcast_handler(manager: BroadcastManager,
                            element_builder: Optional[Callable] = None)
    "Create a broadcast handler function that can be used with FastHTML routes."
```

``` python
def setup_broadcast_routes(app, 
                          manager: BroadcastManager,  # The broadcast manager instance
                          prefix: str = "/sse",  # URL prefix for SSE endpoints
                          element_builder: Optional[Callable] = None)
    "Setup broadcast routes on a FastHTML app."
```

#### Classes

``` python
@dataclass
class BroadcastMessage:
    "Standard broadcast message format for SSE communication"
    
    type: str
    data: Dict[str, Any]
    timestamp: str = field(...)
    metadata: Optional[Dict[str, Any]]
    
    def to_dict(
            self
        ) -> Dict[str, Any]:  # Dictionary representation of the message
        "Convert message to dictionary format"
    
    def to_json(
            self
        ) -> str:  # JSON string representation of the message
        "Convert message to JSON string"
    
    def to_sse(
            self,
            event_type: Optional[str] = 'message'  # SSE event type for the message
        ) -> str:  # SSE formatted message string
        "Convert to SSE message format using FastHTML's sse_message"
```

``` python
class BroadcastManager:
    def __init__(self,
    "Manages SSE broadcast connections across multiple tabs/clients"
    
    def __init__(self,
        "Initialize the broadcast manager."
    
    async def register(self, 
                          connection_id: Optional[str] = None,  # Optional ID for the connection (auto-generated if not provided)
                          metadata: Optional[Dict[str, Any]] = None # Optional metadata for the connection
                          ) -> tuple[str, asyncio.Queue]: # Tuple of (connection_id, queue)
        "Register a new connection and return its queue."
    
    async def unregister(
            self,
            connection_id: str  # ID of the connection to unregister
        )
        "Unregister a connection."
    
    async def broadcast(self, 
                           message_type: str,  # Type of the message
                           data: Dict[str, Any], # Message data
                           metadata: Optional[Dict[str, Any]] = None, # Optional metadata
                           exclude: Optional[Set[str]] = None # Set of connection IDs to exclude from broadcast
                           ) -> int: # Number of successful broadcasts
        "Broadcast a message to all connected clients."
    
    async def send_to(self,
                         connection_id: str,  # Target connection ID
                         message_type: str,  # Type of the message
                         data: Dict[str, Any], # Message data
                         metadata: Optional[Dict[str, Any]] = None # Optional metadata
                         ) -> bool: # True if successful, False otherwise
        "Send a message to a specific connection."
    
    def get_connection_count(
            self
        ) -> int:  # Number of active connections
        "Get the number of active connections."
    
    def get_history(
            self,
            limit: Optional[int] = None  # Maximum number of messages to return
        ) -> list[BroadcastMessage]:  # List of broadcast messages from history
        "Get broadcast history."
```

### Connections (`connections.ipynb`)

> Connection management for SSE clients

#### Import

``` python
from cjm_fasthtml_sse.core.connections import (
    ConnectionState,
    SSEConnection,
    ConnectionRegistry,
    create_sse_element,
    cleanup_sse_on_unload,
    create_reconnection_script,
    create_connection_manager_script
)
```

#### Functions

``` python
def create_sse_element(endpoint: str,
                      element_id: Optional[str] = None,  # Optional element ID
                      swap_strategy: str = "message",  # HTMX swap strategy (message, innerHTML, outerHTML, etc.)
                      hidden: bool = False,  # Whether to hide the element
                      **attrs # Additional attributes for the element
                      ) -> Div:  # SSE-enabled Div element configured for HTMX
    "Create an SSE-enabled HTML element."
```

``` python
def cleanup_sse_on_unload(
) -> Script:  # Script element for cleanup
    "Create a script to clean up SSE connections on page unload."
```

``` python
def create_reconnection_script(check_interval: int = 5000,
                              max_retries: int = 5,  # Maximum number of reconnection attempts
                              debug: bool = False  # Enable debug logging
                              ) -> Script:  # Script element with reconnection logic
    "Create a script for automatic SSE reconnection."
```

``` python
def create_connection_manager_script(registry_endpoint: str = "/sse/connections",
                                    update_interval: int = 10000  # Interval for updating connection stats (in milliseconds)
                                    ) -> Script:  # Script element with connection management logic
    "Create a script to manage and monitor connections."
```

#### Classes

``` python
class ConnectionState(Enum):
    "States for SSE connections"
```

``` python
@dataclass
class SSEConnection:
    "Represents a single SSE connection"
    
    connection_id: str
    queue: asyncio.Queue
    connection_type: str = 'global'
    state: ConnectionState = ConnectionState.CONNECTING
    metadata: Dict[str, Any] = field(...)
    created_at: datetime = field(...)
    last_activity: datetime = field(...)
    message_count: int = 0
    
    async def send(
            self,
            data: Any,  # Data to send
            timeout: float = 1.0  # Timeout for the send operation
        ) -> bool:  # True if successful, False otherwise
        "Send data through the connection queue."
    
    async def heartbeat(
            self
        ) -> str:  # SSE formatted heartbeat message
        "Generate a heartbeat message."
    
    def close(self):
            """Mark the connection as closed."""
            self.state = ConnectionState.DISCONNECTED
        
        def is_active(
            self
        ) -> bool:  # True if connection is active, False otherwise
        "Mark the connection as closed."
    
    def is_active(
            self
        ) -> bool:  # True if connection is active, False otherwise
        "Check if connection is active."
```

``` python
class ConnectionRegistry:
    def __init__(
        self,
        debug: bool = False  # Enable debug logging
    )
    "Registry to track and manage SSE connections"
    
    def __init__(
            self,
            debug: bool = False  # Enable debug logging
        )
        "Initialize the connection registry."
    
    async def add_connection(self,
                                conn_id: Optional[str] = None,  # Optional connection ID (auto-generated if not provided)
                                conn_type: str = "global",  # Type of connection (e.g., 'global', 'job', 'user')
                                queue_size: int = 100,  # Size of the message queue
                                metadata: Optional[Dict[str, Any]] = None # Optional metadata for the connection
                                ) -> SSEConnection: # The created SSEConnection
        "Add a new connection to the registry."
    
    async def remove_connection(
            self,
            conn_id: str  # Connection ID to remove
        )
        "Remove a connection from the registry."
    
    def get_connection(
            self,
            conn_id: str  # Connection ID
        ) -> Optional[SSEConnection]:  # The connection if found, None otherwise
        "Get a specific connection."
    
    def get_connections(
            self,
            conn_type: Optional[str] = None  # Optional connection type to filter by
        ) -> list[SSEConnection]:  # List of connections
        "Get connections, optionally filtered by type."
    
    def get_active_connections(
            self,
            conn_type: Optional[str] = None  # Optional connection type to filter by
        ) -> list[SSEConnection]:  # List of active connections
        "Get active connections."
    
    def get_stats(
            self
        ) -> Dict[str, Any]:  # Dictionary with connection statistics
        "Get registry statistics."
```

### Multi Stream (`multi_stream.ipynb`)

> Manage multiple related SSE streams for a single entity

#### Import

``` python
from cjm_fasthtml_sse.core.multi_stream import (
    StreamEndpoint,
    MultiStreamManager,
    create_multi_stream_row
)
```

#### Functions

``` python
def create_multi_stream_row(
    manager: MultiStreamManager,  # Multi-stream manager
    entity_id: str,  # Entity ID
    streams: List[Dict[str, Any]],  # List of stream configurations
    row_builder: Callable,  # Function to build the row
    active: bool = True  # Whether streams should be active
) -> FT:  # Table row or similar element
    "Create a row element with multiple SSE streams."
```

#### Classes

``` python
@dataclass
class StreamEndpoint:
    "Configuration for a single SSE stream endpoint"
    
    name: str  # Stream name (e.g., 'progress', 'status')
    path_suffix: str  # URL path suffix (e.g., '_progress', '_status')
    data_source: Callable  # Async generator or callable that produces data
    transform_fn: Optional[Callable]  # Optional transform function
    config: Optional[StreamConfig]  # Stream configuration
    metadata: Dict[str, Any] = field(...)  # Additional metadata
```

``` python
class MultiStreamManager:
    def __init__(
        self,
        base_path: str = "/stream",  # Base path for stream endpoints
        connection_registry: Optional[ConnectionRegistry] = None,  # Optional shared registry
        default_config: Optional[StreamConfig] = None,  # Default config for all streams
        debug: bool = False  # Enable debug logging
    )
    "Manages multiple related SSE streams for entities"
    
    def __init__(
            self,
            base_path: str = "/stream",  # Base path for stream endpoints
            connection_registry: Optional[ConnectionRegistry] = None,  # Optional shared registry
            default_config: Optional[StreamConfig] = None,  # Default config for all streams
            debug: bool = False  # Enable debug logging
        )
        "Initialize the multi-stream manager."
    
    def register_endpoint(
            self,
            endpoint: StreamEndpoint  # Stream endpoint configuration
        ) -> 'MultiStreamManager':  # Self for chaining
        "Register a stream endpoint."
    
    def create_element(
            self,
            entity_id: str,  # Entity ID (e.g., job_id, user_id)
            stream_name: str,  # Stream name to connect to
            element_id: Optional[str] = None,  # Optional element ID
            wrapper: Optional[Callable] = None,  # Optional wrapper function for the element
            **attrs  # Additional attributes
        ) -> FT:  # HTMX-enabled SSE element
        "Create an SSE-enabled element for a specific stream."
    
    def create_handler(
            self,
            stream_name: str,  # Stream name
            get_entity_fn: Callable,  # Function to get entity state
            is_active_fn: Callable  # Function to check if entity is active
        ) -> Callable:  # Route handler function
        "Create a route handler for a specific stream."
    
    def setup_routes(
            self,
            app,  # FastHTML app
            get_entity_fn: Callable,  # Function to get entity state
            is_active_fn: Callable  # Function to check if entity is active
        )
        "Setup all stream routes on the app."
```

### Response (`response.ipynb`)

> SSE response builders for complex UI updates

#### Import

``` python
from cjm_fasthtml_sse.core.response import (
    UpdateRule,
    SSEResponseBuilder,
    create_conditional_response,
    create_state_response_builder
)
```

#### Functions

``` python
def create_conditional_response(
    conditions: List[Tuple[Callable, Callable]],  # List of (condition, builder) tuples
    always_include: Optional[List[Callable]] = None,  # Builders that always run
    context: Optional[Dict[str, Any]] = None  # Initial context
) -> SSEResponseBuilder:  # Configured response builder
    "Create a response builder with predefined conditions."
```

``` python
def create_state_response_builder(
    state_builders: Dict[str, Callable],  # Mapping of state names to builders
    get_state_fn: Callable,  # Function to determine current state
    default_builder: Optional[Callable] = None  # Default builder if no state matches
) -> SSEResponseBuilder:  # Configured response builder
    "Create a response builder for state-based updates."
```

#### Classes

``` python
@dataclass
class UpdateRule:
    "Rule for conditional element updates"
    
    condition: Callable  # Function that returns True if rule should apply
    builder: Callable  # Function that builds the element(s)
    target_id: Optional[str]  # Target element ID for OOB swap
    swap_mode: str = 'innerHTML'  # Swap mode
    priority: int = 0  # Higher priority rules are evaluated first
```

``` python
class SSEResponseBuilder:
    def __init__(
        self,
        debug: bool = False  # Enable debug logging
    )
    "Builder for complex SSE responses with conditional updates"
    
    def __init__(
            self,
            debug: bool = False  # Enable debug logging
        )
        "Initialize the response builder."
    
    def add_rule(
            self,
            condition: Callable,  # Condition function
            builder: Callable,  # Element builder function
            target_id: Optional[str] = None,  # Target ID for OOB
            swap_mode: str = "innerHTML",  # Swap mode
            priority: int = 0  # Rule priority
        ) -> 'SSEResponseBuilder':  # Self for chaining
        "Add a conditional update rule."
    
    def add_always(
            self,
            builder: Callable  # Element builder that always runs
        ) -> 'SSEResponseBuilder':  # Self for chaining
        "Add a builder that always runs."
    
    def set_context(
            self,
            **kwargs  # Context variables
        ) -> 'SSEResponseBuilder':  # Self for chaining
        "Set context variables for builders."
    
    def build(
            self,
            **kwargs  # Additional context for this build
        ) -> FT:  # Built response
        "Build the response based on rules and context."
    
    def clear_rules(
            self
        ) -> 'SSEResponseBuilder':  # Self for chaining
        "Clear all rules."
    
    def clear_always(
            self
        ) -> 'SSEResponseBuilder':  # Self for chaining
        "Clear always-include builders."
```

### Routes (`routes.ipynb`)

> SSE route helpers and decorators for FastHTML

#### Import

``` python
from cjm_fasthtml_sse.core.routes import (
    SSERouteConfig,
    create_sse_route,
    async_error_generator,
    sse_route,
    setup_sse_routes,
    create_conditional_sse_route
)
```

#### Functions

``` python
def create_sse_route(
    data_generator: Callable,  # Async generator function
    transform_fn: Optional[Callable] = None,  # Optional transform function
    config: Optional[SSERouteConfig] = None  # Route configuration
) -> Callable:  # Route handler
    "Create an SSE route handler with automatic connection management."
```

``` python
async def async_error_generator(message: str)
    "Generate an error message for SSE."
```

``` python
def sse_route(
    path: Optional[str] = None,  # Route path
    transform_fn: Optional[Callable] = None,  # Transform function
    config: Optional[SSERouteConfig] = None  # Route configuration
) -> Callable:  # Decorator
    "Decorator to create SSE routes."
```

``` python
def setup_sse_routes(
    app,  # FastHTML app
    *handlers,  # SSE route handlers created with @sse_route
    prefix: str = "",  # URL prefix
    registry: Optional[ConnectionRegistry] = None  # Shared registry
)
    "Setup multiple SSE routes on an app."
```

``` python
def create_conditional_sse_route(
    active_generator: Callable,  # Generator for active entities
    inactive_content: Callable,  # Function to generate content for inactive entities
    is_active_fn: Callable,  # Function to check if entity is active
    transform_fn: Optional[Callable] = None,  # Transform function
    config: Optional[SSERouteConfig] = None  # Route configuration
) -> Callable:  # Route handler
    "Create an SSE route that handles both active and inactive states."
```

#### Classes

``` python
@dataclass
class SSERouteConfig:
    "Configuration for SSE route"
    
    connection_type: str = 'sse'  # Connection type for registry
    stream_config: Optional[StreamConfig]  # Stream configuration
    registry: Optional[ConnectionRegistry]  # Connection registry
    validate_fn: Optional[Callable]  # Validation function
    metadata_fn: Optional[Callable]  # Function to generate metadata
    error_handler: Optional[Callable]  # Error handler function
    debug: bool = False  # Enable debug logging
```

### Streaming (`streaming.ipynb`)

> SSE streaming utilities and helpers

#### Import

``` python
from cjm_fasthtml_sse.core.streaming import (
    StreamConfig,
    SSEStream,
    OOBStreamBuilder
)
```

#### Classes

``` python
@dataclass
class StreamConfig:
    "Configuration for SSE streaming"
    
    heartbeat_interval: float = 30.0
    timeout: Optional[float]
    send_initial_message: bool = True
    initial_message: str = 'Connected'
    send_close_message: bool = True
    close_message: str = 'Connection closed'
    debug: bool = False
```

``` python
class SSEStream:
    def __init__(
        self,
        config: Optional[StreamConfig] = None  # Stream configuration
    )
    "Generic SSE stream handler"
    
    def __init__(
            self,
            config: Optional[StreamConfig] = None  # Stream configuration
        )
        "Initialize the SSE stream."
    
    async def stream(self,
                        data_source: Union[AsyncGenerator, Callable], # Async generator or callable that produces data
                        transform_fn: Optional[Callable] = None # Optional function to transform data before sending
                        ) -> AsyncGenerator[str, None]: # SSE formatted strings
        "Stream data from a source through SSE."
    
    def stop(self)
        "Stop the stream."
```

``` python
class OOBStreamBuilder:
    def __init__(self):
        """Initialize the OOB stream builder."""
        self.elements: List[Any] = []
    "Build SSE messages with OOB (Out-of-Band) swaps"
    
    def __init__(self):
            """Initialize the OOB stream builder."""
            self.elements: List[Any] = []
        "Initialize the OOB stream builder."
    
    def add_element(self,
                       element: Any,  # The element to add
                       target_id: Optional[str] = None,  # Target element ID for OOB swap
                       swap_mode: str = "innerHTML",  # Swap mode (innerHTML, outerHTML, beforeend, afterbegin, etc.)
                       wrap: bool = True  # If True and target_id is provided, wrap content in a Div with OOB attributes. If False, add OOB attributes directly to the element
                       ) -> 'OOBStreamBuilder':  # Self for chaining
        "Add an element with OOB swap configuration."
    
    def add_elements(
            self,
            elements: List[tuple]  # List of tuples: (element, target_id, swap_mode, wrap) or (element, target_id, swap_mode) or (element, target_id) or (element,)
        ) -> 'OOBStreamBuilder':  # Self for chaining
        "Add multiple elements with OOB configurations."
    
    def build(
            self
        ) -> FT:  # Div with all elements
        "Build the Div element with all elements."
    
    def clear(
            self
        ) -> 'OOBStreamBuilder':  # Self for chaining
        "Clear all elements."
```
