filip.clients.mqtt package
Subpackages
Submodules
filip.clients.mqtt.client module
Implementation of an extended MQTT client that automatically handles the topic subscription for FIWARE’s IoT communication pattern.
- class filip.clients.mqtt.client.IoTAMQTTClient(client_id='', clean_session=None, userdata=None, protocol=MQTTProtocolVersion.MQTTv311, transport='tcp', callback_api_version=CallbackAPIVersion.VERSION2, devices: List[Device] | None = None, service_groups: List[ServiceGroup] | None = None, custom_encoder: Dict[str, BaseEncoder] | None = None)[source]
Bases:
Client
This class is an extension to the MQTT client from the well-established Eclipse Paho™ MQTT Python Client. The official documentation is located here: https://github.com/eclipse/paho.mqtt.python
The class adds functions that facilitate communication with FIWARE’s IoT-Agent via MQTT. It automatically generates and subscribes to all topics that are necessary to establish bi-directional communication with the IoT-Agent.
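As a rough illustration of what these topics look like, the sketch below builds the topic strings for one device. It assumes the topic layout described in the IoT Agent JSON documentation (`/json/<apikey>/<device_id>/attrs` for measurements, `/<apikey>/<device_id>/cmd` for commands and `/json/<apikey>/<device_id>/cmdexe` for acknowledgements). The helper `build_topics` is hypothetical and not part of FiLiP; the client's internal topic generation may differ.

```python
from typing import Dict


def build_topics(apikey: str, device_id: str,
                 protocol_prefix: str = "json") -> Dict[str, str]:
    """Return the topics for one device (assumed IoT Agent JSON layout)."""
    base = f"/{apikey}/{device_id}"
    return {
        # the device publishes measurements here
        "measurements": f"/{protocol_prefix}{base}/attrs",
        # the IoT-Agent publishes commands here, the device subscribes
        "commands": f"{base}/cmd",
        # the device publishes command acknowledgements here
        "command_ack": f"/{protocol_prefix}{base}/cmdexe",
    }


topics = build_topics(apikey="YourApiKey", device_id="MyDevice")
print(topics["commands"])
```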
Note
The client does not sync the device configuration with the IoT-Agent. This is up to the user!
Note
The extension does not affect the normal workflow or any other functionality known from the original client.
The client does not yet support the retrieval of command configurations via MQTT, which is documented here: https://fiware-iotagent-json.readthedocs.io/en/latest/usermanual/index.html#api-overview
Example
This example shows the basic usage of the client. It does not demonstrate its full capabilities; check the individual methods for more details. Keep in mind that the device must still be provisioned in the IoT-Agent and that commands must be sent via the context broker. For more details, see the additional examples section:

    import paho.mqtt.client as mqtt

    from filip.models.ngsi_v2.iot import (
        Device, DeviceAttribute, DeviceCommand, ServiceGroup)
    from filip.clients.mqtt.client import IoTAMQTTClient

    # create a device configuration
    device_attr = DeviceAttribute(name='temperature',
                                  object_id='t',
                                  type="Number")
    device_command = DeviceCommand(name='heater', type="Boolean")
    device = Device(device_id='MyDevice',
                    entity_name='MyDevice',
                    entity_type='Thing',
                    protocol='IoTA-JSON',
                    transport='MQTT',
                    apikey="YourApiKey",
                    attributes=[device_attr],
                    commands=[device_command])

    service_group = ServiceGroup(apikey="YourApiKey", resource="/iot")

    mqttc = IoTAMQTTClient(client_id="YourID",
                           userdata=None,
                           protocol=mqtt.MQTTv5,
                           transport="tcp",
                           devices=[device],
                           service_groups=[service_group])

    # create a callback function that will be called for incoming
    # commands and add it for a single device
    def on_command(client, obj, msg):
        # look up the encoder by the payload protocol of the device
        apikey, device_id, payload = \
            client.get_encoder('IoTA-JSON').decode_message(msg=msg)

        # do something with the message,
        # for instance write it into a queue.

        # acknowledge a command
        client.publish(device_id=device_id,
                       command_name=next(iter(payload)),
                       payload=payload)

    mqttc.add_command_callback(device_id='MyDevice', callback=on_command)

    # connect to the broker (host and port are placeholders)
    mqttc.connect(host="localhost", port=1883)

    # create a non-blocking loop
    mqttc.loop_start()

    # publish a multi-measurement for a device
    mqttc.publish(device_id='MyDevice', payload={'t': 50})

    # publish a single measurement for a device
    mqttc.publish(device_id='MyDevice',
                  attribute_name='temperature',
                  payload=50)

    # adding timestamps to measurements using the client
    mqttc.publish(device_id='MyDevice',
                  payload={'t': 50},
                  timestamp=True)

    # adding timestamps to measurements in payload
    from datetime import datetime

    mqttc.publish(device_id='MyDevice',
                  payload={'t': 50,
                           'timeInstant': datetime.now().astimezone().isoformat()},
                  timestamp=True)

    # stop network loop and disconnect cleanly
    mqttc.loop_stop()
    mqttc.disconnect()
- add_command_callback(device_id: str, callback: Callable)[source]
Adds callback function for a device configuration.
- Parameters:
device_id – ID of an IoT device
callback – function that will be called for incoming commands. This function should have the following format:
Example:
    def on_command(client, obj, msg):
        # look up the encoder by the payload protocol of the device
        apikey, device_id, payload = \
            client.get_encoder('IoTA-JSON').decode_message(msg=msg)

        # do something with the message,
        # for instance write it into a queue.

        # acknowledge a command. Commands usually arrive as single
        # messages, and the first key equals the command's name.
        client.publish(device_id=device_id,
                       command_name=next(iter(payload)),
                       payload=payload)

    mqttc.add_command_callback(device_id="MyDevice",
                               callback=on_command)
- Returns:
None
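To make the shape of the decoded values more concrete, the following sketch mimics what a callback works with: an API key, a device ID and a payload dictionary whose first key is the command name. It is a simplified stand-in, not the encoder's actual `decode_message` implementation. `FakeMessage` and `decode_command` are hypothetical names, and the sketch assumes a JSON payload arriving on a `/<apikey>/<device_id>/cmd` topic.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, Tuple


@dataclass
class FakeMessage:
    """Hypothetical stand-in for an incoming MQTT message."""
    topic: str
    payload: bytes


def decode_command(msg: FakeMessage) -> Tuple[str, str, Dict[str, Any]]:
    """Split the topic into apikey and device_id and parse the JSON payload."""
    # "/YourApiKey/MyDevice/cmd" -> ["YourApiKey", "MyDevice", "cmd"]
    apikey, device_id, suffix = msg.topic.strip("/").split("/")
    if suffix != "cmd":
        raise ValueError(f"Not a command topic: {msg.topic}")
    return apikey, device_id, json.loads(msg.payload)


msg = FakeMessage(topic="/YourApiKey/MyDevice/cmd",
                  payload=b'{"heater": true}')
apikey, device_id, payload = decode_command(msg)

# the first key of the payload is the command name
command_name = next(iter(payload))
print(apikey, device_id, command_name)
```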
- add_device(device: Device | Dict, qos=0, options=None, properties=None)[source]
Registers a device config with the MQTT client. Subsequently, the client will automatically subscribe to the corresponding topics based on the device config and any matching registered service group config, if one exists.
Note
Registering the device config with this client alone is not sufficient for data streaming; the configuration also needs to be registered with the IoT-Agent.
- Parameters:
device – Configuration of an IoT device
qos – Quality of service can be 0, 1 or 2
options – MQTT v5.0 subscribe options
properties – MQTT v5.0 properties
- Returns:
None
- Raises:
ValueError – if device configuration already exists
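The registration semantics described above (a `ValueError` for duplicates, a `KeyError` for unknown IDs on lookup) can be sketched with a plain dictionary. `DeviceRegistry` is a hypothetical, heavily simplified model that stores dictionaries instead of `Device` objects and does not subscribe to anything.

```python
from typing import Dict


class DeviceRegistry:
    """Hypothetical, simplified registry keyed by device_id."""

    def __init__(self) -> None:
        self._devices: Dict[str, dict] = {}

    def add(self, device: dict) -> None:
        device_id = device["device_id"]
        if device_id in self._devices:
            raise ValueError(f"Device '{device_id}' is already registered")
        self._devices[device_id] = device

    def get(self, device_id: str) -> dict:
        # a plain dict lookup raises KeyError for unknown ids
        return self._devices[device_id]


registry = DeviceRegistry()
registry.add({"device_id": "MyDevice", "apikey": "YourApiKey"})

# registering the same device_id a second time is rejected
try:
    registry.add({"device_id": "MyDevice", "apikey": "YourApiKey"})
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True
print(duplicate_rejected)
```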
- add_encoder(encoder: Dict[str, BaseEncoder])[source]
- add_service_group(service_group: ServiceGroup | Dict)[source]
Registers a device service group with the client
- Parameters:
service_group – Service group configuration
- Returns:
None
- Raises:
ValueError – if service group already exists
- delete_device(device_id: str)[source]
Unregisters a device and removes its subscriptions and callbacks
- Parameters:
device_id – ID of an IoT device
- Returns:
None
- delete_service_group(apikey)[source]
Unregisters a service group.
- Parameters:
apikey – Unique APIKey of the service group
- Returns:
None
- property devices
Returns a list of all registered device configurations.
- get_device(device_id: str) → Device[source]
Returns the configuration of a registered device.
- Parameters:
device_id – Id of the requested device
- Returns:
Device model of the requested device
- Return type:
Device
- Raises:
KeyError – if requested device is not registered with the client
Example:
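    from filip.clients.mqtt.client import IoTAMQTTClient

    mqttc = IoTAMQTTClient()
    # assumes the device was registered before, e.g. via add_device
    device = mqttc.get_device(device_id="MyDeviceId")
    # model_dump_json is the pydantic v2 API;
    # on pydantic v1 use device.json(indent=2)
    print(device.model_dump_json(indent=2))
    print(type(device))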
- get_encoder(encoder: str | PayloadProtocol)[source]
Returns the encoder by key
- Parameters:
encoder – encoder name
- Returns:
Subclass of BaseEncoder
- get_service_group(apikey: str) → ServiceGroup[source]
Returns registered service group configuration
- Parameters:
apikey – Unique APIKey of the service group
- Returns:
ServiceGroup
- Raises:
KeyError – if service group not yet registered
Example:
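    from filip.clients.mqtt.client import IoTAMQTTClient

    mqttc = IoTAMQTTClient()
    # assumes the service group was registered before,
    # e.g. via add_service_group
    group = mqttc.get_service_group(apikey="MyAPIKEY")
    # model_dump_json is the pydantic v2 API;
    # on pydantic v1 use group.json(indent=2)
    print(group.model_dump_json(indent=2))
    print(type(group))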
- publish(topic=None, payload: Dict | Any | None = None, qos: int = 0, retain: bool = False, properties=None, device_id: str | None = None, attribute_name: str | None = None, command_name: str | None = None, timestamp: bool = False)[source]
Publishes an MQTT message to a specified topic. To publish a multi-measurement for a registered device, pass the device_id argument. The function then automatically validates the payload keys against the registered device configuration. To publish a single measurement, the attribute_name argument is required as well.
Note
If the device_id argument is set, the topic argument will be ignored.
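How the keyword arguments select the kind of message can be sketched as a small decision function. This is an interpretation of the parameter descriptions on this page, not the client's actual code; `resolve_message_type` and its return labels are hypothetical.

```python
from typing import Optional


def resolve_message_type(device_id: Optional[str] = None,
                         attribute_name: Optional[str] = None,
                         command_name: Optional[str] = None) -> str:
    """Classify a publish call by its device-related arguments."""
    if device_id is None:
        # no device given: behave like a plain publish to `topic`
        return "raw"
    if attribute_name and command_name:
        raise ValueError(
            "attribute_name and command_name are mutually exclusive")
    if attribute_name:
        return "single"        # single measurement of one attribute
    if command_name:
        return "command_ack"   # acknowledgement of a command
    return "multi"             # multi-measurement (the default)


print(resolve_message_type(device_id="MyDevice"))
print(resolve_message_type(device_id="MyDevice",
                           attribute_name="temperature"))
```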
- Parameters:
topic – The topic that the message should be published on.
payload – The actual message to send. If not given, or set to None a zero length message will be used. Passing an int or float will result in the payload being converted to a string representing that number. If you wish to send a true int/float, use struct.pack() to create the payload you require. For publishing to a device use a dict containing the object_ids as keys.
qos – The quality of service level to use.
retain – If set to true, the message will be set as the “last known good”/retained message for the topic.
properties – (MQTT v5.0 only) the MQTT v5.0 properties to be included. Use the Properties class.
device_id – ID of the IoT device you want to publish for. The topics will be created automatically. If set, the message type is assumed to be multi-measurement.
attribute_name – Name of an attribute of the device. Use this only for single measurements. If set, command_name must be omitted.
command_name – Name of a command of the device that should be acknowledged. If set, attribute_name must be omitted.
timestamp – If True, the client generates a valid timestamp based on UTC and adds it to the multi-measurement payload. If a timeInstant is already contained in the message payload, it will not be overwritten.
- Returns:
None
- Raises:
KeyError – if device configuration is not registered with client
ValueError – if the passed arguments are inconsistent or a timestamp does not match the ISO 8601 format.
AssertionError – if the message payload does not match the device configuration.
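The timestamp behaviour described for the `timestamp` parameter can be approximated as follows. This is a minimal sketch under stated assumptions, not the client's implementation: `with_timestamp` is a hypothetical helper, and `datetime.fromisoformat` is used as the validity check, which accepts only a subset of ISO 8601 before Python 3.11.

```python
from datetime import datetime, timezone
from typing import Any, Dict


def with_timestamp(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the payload that carries a 'timeInstant' entry."""
    result = dict(payload)
    if "timeInstant" in result:
        # validate only; an existing value is never overwritten.
        # fromisoformat raises ValueError for strings it cannot parse
        datetime.fromisoformat(result["timeInstant"])
    else:
        # generate a timezone-aware UTC timestamp
        result["timeInstant"] = datetime.now(timezone.utc).isoformat()
    return result


generated = with_timestamp({"t": 50})
existing = with_timestamp({"t": 50,
                           "timeInstant": "2024-01-01T12:00:00+00:00"})
print(existing["timeInstant"])
```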
- subscribe(topic=None, qos=0, options=None, properties=None)[source]
Extends the normal subscribe function of the paho.mqtt.client. If the topic argument is omitted the client will subscribe to all registered device command topics.
- Parameters:
topic – A string specifying the subscription topic to subscribe to.
qos – The desired quality of service level for the subscription. Defaults to 0.
options – Not used.
properties – Not used.
- Returns:
None
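When no topic is given, the client needs one subscription per registered device. The sketch below builds such a list as `(topic, qos)` pairs, a form that the Paho client's `subscribe` accepts. It assumes the `/<apikey>/<device_id>/cmd` command topic layout of the IoT Agent; `command_subscriptions` is a hypothetical helper working on plain dictionaries.

```python
from typing import Dict, List, Tuple


def command_subscriptions(devices: List[Dict[str, str]],
                          qos: int = 0) -> List[Tuple[str, int]]:
    """Build one (topic, qos) pair per registered device."""
    return [(f"/{d['apikey']}/{d['device_id']}/cmd", qos) for d in devices]


registered = [
    {"apikey": "YourApiKey", "device_id": "MyDevice"},
    {"apikey": "YourApiKey", "device_id": "OtherDevice"},
]
subscriptions = command_subscriptions(registered, qos=1)
print(subscriptions)
```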
- update_device(device: Device | Dict, qos=0, options=None, properties=None)[source]
Updates a registered device configuration. Partial updates are not supported, so the device model you pass must be complete.
- Parameters:
device – Configuration of an IoT device
qos – Quality of service can be 0, 1 or 2
options – MQTT v5.0 subscribe options
properties – MQTT v5.0 properties
- Returns:
None
- Raises:
KeyError – if device not yet registered
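Because an update replaces the whole configuration, changing a single field means starting from the complete current configuration. The sketch below shows this pattern on plain dictionaries. The standalone `update_device` function here is a hypothetical model of the replace-only behaviour, not the client method itself.

```python
from typing import Dict


def update_device(registry: Dict[str, dict], device: dict) -> None:
    """Replace a registered configuration as a whole."""
    device_id = device["device_id"]
    if device_id not in registry:
        raise KeyError(f"Device '{device_id}' is not registered")
    # no merging with the previous configuration takes place
    registry[device_id] = device


registry = {
    "MyDevice": {"device_id": "MyDevice",
                 "entity_type": "Thing",
                 "commands": ["heater"]},
}

# copy the complete current configuration and change one field
updated = {**registry["MyDevice"], "entity_type": "Room"}
update_device(registry, updated)
print(registry["MyDevice"])
```

Passing only `{"device_id": "MyDevice", "entity_type": "Room"}` in this sketch would drop the `commands` entry, which is why the complete model is required.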
- update_service_group(service_group: ServiceGroup | Dict)[source]
Updates a registered service group configuration. Partial updates are not supported, so the service group model you pass must be complete.
- Parameters:
service_group – Service group configuration
- Returns:
None
- Raises:
KeyError – if service group not yet registered
Module contents
MQTT client for streaming data via FIWARE’s IoT-Agent