Metadata-Version: 2.4
Name: emqx-client-monitor
Version: 0.3.1
Summary: Prometheus-compatible exporter/monitor for watching EMQX MQTT clients.
Project-URL: Documentation, https://github.com/danielskowronski/emqx-client-monitor
Project-URL: Issues, https://github.com/danielskowronski/emqx-client-monitor/issues
Project-URL: Source, https://github.com/danielskowronski/emqx-client-monitor
Author-email: Daniel Skowroński <daniel@skowron.ski>
License-Expression: BSD-3-Clause
License-File: LICENSE
Keywords: emqx,exporter,iot,monitoring,mqtt,prometheus
Classifier: Development Status :: 4 - Beta
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
Requires-Python: >=3.11
Requires-Dist: click~=8.3.1
Requires-Dist: human-readable~=2.0.0
Requires-Dist: prometheus-client~=0.22.1
Requires-Dist: pydantic~=2.12.5
Requires-Dist: pyyaml~=6.0.3
Requires-Dist: requests~=2.32.5
Requires-Dist: rich~=14.2.0
Description-Content-Type: text/markdown

# EMQX Client Monitor

Prometheus-compatible exporter/monitor for watching EMQX MQTT clients.

**This is work in progress.**

[EMQX](https://www.emqx.com) doesn't expose a Prometheus endpoint for watching individual client connections, and relying on [`$SYS` subscribed events](https://docs.emqx.com/en/emqx/latest/observability/mqtt-system-topics.html#client-subscribed-and-unsubscribed-events) may not be reliable in some scenarios. This agent uses [EMQX's REST API](https://docs.emqx.com/en/emqx/latest/admin/api.html) to monitor the connection state of configured clients and exposes a Prometheus-style endpoint for further ingestion.

The primary use case is monitoring IoT devices that connect to the network for short periods of time to publish and receive MQTT messages, especially those that do not transmit any heartbeats.

It is intended to be deployed on a Kubernetes cluster, with its metrics scraped automatically by Prometheus. Some manual modes are available for convenience.


## Installation and prerequisites

### Local environment

[![PyPI:emqx-client-monitor](https://img.shields.io/pypi/v/emqx-client-monitor?style=flat-square&label=PyPI%3A%20emqx-client-monitor)](https://pypi.org/project/emqx-client-monitor/)

```bash
pipx install emqx-client-monitor
```

This tool requires EMQX v5 and an API key and secret created by an EMQX admin user.

### Docker image and Helm chart

The Docker image is hosted at [ghcr.io/danielskowronski/emqx-client-monitor](https://github.com/danielskowronski/EMQX-Client-Monitor/pkgs/container/emqx-client-monitor) and the Helm chart lives in this repository at [charts/emqx-client-monitor](./charts/emqx-client-monitor/).

At minimum, the following options have to be set:

```yaml
config:
  targetEmqx:
    api_key: "..."
    api_secret: "..."
  monitoredClients:
    - alias: some_alias
      client_id: "some_client_id"
```

This will target EMQX in the same namespace, configure Prometheus to scrape metrics, and create an `EmqxClientDisconnectedTooLong` alert rule.

In the future, this chart will be documented better and published as an OCI artifact. Automation for obtaining the API key/secret is also being considered.

## Configuration

Prepare a configuration file based on [`examples/config.yaml`](./examples/config.yaml). By default, this program uses `~/.config/emqx-client-monitor/config.yaml`, but this can be overridden with the `--cfg` flag.

### `emqx`

```yaml
emqx:
  api_key: "01234567890abcde"
  api_secret: "exampleSecretKey1234567890ABCDEFGHIJKLMNOPQRS"
  api_url: "http://emqx:18083/api/"
  ssl: true  # or path to CA bundle
  alias: LocalEMQX
```

Required:

- `api_key` and `api_secret` are the API key and secret generated by EMQX itself
- `api_url` must be the URL of the base EMQX API endpoint (ending with `/api/`)
- `alias` is the name of the EMQX broker; it should be unique among all instances of this program ingested by a single Prometheus DB

Optional:

- `ssl` is passed to the `requests` library as its `verify` parameter, so it can be one of:
  - `True` (default): validate server certificates against the CA certificates trusted by `requests` in your Python environment
  - `False`: insecure mode without certificate validation (for brave and lazy people)
  - a string *path* pointing to a CA bundle; this is useful for a private Root CA on systems where Python doesn't use the system trust store
- `timeout_seconds` is the EMQX API connection timeout (default `5`)
- `attempts` is the number of attempts made before failing (default `3`)

### `monitored_clients`

```yaml
monitored_clients:
  - alias: QingpingCO2_Room1
    client_id: "qingping-DEADBEEF1234"
  - alias: QingpingCO2_Room2
    client_id: "qingping-DEADBEEF5678"
```

This is the list of clients to monitor. Each entry contains `client_id`, which is matched against the MQTT client ID (unique on a broker), and `alias`, which is used as an extra label.

### `prometheus`

```yaml
prometheus:
  # all values below are defaults
  port: 9671
  address: 0.0.0.0
  ttl_seconds: 15
  enable_processed_counters: true
  enable_qos_split: false
  enable_dropped_counters: false
  enable_reason_split: false
  enable_bytes_metrics: true
  enable_packet_metrics: true
  enable_dates: false
  enable_inflight_metrics: true
  enable_subscription_count: true
```

All values are optional:

- `port` and `address` define where the exporter listens
- `ttl_seconds` defines how long the exporter caches EMQX API data between consecutive requests to its metrics endpoint
- `enable_*` are flags that enable or disable groups of metrics
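
The effect of `ttl_seconds` can be pictured as a small time-based cache in front of the EMQX API call. This is a sketch under that assumption (`TtlCache` is a hypothetical name; the clock is injectable so the behaviour is easy to test):

```python
import time
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class TtlCache(Generic[T]):
    """Serve the result of `fetch` until it is `ttl_seconds` old (hypothetical sketch)."""

    def __init__(
        self,
        fetch: Callable[[], T],
        ttl_seconds: float = 15,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._value: T | None = None
        self._fetched_at: float | None = None

    def get(self) -> T:
        now = self._clock()
        # Refetch on first use, or once the cached value has reached the TTL.
        if self._fetched_at is None or now - self._fetched_at >= self._ttl:
            self._value = self._fetch()
            self._fetched_at = now
        return self._value  # type: ignore[return-value]
```

With the default `ttl_seconds: 15`, two Prometheus scrapes 10 seconds apart would be served from a single EMQX API response.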

## Running

### Manual check (sub-command `check`)

This sub-command checks connected clients once and prints a human-readable table. The `--all` flag ignores `monitored_clients` and lists all connected clients (the "Alias" column then becomes "Client ID").

Example output:

```
┏━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━┳━━━━━━━━━━━━┳━━━━━┳━━━━━━━┳━━━━━━━━━┳━━━━━━┳━━━━━┳━━━━━━┓
┃                        ┃    Created ┃  Keep ┃  Connected ┃ Sub ┃ MsgIn ┃      RX ┃   RX ┃  TX ┃   TX ┃
┃ Alias                  ┃ (time ago) ┃ alive ┃ (time ago) ┃ Cnt ┃ Flght ┃     Msg ┃ Drop ┃ Msg ┃ Drop ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━╇━━━━━━━━━━━━╇━━━━━╇━━━━━━━╇━━━━━━━━━╇━━━━━━╇━━━━━╇━━━━━━┩
│ HumidifierWaterRefTerm │   16 hours │   10s │   16 hours │   4 │     0 │   24279 │    0 │  16 │    0 │
│ QingpingIoTMQTT_Client │ 39 minutes │    1m │ 39 minutes │   8 │     0 │       0 │    0 │ 126 │    0 │
│ QingpingCO2_Room1      │   a second │    2m │   a second │   0 │     0 │       0 │    0 │   0 │    0 │
│ QingpingCO2_Room2      │  4 minutes │    2m │  4 minutes │   1 │     0 │       2 │    0 │   0 │    0 │
│ QingpingCO2_Room3      │    13 days │    2m │    13 days │   1 │     0 │   38455 │    0 │  69 │    0 │
│ RTL433_Room1           │    a month │    1m │   23 hours │   0 │     0 │  702456 │    0 │   0 │    0 │
│ RTL433_Room2           │    30 days │    1m │    18 days │   0 │     0 │ 5184897 │    0 │   0 │    0 │
│ ZAMEL                  │     7 days │   32s │     7 days │   0 │     0 │ 6007727 │    0 │   0 │    0 │
└────────────────────────┴────────────┴───────┴────────────┴─────┴───────┴─────────┴──────┴─────┴──────┘
```

### Prometheus exporter (sub-command `prometheus`)

#### Labels

All metrics have the following labels:

- `alias` for the client alias from configuration
- `broker` for EMQX instance alias from configuration
- `client_id` for MQTT Client ID

Additionally, most metrics have:

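- `direction`, either `rx` for messages the broker received from the client (i.e. published by it) or `tx` for messages the broker sent to the client (i.e. delivered via its subscriptions)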
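
To illustrate how the `direction` label can be derived, the sketch below assumes EMQX v5 client records expose paired counters named `recv_msg`/`send_msg`, `recv_oct`/`send_oct` and `recv_pkt`/`send_pkt` (mapped to `rx` and `tx` respectively) plus `clientid`. `DIRECTION_FIELDS` and `direction_samples` are hypothetical names, and only three of the metrics are covered:

```python
# Assumed EMQX v5 client-record field for each metric, keyed by direction label.
DIRECTION_FIELDS = {
    "emqx_client_monitor_messages_processed_total": {"rx": "recv_msg", "tx": "send_msg"},
    "emqx_client_monitor_bytes_total": {"rx": "recv_oct", "tx": "send_oct"},
    "emqx_client_monitor_packets_total": {"rx": "recv_pkt", "tx": "send_pkt"},
}


def direction_samples(
    record: dict, alias: str, broker: str
) -> list[tuple[str, dict[str, str], float]]:
    """Turn one EMQX client record into (metric, labels, value) samples."""
    base = {"alias": alias, "broker": broker, "client_id": record["clientid"]}
    samples = []
    for metric, fields in DIRECTION_FIELDS.items():
        for direction, field in fields.items():
            if field in record:  # skip counters the API did not return
                labels = {**base, "direction": direction}
                samples.append((metric, labels, float(record[field])))
    return samples
```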

#### Metrics

All available metrics with example data:

```
# HELP emqx_client_monitor_connected Is client connected
# TYPE emqx_client_monitor_connected gauge
emqx_client_monitor_connected{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678"} 1.0
# HELP emqx_client_monitor_subscriptions Number of subscriptions
# TYPE emqx_client_monitor_subscriptions gauge
emqx_client_monitor_subscriptions{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678"} 1.0
# HELP emqx_client_monitor_inflights Number of inflight messages
# TYPE emqx_client_monitor_inflights gauge
emqx_client_monitor_inflights{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678"} 0.0
# HELP emqx_client_monitor_created_at Client creation time (epoch seconds)
# TYPE emqx_client_monitor_created_at gauge
emqx_client_monitor_created_at{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678"} 1.763772688905e+09
# HELP emqx_client_monitor_connected_at Client last connected time (epoch seconds)
# TYPE emqx_client_monitor_connected_at gauge
emqx_client_monitor_connected_at{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678"} 1.763772688906e+09
# HELP emqx_client_monitor_messages_processed_total Number of received messages processed (total)
# TYPE emqx_client_monitor_messages_processed_total counter
emqx_client_monitor_messages_processed_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx"} 38657.0
emqx_client_monitor_messages_processed_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx"} 69.0
# HELP emqx_client_monitor_messages_processed_by_qos_total Number of received messages processed split by QoS
# TYPE emqx_client_monitor_messages_processed_by_qos_total counter
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx",qos="0"} 38657.0
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx",qos="1"} 0.0
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx",qos="2"} 0.0
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",qos="0"} 69.0
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",qos="1"} 0.0
emqx_client_monitor_messages_processed_by_qos_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",qos="2"} 0.0
# HELP emqx_client_monitor_messages_dropped_total Number of received messages dropped (total)
# TYPE emqx_client_monitor_messages_dropped_total counter
emqx_client_monitor_messages_dropped_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx"} 0.0
emqx_client_monitor_messages_dropped_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx"} 0.0
# HELP emqx_client_monitor_messages_dropped_by_reason_total Number of received messages dropped split by reason
# TYPE emqx_client_monitor_messages_dropped_by_reason_total counter
emqx_client_monitor_messages_dropped_by_reason_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx",reason="await_pubrel_timeout"} 0.0
emqx_client_monitor_messages_dropped_by_reason_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",reason="expired"} 0.0
emqx_client_monitor_messages_dropped_by_reason_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",reason="queue_full"} 0.0
emqx_client_monitor_messages_dropped_by_reason_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx",reason="too_large"} 0.0
# HELP emqx_client_monitor_bytes_total Number of received raw octets (bytes)
# TYPE emqx_client_monitor_bytes_total counter
emqx_client_monitor_bytes_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx"} 1.0525736e+07
emqx_client_monitor_bytes_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx"} 8009.0
# HELP emqx_client_monitor_packets_total Number of received MQTT packets
# TYPE emqx_client_monitor_packets_total counter
emqx_client_monitor_packets_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="rx"} 38761.0
emqx_client_monitor_packets_total{alias="QingpingCO2_Room2",broker="emqx",client_id="qingping-DEADBEEF5678",direction="tx"} 173.0
```

They are controlled by feature flags:

- `emqx_client_monitor_connected` is always exported, even when the EMQX API does not return the client (it is then reported as disconnected)
- `enable_subscription_count` controls:
  - `emqx_client_monitor_subscriptions`
- `enable_inflight_metrics` controls:
  - `emqx_client_monitor_inflights`
- `enable_dates` controls:
  - `emqx_client_monitor_created_at`
  - `emqx_client_monitor_connected_at`
- `enable_processed_counters` controls:
  - `emqx_client_monitor_messages_processed_total` (RX/TX)
- `enable_qos_split` controls:
  - `emqx_client_monitor_messages_processed_by_qos_total` (RX/TX) by `qos` label (`0`/`1`/`2`)
- `enable_dropped_counters` controls:
  - `emqx_client_monitor_messages_dropped_total` (RX/TX)
- `enable_reason_split` controls:
  - `emqx_client_monitor_messages_dropped_by_reason_total` (RX/TX) by `reason` label:
    - RX `await_pubrel_timeout`
    - TX `expired`
    - TX `queue_full`
    - TX `too_large`
- `enable_bytes_metrics` controls:
  - `emqx_client_monitor_bytes_total` (RX/TX)
- `enable_packet_metrics` controls:
  - `emqx_client_monitor_packets_total` (RX/TX)

---

## Important assumptions

### Client (and resulting metric) uniqueness

Uniqueness is based on the following attributes:

- client `alias` from configuration
- MQTT client ID (from configuration), which must be unique per broker (so it's also unique within a single server response)
- MQTT username (from server response)
- broker `alias` from configuration (this allows multiple instances of the monitor to be ingested into one Prometheus)

The following fields are ignored for this purpose:

- the client's IP address and port number, which may change over time (e.g. via DHCP) and are problematic behind NAT anyway; in the future, they may be exposed as a metric (with the IP converted to an integer)
- the EMQX node, which may be effectively random in a round-robin cluster; in the future, it may be exposed as a metric via a mapping defined in the agent configuration (currently it's a string like `"emqx@emqx-0.emqx-headless.namespace.svc.cluster.local"`)
- all connection attributes such as *clean start*, because they are client-configurable

### Live data from EMQX API

For now, all data comes live from the EMQX API. This means that once a client disconnects, all of its gauges and counters disappear (except `emqx_client_monitor_connected`). By design, the counters reported by the EMQX API reset once the client connects again.

In other words, **some metrics may not make much sense for clients whose connection TTL is shorter than their publish interval**. For now, it's the responsibility of another system to aggregate the resetting counters into rates.
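
One common way for such a system to handle resetting counters is the approach Prometheus itself uses in `increase()` and `rate()`: whenever a sample is lower than the previous one, assume the counter restarted from zero. Below is a minimal sketch of just that reset handling, without Prometheus' extrapolation to window boundaries (`reset_aware_increase` is a hypothetical name):

```python
def reset_aware_increase(samples: list[float]) -> float:
    """Total increase across consecutive counter samples, treating drops as resets.

    When a value is lower than the previous one, the counter is assumed to have
    restarted from zero, so the new value itself counts as the increase.
    """
    total = 0.0
    for previous, current in zip(samples, samples[1:]):
        total += current - previous if current >= previous else current
    return total
```

For example, samples `10, 15, 3, 8` yield an increase of 13 (5 before the reset, 3 right after it, then 5 more). A reset goes unnoticed if the client reconnects and its new counter overtakes the old value between two samples.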

Additionally, Prometheus must scrape more often than the shortest connection TTL among clients that connect and disconnect frequently. Usually this is not a problem with a typical 30s scrape interval. However, clients will be reported as disconnected if they have an absurdly short TTL (like 10s), connect very rarely, and disconnect immediately after publishing a single message.

To solve that, this program will need to implement the following:

- EMQX API scraper running independently of Prometheus scrapes
- internal state for keeping track of counters and reporting last-known data for disconnected clients
