filip.clients.mqtt package

MQTT client for streaming data via FIWARE’s IoT-Agent

Subpackages

Submodules

filip.clients.mqtt.client module

Implementation of an extended MQTT client that automatically handles the topic subscription for FIWARE’s IoT communication pattern.

class filip.clients.mqtt.client.IoTAMQTTClient(client_id='', clean_session=None, userdata=None, protocol=MQTTProtocolVersion.MQTTv311, transport='tcp', callback_api_version=CallbackAPIVersion.VERSION2, devices: List[Device] | None = None, service_groups: List[ServiceGroup] | None = None, custom_encoder: Dict[str, BaseEncoder] | None = None)[source]

Bases: Client

This class is an extension to the MQTT client from the well established Eclipse Paho™ MQTT Python Client. The official documentation is located here: https://github.com/eclipse/paho.mqtt.python

The class adds additional functions to facilitate the communication to FIWARE’s IoT-Agent via MQTT. It magically generates and subscribes to all important topics that are necessary to establish a bi-directional communication with the IoT-Agent.

Note

The client does not sync the device configuration with the IoT-Agent. This is up to the user!

Note

The extension does not effect the normal workflow or any other functionality known from the original client.

The client does not yet support the retrieval of command configurations via mqtt documented here: https://fiware-iotagent-json.readthedocs.io/en/latest/usermanual/index.html#api-overview

Example

This example shows the basic usage of the client. It does not demonstrate its whole capabilities. Please check the single methods for more details. Please also keep in mind that this still requires provisioning of the device in the IoT-Agent and sending the commands via the context broker. For more details check the additional example section:

from filip.models.ngsi_v2.iot import Device, DeviceAttribute, DeviceCommand, ServiceGroup
from filip.clients.mqtt import MQTTClient
from filip.clients.mqtt.encoder import IoTA_Json

# create a device configuration
device_attr = DeviceAttribute(name='temperature',
                              object_id='t',
                              type="Number")
device_command = DeviceCommand(name='heater', type="Boolean")
device = Device(device_id='MyDevice',
                entity_name='MyDevice',
                entity_type='Thing',
                protocol='IoTA-JSON',
                transport='MQTT',
                apikey=YourApiKey,
                attributes=[device_attr],
                commands=[device_command])

service_group = ServiceGroup(apikey="YourApiKey", resource="/iot")

mqttc = MQTTClient(client_id="YourID",
                   userdata=None,
                   protocol=mqtt.MQTTv5,
                   transport="tcp",
                   _devices = [device],
                   service_groups = [service_group])

# create a callback function that will be called for incoming
# commands and add it for a single device
def on_command(client, obj, msg):
    apikey, device_id, payload =                     client.get_encoder().decode_message(msg=msg)

    # do_something with the message.
    # For instance write into a queue.

    # acknowledge a command
    client.publish(device_id=device_id,
                   command_name=next(iter(payload))
                   payload=payload)

mqttc.add_command_callback(on_command)

# create a non blocking loop
mqttc.loop_start()

# publish a multi-measurement for a device
mqttc.publish(device_id='MyDevice', payload={'t': 50})

# publish a single measurement for a device
mqttc.publish(device_id='MyDevice',
              attribute_name='temperature',
              payload=50)

# adding timestamps to measurements using the client


# adding timestamps to measurements in payload
from datetime import datetime

mqttc.publish(device_id='MyDevice',
              payload={'t': 50,
                       'timeInstant': datetime.now().astimezone().isoformat()},
              timestamp=true)

# stop network loop and disconnect cleanly
mqttc.loop_stop()
mqttc.disconnect()
add_command_callback(device_id: str, callback: Callable)[source]

Adds callback function for a device configuration.

Parameters:
  • device_id – id of and IoT device

  • callback – function that will be called for incoming commands. This function should have the following format:

Example:

def on_command(client, obj, msg):
    apikey, device_id, payload =                     client.encoder.decode_message(msg=msg)

    # do_something with the message.
    # For instance write into a queue.

    # acknowledge a command. Here command are usually single
    # messages. The first key is equal to the commands name.
    client.publish(device_id=device_id,
                   command_name=next(iter(payload)),
                   payload=payload)

mqttc.add_command_callback(device_id="MyDevice",
                           callback=on_command)
Returns:

None

add_device(device: Device | Dict, qos=0, options=None, properties=None)[source]

Registers a device config with the mqtt client. Subsequently, the client will magically subscribe to the corresponding topics based on the device config and any matching registered service group config if exists.

Note

To register the device config only with this client is not sufficient for data streaming the configuration also needs to be registered with IoTA-Agent.

Parameters:
  • device – Configuration of an IoT device

  • qos – Quality of service can be 0, 1 or 2

  • options – MQTT v5.0 subscribe options

  • properties – MQTT v5.0 properties

Returns:

None

Raises:

ValueError – if device configuration already exists

add_encoder(encoder: Dict[str, BaseEncoder])[source]
add_service_group(service_group: ServiceGroup | Dict)[source]

Registers a device service group with the client

Parameters:

service_group – Service group configuration

Returns:

None

Raises:

ValueError – if service group already exists

delete_device(device_id: str)[source]

Unregisters a device and removes its subscriptions and callbacks

Parameters:

device_id – id of and IoT device

Returns:

None

delete_service_group(apikey)[source]

Unregisters a service group and removes

Parameters:

apikey – Unique APIKey of the service group

Returns:

None

property devices

Returns as list of all registered device configurations Returns:

get_device(device_id: str) Device[source]

Returns the configuration of a registered device.

Parameters:

device_id – Id of the requested device

Returns:

Device model of the requested device

Return type:

Device

Raises:

KeyError – if requested device is not registered with the client

Example:

from filip.clients.mqtt import MQTTClient

mqttc = MQTTClient()
device = mqttc.get_device(device_id="MyDeviceId")
print(device.json(indent=2))
print(type(device))
get_encoder(encoder: str | PayloadProtocol)[source]

Returns the encoder by key

Parameters:

encoder – encoder name

Returns:

Subclass of Baseencoder

get_service_group(apikey: str) ServiceGroup[source]

Returns registered service group configuration

Parameters:

apikey – Unique APIKey of the service group

Returns:

ServiceGroup

Raises:

KeyError – if service group not yet registered

Example:

from filip.clients.mqtt import MQTTClient

mqttc = MQTTClient()
group = mqttc.get_service_group(apikey="MyAPIKEY")
print(group.json(indent=2))
print(type(group))
publish(topic=None, payload: Dict | Any | None = None, qos: int = 0, retain: bool = False, properties=None, device_id: str | None = None, attribute_name: str | None = None, command_name: str | None = None, timestamp: bool = False)[source]

Publish an MQTT Message to a specified topic. If you want to publish a device specific message to a device use the device_id argument for multi-measurement. The function will then automatically validate against the registered device configuration if the payload keys are valid. If you want to publish a single measurement the attribute_name argument is required as well.

Note

If the device_id argument is set, the topic argument will be ignored.

Parameters:
  • topic – The topic that the message should be published on.

  • payload – The actual message to send. If not given, or set to None a zero length message will be used. Passing an int or float will result in the payload being converted to a string representing that number. If you wish to send a true int/float, use struct.pack() to create the payload you require. For publishing to a device use a dict containing the object_ids as keys.

  • qos – The quality of service level to use.

  • retain – If set to true, the message will be set as the “last known good”/retained message for the topic.

  • properties – (MQTT v5.0 only) the MQTT v5.0 properties to be included. Use the Properties class.

  • device_id – Id of the IoT device you want to publish for. The topics will automatically created. If set, the message type will be assumed to be multi measurement.

  • attribute_name – Name of an attribute of the device. Do only use this for single measurements. If set, command_name must be omitted.

  • command_name – Name of a command of the device that should be acknowledged. If set attribute_name must be omitted.

  • timestamp – If true the client will generate a valid timestamp based on utc and added to the multi measurement payload. If a timeInstant is already contained in the message payload it will not overwritten.

Returns:

None

Raises:
  • KeyError – if device configuration is not registered with client

  • ValueError – if the passed arguments are inconsistent or a timestamp does not match the ISO 8601 format.

  • AssertionError – if the message payload does not match the device configuration.

subscribe(topic=None, qos=0, options=None, properties=None)[source]

Extends the normal subscribe function of the paho.mqtt.client. If the topic argument is omitted the client will subscribe to all registered device command topics.

Parameters:
  • topic – A string specifying the subscription topic to subscribe to.

  • qos – The desired quality of service level for the subscription. Defaults to 0.

  • options – Not used.

  • properties – Not used.

Returns:

None

update_device(device: Device | Dict, qos=0, options=None, properties=None)[source]

Updates a registered device configuration. There is no opportunity to only partially update the device. Hence, your device model should be complete.

Parameters:
  • device – Configuration of an IoT device

  • qos – Quality of service can be 0, 1 or 2

  • options – MQTT v5.0 subscribe options

  • properties – MQTT v5.0 properties

Returns:

None

Raises:

KeyError – if device not yet registered

update_service_group(service_group: ServiceGroup | Dict)[source]

Updates a registered service group configuration. There is no opportunity to only partially update the device. Hence, your service group model should be complete.

Parameters:

service_group – Service group configuration

Returns:

None

Raises:

KeyError – if service group not yet registered