otupy.transfers.mqtt.mqtt_transfer.MQTTTransfer

class MQTTTransfer(broker, port=1883, role=1, profiles=None, device_id=None, username=None, password=None, client_id=None, response_timeout=10, usessl=False)

Bases: Transfer

MQTT Transfer Protocol

This class provides an implementation of the Specification. It builds on the paho library.

Use MQTTTransfer to build OpenC2 communication stacks in Producer and Consumer.

Methods

on_connect

on_message

Service callback on receiving data from MQTT subscribed topics

on_publish

on_subscribe

receive

Listen for incoming messages

send

Sends OpenC2 message

class MQTTUserData(event: Event, expected_msgs: int = None)

Bases: object

Define the data structure to exchange data with MQTT thread

It will include - a list of (received) messages - the number of expected messages - an event to signal all expected messages were received

__init__(event: Event, expected_msgs: int = None)

Initialize MQTTUserData

__init__(broker, port=1883, role=1, profiles=None, device_id=None, username=None, password=None, client_id=None, response_timeout=10, usessl=False)

Builds the MQTTTransfer instance

The broker and port parameters are used to select theh MQTT Broker. This implementation only supports TCP as transport protocol.

The topics to be subscribed or used for publishing will be automatically derived. If no device_id is provided, the device-specific topic will not be subscribed. The Specification does not cover the generation of device identifiers (Sec. 2.2); additionally, it is not clear why the Consumer’s topic is denoted as device_id while the Producer’s topic as producer_id. This implementation uses the same identifiers already present in the Message metadata (namely, From and To fields) (Sec. 2.4.2). Note that both Consumers and Producers are expected to always include their identifiers in the From field (Sec. 2.4.2), so this argument is mandatory for the MQTT transfer.

According to Sec. 3.1.8: - Consumer SHALL subscribe to topics for all actuator devices, all-commands topic, individual topic for device - Producer SHALL subscribe to the general response topic - Producer SHOULD subscribe to their individual topic Therefore, 1) the profiles and device_id arguments are mandatory for Consumers; 2) the device_id could be optional for Producers, but it is mandatory in this implementation because of the above requirement on the From field. This implementation will always subscribe all topics for Consumers/Producers.

For sending data, the send() method will wait at most response_timeout secodns before returning. In case the Commands are sent to specific devices, it will return as soon as all of them have answered. If the Command is broadcast to all APs or to all devices, it will return after the above timeout. The same will happen if the message is sent to specific devices, but they do not include their identifier in the Message.from field, as required by the standard.

For receiving data, the receive() method will keep MQTT message for at least response_timeout seconds for detecting and discarding duplicates. Duplicates received after response_timeout seconds are not guaranteed to be correctly detected as such.

Parameters:
  • broker – Hostname or IP address of the MQTT Broker

  • port – Transport port of the MQTT Broker

  • device_id – OpenC2 device identifier to be used in topic name for the specific actuator (mandatory)

  • profiles – list of profile names for actuators running on the Consumer (will be ignored by the Producer)

  • username – Username to connect to MQTT broker

  • password – Password to connect to MQTT broker

  • client_id – Client identifier to connect to MQTT broker

  • response_timeout – Timeout to wait for responses to commands from multiple devices (for Producers). Timeout to detect duplicates of commands from multiple topics (for Consumers).

  • usessl – Enable (True) or disable (False) SSL. Internal use only. Do not set this argument, use the MQTTSTransfer instead.

on_message(client, userdata, msg)

Service callback on receiving data from MQTT subscribed topics

receive(callback, encoder)

Listen for incoming messages

This method implements the Transfer interface to listen for and receive OpenC2 messages. Note that only Consumers can use this method (it will fail if a Producer invokes it).

The internal implementation uses Paho MQTT client. The method invokes the callback for each received message, which must be provided by a Consumer to properly dispatch Commands to the relevant server(s). It also takes an Encoder that is used to create responses to Commands encoded with unknown encoders.

This implemnetation sends the response to the device-specific topic, if available from the command (it SHOULD be available according to the Specification requirements, but we want to be be safe and robust about buggy implementations).

Parameters:
  • callback – The function that is invoked to process OpenC2 messages.

  • encoder – Default Encoder instance to respond to unknown or wrong messages.

Returns:

None

send(msg, encoder)

Sends OpenC2 message

This method implements the required Transfer interface to send message to an OpenC2 server. Note that this method can be used by Producers only (it will fail if a Consumer invokes it)

The MQTT specification does not dictate to which topic the messages are sent. This implementation follows a “polite” approach, which avoids flooding Consumers by publishing the messages over an unnecessary number of topics. The approach is to follow the filters the Producer may have set on the message, by implementing the following logic: 1. If msg.to is present, send the message to all devices in this field 2. If msg.to is not present, send the message to the profile in the Command 3. If no profile is given, broadcast the message

Parameters:
  • msg – The message to send (otupy Message).

  • encoder – The encoder to use for encoding the msg.

Returns:

An list of OpenC2 responses (Response).