Metadata-Version: 2.4
Name: fastermqtt
Version: 0.1.0
Summary: A FastAPI-style MQTT framework with dependency injection, topic path parameters, and hierarchical routing
Project-URL: Homepage, https://github.com/Foxerine/fastermqtt
Project-URL: Documentation, https://github.com/Foxerine/fastermqtt#readme
Project-URL: Repository, https://github.com/Foxerine/fastermqtt
Project-URL: Issues, https://github.com/Foxerine/fastermqtt/issues
Author-email: Foxerine <foxerine@users.noreply.github.com>
License-Expression: MIT
License-File: LICENSE
Keywords: async,fastapi,framework,iot,messaging,mqtt,pubsub
Classifier: Development Status :: 4 - Beta
Classifier: Framework :: FastAPI
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Internet
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Typing :: Typed
Requires-Python: >=3.10
Requires-Dist: fastapi>=0.100.0
Requires-Dist: gmqtt>=0.6.0
Requires-Dist: orjson>=3.0.0
Requires-Dist: pydantic>=2.0.0
Provides-Extra: dev
Requires-Dist: pytest-asyncio>=0.21.0; extra == 'dev'
Requires-Dist: pytest>=7.0.0; extra == 'dev'
Requires-Dist: ruff>=0.1.0; extra == 'dev'
Description-Content-Type: text/markdown

# FasterMQTT

A FastAPI-style MQTT framework inspired by [FastStream](https://github.com/airtai/faststream) architecture.

FasterMQTT brings the elegant router pattern from FastAPI to MQTT, enabling clean subscription management with dependency injection, topic path parameters, and hierarchical routing.

## Features

- **FastAPI Integration**: Seamlessly integrates with FastAPI through lifespan management
- **Decorator-based Subscriptions**: Define handlers with `@router.subscribe("topic/{param}")`
- **Topic Path Parameters**: Automatic extraction like `client/{client_id}/control` → `client_id="abc123"`
- **Dependency Injection**: Full FastAPI `Depends()` support in MQTT handlers
- **Hierarchical Routing**: Nested routers with prefix accumulation via `include_router()`
- **Shared Subscriptions**: MQTT 5.0 `$share/{group}/{topic}` consumer groups
- **Middleware System**: Onion model middleware for message interception
- **Pydantic/SQLModel Support**: Automatic serialization/deserialization of message payloads
- **Type Safety**: Full type hints throughout the codebase

## Installation

```bash
pip install fastermqtt
```

## Quick Start

### Basic Usage

```python
from fastapi import FastAPI
from fastermqtt import MqttRouter

# Create root router with MQTT connection config
mqtt_router = MqttRouter(
    host="localhost",
    port=1883,
    username="user",
    password="password",
)

# Subscribe to a topic
@mqtt_router.subscribe("sensors/temperature")
async def handle_temperature(payload: bytes):
    temperature = float(payload.decode())
    print(f"Temperature: {temperature}")

# Integrate with FastAPI
app = FastAPI()
app.include_router(mqtt_router)
```

### Topic Path Parameters

Extract values from topic segments automatically:

```python
@mqtt_router.subscribe("client/{client_id}/control")
async def handle_control(client_id: str, payload: bytes):
    print(f"Command for client {client_id}: {payload}")
```

### Hierarchical Routing

Organize subscriptions with nested routers:

```python
# Root router (manages MQTT connection)
mqtt_router = MqttRouter(host="localhost", port=1883)

# Sub-router (no connection config, shares parent's broker)
client_router = MqttRouter(prefix="client")

@client_router.subscribe("{client_id}/status")
async def handle_status(client_id: str, payload: bytes):
    # Subscribes to: client/{client_id}/status
    pass

# Include sub-router
mqtt_router.include_router(client_router)
```

### Dependency Injection

Use FastAPI's dependency injection in MQTT handlers:

```python
from fastapi import Depends
from sqlmodel.ext.asyncio.session import AsyncSession

async def get_session() -> AsyncSession:
    async with async_session_maker() as session:
        yield session

SessionDep = Annotated[AsyncSession, Depends(get_session)]

@mqtt_router.subscribe("events/{event_type}")
async def handle_event(
    event_type: str,
    payload: bytes,
    session: SessionDep,
):
    # Save event to database
    event = Event(type=event_type, data=payload.decode())
    session.add(event)
    await session.commit()
```

### Publishing Messages

```python
# Publish from router (uses router's prefix)
client_router = MqttRouter(prefix="client/{client_id}/response")

await client_router.publish(
    payload=b"OK",
    client_id="abc123",  # Replaces {client_id}
    qos=1,
)
# Publishes to: client/abc123/response

# Publish directly via broker
from fastermqtt import MQTTBroker

await MQTTBroker.publish(
    topic="notifications/alert",
    payload=b"System alert!",
    qos=2,
    retain=True,
)
```

### Shared Subscriptions (Consumer Groups)

Distribute messages across multiple service instances:

```python
# Global default consumer group
mqtt_router = MqttRouter(
    host="localhost",
    port=1883,
    default_consumer_group="workers",  # All subscriptions use this group
)

# Per-subscription consumer group
@mqtt_router.subscribe("tasks/heavy", group="heavy-workers")
async def handle_heavy_task(payload: bytes):
    # Only one instance in "heavy-workers" group receives each message
    pass

# Force no shared subscription (override default)
@mqtt_router.subscribe("broadcast/all", group="")
async def handle_broadcast(payload: bytes):
    # All instances receive every message
    pass
```

### Pydantic Model Serialization

```python
from pydantic import BaseModel
from fastermqtt import encode_payload, decode_payload

class SensorData(BaseModel):
    sensor_id: str
    value: float
    timestamp: int

# Encode for publishing
data = SensorData(sensor_id="temp-1", value=23.5, timestamp=1234567890)
payload = encode_payload(data)  # Returns JSON bytes

# Decode in handler
@mqtt_router.subscribe("sensors/data")
async def handle_sensor_data(payload: bytes):
    data = decode_payload(payload, SensorData)
    print(f"Sensor {data.sensor_id}: {data.value}")
```

### Middleware

Add cross-cutting concerns like logging and error handling:

```python
from fastermqtt import (
    BaseMQTTMiddleware,
    MiddlewareChain,
    LoggingMiddleware,
    ErrorHandlingMiddleware,
    MQTTMessage,
)

class MetricsMiddleware(BaseMQTTMiddleware):
    async def on_receive(self, message: MQTTMessage, call_next):
        start = time.time()
        result = await call_next(message)
        duration = time.time() - start
        metrics.record("mqtt_message_duration", duration)
        return result

# Build middleware chain
chain = MiddlewareChain()
chain.add(ErrorHandlingMiddleware())
chain.add(LoggingMiddleware(log_payload=True))
chain.add(MetricsMiddleware())
```

## API Reference

### MqttRouter

The main router class that inherits from FastAPI's `APIRouter`.

```python
MqttRouter(
    host: str | None = None,          # MQTT broker address (root router only)
    port: int = 8883,                  # MQTT broker port
    username: str | None = None,      # Authentication username
    password: str | None = None,      # Authentication password
    client_id: str | None = None,     # Client ID (auto-generated if not provided)
    keepalive: int = 60,              # Heartbeat interval (seconds)
    ssl_ca_cert: str | None = None,   # SSL CA certificate path
    clean_session: bool = True,       # Whether to clean session on connect
    default_consumer_group: str | None = None,  # Default shared subscription group
    prefix: str = "",                 # Topic prefix
)
```

#### Methods

- `subscribe(topic, qos=0, group=None)` - Decorator to register a subscription handler
- `publish(payload, qos=0, retain=False, **path_params)` - Publish a message
- `include_router(router, prefix="", ...)` - Include a sub-router

### MQTTBroker

Singleton manager for the MQTT connection (pure classmethod pattern).

```python
# Lifecycle (called automatically by MqttRouter)
await MQTTBroker.start(config)
await MQTTBroker.stop()

# Publishing
await MQTTBroker.publish(topic, payload, qos=0, retain=False)

# Status
MQTTBroker.is_connected()  # bool
MQTTBroker.is_initialized()  # bool
```

### Dependency Functions

```python
from fastermqtt import (
    get_mqtt_message,   # Get MQTTMessage object
    get_mqtt_topic,     # Get topic string
    get_mqtt_payload,   # Get raw payload bytes
    get_mqtt_qos,       # Get QoS level
    get_topic_param,    # Extract topic segment by index
)

# Type aliases for convenience
from fastermqtt import (
    MqttMessageDep,  # Annotated[MQTTMessage, Depends(get_mqtt_message)]
    MqttTopicDep,    # Annotated[str, Depends(get_mqtt_topic)]
    MqttPayloadDep,  # Annotated[bytes, Depends(get_mqtt_payload)]
    MqttQosDep,      # Annotated[int, Depends(get_mqtt_qos)]
)
```

### Types

```python
from fastermqtt import (
    MQTTMessage,       # Message container (topic, payload, qos, properties)
    SubscriptionInfo,  # Subscription metadata
    MQTTConfig,        # Connection configuration
)
```

### Exceptions

```python
from fastermqtt import (
    MQTTException,           # Base exception
    MQTTConnectionError,     # Connection failures
    MQTTSubscriptionError,   # Subscription failures
    MQTTPublishError,        # Publish failures
    MQTTSerializationError,  # Serialization/deserialization errors
    MQTTTopicError,          # Topic pattern errors
    MQTTMiddlewareError,     # Middleware errors
    MQTTRouterError,         # Router configuration errors
    MQTTNotInitializedError, # Broker not initialized
)
```

## Architecture

FasterMQTT follows [FastStream](https://github.com/airtai/faststream)'s architecture:

```
MqttRouter (inherits APIRouter)
    ├── Manages MQTTBroker lifecycle via lifespan
    ├── Supports include_router() for hierarchical routing
    ├── Prefix accumulation: sub-router topics prepend parent prefix
    └── Shares broker across all routers

MQTTBroker (Singleton, pure classmethod)
    ├── Manages gmqtt Client connection
    ├── Dispatches messages to subscribers
    ├── FastAPI-style dependency injection via solve_dependencies
    └── Topic parameter extraction via regex
```

## Configuration

### SSL/TLS

```python
mqtt_router = MqttRouter(
    host="mqtt.example.com",
    port=8883,
    ssl_ca_cert="/path/to/ca.crt",
)
```

### Clean Session

```python
mqtt_router = MqttRouter(
    host="localhost",
    port=1883,
    clean_session=False,  # Persist subscriptions across reconnects
)
```

## Requirements

- Python 3.10+
- FastAPI
- gmqtt
- pydantic
- orjson (for JSON serialization)

## License

MIT License

## Acknowledgments

- [FastStream](https://github.com/airtai/faststream) - Inspiration for the router pattern architecture
- [FastAPI](https://github.com/tiangolo/fastapi) - Dependency injection and router patterns
- [gmqtt](https://github.com/wialon/gmqtt) - Underlying MQTT client
