Metadata-Version: 2.4
Name: apache-airflow-providers-mqtt
Version: 0.2.0
Summary: Provider package apache-airflow-providers-mqtt for Apache Airflow
Author-email: gregatm <108521916+gregatm@users.noreply.github.com>
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-Expression: Apache-2.0
License-File: LICENSE
Requires-Dist: apache-airflow>=2.11.0
Requires-Dist: apache-airflow-providers-common-compat>=1.12.0
Requires-Dist: apache-airflow-providers-common-messaging>=2.0.0
Requires-Dist: paho-mqtt>=2.1.0
Requires-Dist: PySocks
Project-URL: Homepage, https://github.com/gregatm/apache-airflow-providers-mqtt
Project-URL: Issues, https://github.com/gregatm/apache-airflow-providers-mqtt/issues

# apache-airflow-providers-mqtt

Provider for using MQTT with Apache Airflow

## Installation

```shell
pip install apache-airflow-providers-mqtt
```

## Usage

```python
from __future__ import annotations

from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import Asset, AssetWatcher, dag, task

trigger = MessageQueueTrigger(scheme="mqtt", topics="topic/+")

asset = Asset("mqtt_queue_asset", watchers=[AssetWatcher(name="mqtt_watcher", trigger=trigger)])

@dag(schedule=[asset])
def mqtt_example():
    @task()
    def extract_message(triggering_asset_events=None):
        message = list(triggering_asset_events.values())[0][0].extra['payload']
        print(f"Received message: {message}")
        return message
    
    mqtt_msg = extract_message()

mqtt_example()
```

