Coverage for filip/clients/mqtt/client.py: 89%
174 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-03-10 13:43 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-03-10 13:43 +0000
1"""
2Implementation of an extended MQTT client that automatically handles the
3topic subscription for FIWARE's IoT communication pattern.
4"""
6import itertools
7import logging
8import warnings
9from datetime import datetime
10from typing import Any, Callable, Dict, List, Tuple, Union
12import paho.mqtt.client as mqtt
14from filip.clients.mqtt.encoder import BaseEncoder, Json, Ultralight
15from filip.models.mqtt import IoTAMQTTMessageType
16from filip.models.ngsi_v2.iot import (
17 Device,
18 PayloadProtocol,
19 ServiceGroup,
20 TransportProtocol,
21)
class IoTAMQTTClient(mqtt.Client):
    """
    Extension of the MQTT client from the Eclipse Paho™ MQTT Python Client.
    The official documentation of the base client is located here:
    https://github.com/eclipse/paho.mqtt.python

    The class adds functions that facilitate the communication with
    FIWARE's IoT-Agent via MQTT. It generates the topics of registered
    devices and subscribes to their command topics, which is what is needed
    to establish a bi-directional communication with the IoT-Agent.

    Note:
        The client does not sync the device configuration with the IoT-Agent.
        This is up to the user!

    Note:
        The extension does not affect the normal workflow or any other
        functionality known from the original client.

        The client does not yet support the retrieval of command
        configurations via mqtt documented here:
        https://fiware-iotagent-json.readthedocs.io/en/latest/usermanual/index.html#api-overview

    Example:
        This example shows the basic usage of the client. It does not
        demonstrate its whole capabilities. Please check the single methods
        for more details. Please also keep in mind that this still requires
        provisioning of the device in the IoT-Agent and sending the commands
        via the context broker. For more details check the additional example
        section::

            import paho.mqtt.client as mqtt

            from filip.models.ngsi_v2.iot import (
                Device,
                DeviceAttribute,
                DeviceCommand,
                ServiceGroup,
            )
            from filip.clients.mqtt.client import IoTAMQTTClient

            # create a device configuration
            device_attr = DeviceAttribute(name='temperature',
                                          object_id='t',
                                          type="Number")
            device_command = DeviceCommand(name='heater', type="Boolean")
            device = Device(device_id='MyDevice',
                            entity_name='MyDevice',
                            entity_type='Thing',
                            protocol='IoTA-JSON',
                            transport='MQTT',
                            apikey="YourApiKey",
                            attributes=[device_attr],
                            commands=[device_command])

            service_group = ServiceGroup(apikey="YourApiKey", resource="/iot")

            mqttc = IoTAMQTTClient(client_id="YourID",
                                   userdata=None,
                                   protocol=mqtt.MQTTv5,
                                   transport="tcp",
                                   devices=[device],
                                   service_groups=[service_group])

            # create a callback function that will be called for incoming
            # commands and add it for a single device
            def on_command(client, obj, msg):
                apikey, device_id, payload = (
                    client.get_encoder("IoTA-JSON").decode_message(msg=msg)
                )

                # do_something with the message.
                # For instance write into a queue.

                # acknowledge a command
                client.publish(device_id=device_id,
                               command_name=next(iter(payload)),
                               payload=payload)

            mqttc.add_command_callback(device_id='MyDevice',
                                       callback=on_command)

            # create a non blocking loop
            mqttc.loop_start()

            # publish a multi-measurement for a device
            mqttc.publish(device_id='MyDevice', payload={'t': 50})

            # publish a single measurement for a device
            mqttc.publish(device_id='MyDevice',
                          attribute_name='temperature',
                          payload=50)

            # adding timestamps to measurements using the client
            mqttc.publish(device_id='MyDevice',
                          payload={'t': 50},
                          timestamp=True)

            # adding timestamps to measurements in payload
            from datetime import datetime

            mqttc.publish(device_id='MyDevice',
                          payload={'t': 50,
                                   'timeInstant': datetime.now().astimezone().isoformat()})

            # stop network loop and disconnect cleanly
            mqttc.loop_stop()
            mqttc.disconnect()

    """

    def __init__(
        self,
        client_id="",
        clean_session=None,
        userdata=None,
        protocol=mqtt.MQTTv311,
        transport="tcp",
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
        devices: List[Device] = None,
        service_groups: List[ServiceGroup] = None,
        custom_encoder: Dict[str, BaseEncoder] = None,
    ):
        """
        Args:
            client_id:
                Unique client id string used when connecting
                to the broker. If client_id is zero length or None, then the
                behaviour is defined by which protocol version is in use. If
                using MQTT v3.1.1, then a zero length client id will be sent
                to the broker and the broker will generate a random for the
                client. If using MQTT v3.1 then an id will be randomly
                generated. In both cases, clean_session must be True.
                If this is not the case a ValueError will be raised.
            clean_session:
                boolean that determines the client type. If True,
                the broker will remove all information about this client when it
                disconnects. If False, the client is a persistent client and
                subscription information and queued messages will be retained
                when the client disconnects.
                Note that a client will never discard its own outgoing
                messages on disconnect. Calling connect() or reconnect() will
                cause the messages to be resent. Use reinitialise() to reset
                a client to its original state. The clean_session argument
                only applies to MQTT versions v3.1.1 and v3.1. It is not
                accepted if the MQTT version is v5.0 - use the clean_start
                argument on connect() instead.
            userdata:
                defined data of any type that is passed as the "userdata"
                parameter to callbacks. It may be updated at a later point
                with the user_data_set() function.
            protocol:
                explicit setting of the MQTT version to use for this client.
                Can be paho.mqtt.client.MQTTv311 (v3.1.1),
                paho.mqtt.client.MQTTv31 (v3.1) or paho.mqtt.client.MQTTv5
                (v5.0), with the default being v3.1.1.
            transport:
                Set to "websockets" to use WebSockets as the transport
                mechanism. Set to "tcp" to use raw TCP, which is the default.
            callback_api_version:
                Callback API version that is forwarded unchanged to the
                parent paho client. Defaults to
                paho.mqtt.client.CallbackAPIVersion.VERSION2.
            devices:
                List of device configurations that will be registered
                with the client. Consequently, the client will be able to
                subscribe to all registered device topics. Furthermore,
                after registration messages can simply be published by the
                device id.
            service_groups:
                List of service group configurations that will be registered
                with the client. These should be known upon subscribing
                because the client will check for a matching service group.
                If the group is not known or not registered with the
                IoT-Agent service the receiving of commands will fail.
                Please check the official documentation of the IoT-Agents
                API for more details.
            custom_encoder:
                Mapping of encoder name to a custom encoder instance that
                will automatically parse the supported payload formats to a
                dictionary and vice versa. This essentially saves boiler
                plate code.
        """
        # initialize parent client
        super().__init__(
            client_id=client_id,
            clean_session=clean_session,
            userdata=userdata,
            protocol=protocol,
            callback_api_version=callback_api_version,
            transport=transport,
        )

        # setup logging functionality
        self.logger = logging.getLogger(name=f"{self.__class__.__name__}")
        self.logger.addHandler(logging.NullHandler())
        self.enable_logger(self.logger)

        # registered service groups, keyed by their apikey
        self.service_groups: Dict[str, ServiceGroup] = {}
        if service_groups:
            self.service_groups = {gr.apikey: gr for gr in service_groups}

        # Available encoders. They are created before any device is
        # registered so that every code path reachable from the device
        # registration can rely on `self._encoders` being present.
        self._encoders = {"IoTA-JSON": Json(), "PDI-IoTA-UltraLight": Ultralight()}

        # add custom encoder for message parsing
        if custom_encoder:
            self.add_encoder(custom_encoder)

        # registered device configurations, keyed by device_id. The property
        # setter validates each device (e.g. its transport protocol).
        self._devices: Dict[str, Device] = {}
        if devices:
            self.devices = devices

    @property
    def devices(self):
        """
        Returns a list of all registered device configurations

        Returns:
            List of the registered device models
        """
        return list(self._devices.values())

    @devices.setter
    def devices(self, devices: List[Device]):
        """
        Registers a list of device configurations. Devices that are already
        registered are kept; the given ones are added one by one.

        Args:
            devices: List of device configurations

        Returns:
            None

        Raises:
            ValueError: if registering a device fails with a ValueError,
                e.g. because of a duplicate device id. The original error
                is attached as the cause.
        """
        for device in devices:
            try:
                self.add_device(device=device)
            except ValueError as err:
                # add_device also accepts plain dicts, which have no
                # `device_id` attribute
                if isinstance(device, dict):
                    device_id = device.get("device_id")
                else:
                    device_id = device.device_id
                raise ValueError(f"Duplicate device_id: {device_id}") from err

    def get_encoder(self, encoder: Union[str, PayloadProtocol]):
        """
        Returns the encoder by key

        Args:
            encoder: encoder name

        Returns:
            The registered encoder instance (subclass of BaseEncoder) or
            None if no encoder is registered under the given name
        """
        return self._encoders.get(encoder)

    def add_encoder(self, encoder: Dict[str, BaseEncoder]):
        """
        Registers additional encoders. Encoders that are already registered
        under the same name are replaced.

        Args:
            encoder: Mapping of encoder name to encoder instance

        Returns:
            None

        Raises:
            AssertionError: if a value is not an instance of BaseEncoder
        """
        for value in encoder.values():
            assert isinstance(
                value, BaseEncoder
            ), f"Encoder must be an instance of {BaseEncoder.__name__}"

        self._encoders.update(encoder)

    def __validate_device(self, device: Union[Device, Dict]) -> Device:
        """
        Validates configuration of an IoT Device

        Args:
            device: device model to check on

        Returns:
            Device: validated model

        Raises:
            AssertionError: for faulty configurations
        """
        if isinstance(device, dict):
            device = Device.model_validate(device)

        assert isinstance(device, Device), "Invalid device configuration!"

        assert (
            device.transport == TransportProtocol.MQTT
        ), "Unsupported transport protocol found in device configuration!"

        # A missing service group is not fatal for the client itself, hence
        # it is only reported via log record and warning.
        if device.apikey not in self.service_groups:
            msg = (
                "Could not find matching service group! "
                "Communication may not work correctly!"
            )
            self.logger.warning(msg=msg)
            warnings.warn(message=msg)

        return device

    def __create_topic(
        self, *, topic_type: IoTAMQTTMessageType, device: Device, attribute: str = None
    ) -> str:
        """
        Creates a topic for a device configuration based on the requested
        topic type.

        Args:
            device:
                Configuration of an IoT device
            topic_type:
                type of the topic to be created,
                'multi' for the topic the device publishes
                multi-measurements on.
                'single' for the topic the device publishes a single
                measurement of one attribute on.
                'cmd' for topic the device is expecting its commands on.
                'cmdexe' for topic the device can acknowledge its commands on.
                'configuration' for topic the device can request command
                configurations on
            attribute:
                attribute needs to be set for single measurements
        Returns:
            string with topic

        Raises:
            KeyError:
                If unknown message type is used or if the attribute is not
                part of the device configuration
            ValueError:
                If attribute name is missing for single measurements
        """
        if topic_type == IoTAMQTTMessageType.CMD:
            # the command topic is the only one without the encoder prefix
            return "/" + "/".join((device.apikey, device.device_id, "cmd"))

        if topic_type == IoTAMQTTMessageType.MULTI:
            suffix = ("attrs",)
        elif topic_type == IoTAMQTTMessageType.SINGLE:
            if not attribute:
                raise ValueError("Missing argument name for single measurement")
            attr = next(
                (attr for attr in device.attributes if attr.name == attribute),
                None,
            )
            if attr is None:
                # previously an uninformative StopIteration leaked from here
                raise KeyError(
                    f"Attribute '{attribute}' is not part of the device "
                    f"configuration with device_id '{device.device_id}'"
                )
            # the object_id takes precedence over the attribute name
            suffix = ("attrs", attr.object_id or attr.name)
        elif topic_type == IoTAMQTTMessageType.CMDEXE:
            suffix = ("cmdexe",)
        elif topic_type == IoTAMQTTMessageType.CONFIG:
            suffix = ("configuration",)
        else:
            raise KeyError("topic_type not supported")

        prefix = self._encoders[device.protocol].prefix
        return "/".join((prefix, device.apikey, device.device_id, *suffix))

    def __subscribe_commands(
        self, *, device: Device = None, qos=0, options=None, properties=None
    ):
        """
        Subscribes to the command topic of a device configuration. If the
        device argument is omitted the function will subscribe to the
        command topics of all already registered devices. Devices without
        commands are skipped.

        Args:
            device: Configuration of an IoT device
            qos: Quality of service can be 0, 1 or 2
            options: MQTT v5.0 subscribe options
            properties: MQTT v5.0 properties

        Returns:
            None
        """
        if device is None:
            # call itself but with device argument for all registered devices
            for registered in self._devices.values():
                self.__subscribe_commands(
                    device=registered, qos=qos, options=options, properties=properties
                )
            return

        if device.commands:
            topic = self.__create_topic(
                device=device, topic_type=IoTAMQTTMessageType.CMD
            )
            super().subscribe(
                topic=topic, qos=qos, options=options, properties=properties
            )

    def get_service_group(self, apikey: str) -> ServiceGroup:
        """
        Returns registered service group configuration

        Args:
            apikey: Unique APIKey of the service group

        Returns:
            ServiceGroup

        Raises:
            KeyError: if service group not yet registered

        Example::

            from filip.clients.mqtt.client import IoTAMQTTClient

            mqttc = IoTAMQTTClient()
            group = mqttc.get_service_group(apikey="MyAPIKEY")
            print(group.model_dump_json(indent=2))
            print(type(group))
        """
        group = self.service_groups.get(apikey, None)
        if group is None:
            raise KeyError(f"Service group with apikey {apikey} not found!")
        return group

    def add_service_group(self, service_group: Union[ServiceGroup, Dict]):
        """
        Registers a device service group with the client

        Args:
            service_group: Service group configuration

        Returns:
            None

        Raises:
            ValueError: if service group already exists
        """
        if isinstance(service_group, dict):
            service_group = ServiceGroup.model_validate(service_group)
        assert isinstance(
            service_group, ServiceGroup
        ), "Invalid content for service group!"

        if self.service_groups.get(service_group.apikey, None) is not None:
            raise ValueError(f"Service group already exists! {service_group.apikey}")

        # add service group configuration to the service group list
        self.service_groups[service_group.apikey] = service_group

    def delete_service_group(self, apikey):
        """
        Unregisters a service group and removes it from the client

        Args:
            apikey: Unique APIKey of the service group

        Returns:
            None

        Raises:
            KeyError: if no service group is registered for the apikey
        """
        group = self.service_groups.pop(apikey, None)
        if group is None:
            self.logger.error("Could not unregister service group '%s'!", apikey)
            raise KeyError(f"Service group with apikey '{apikey}' not found!")
        self.logger.info("Successfully unregistered Service Group '%s'!", apikey)

    def update_service_group(self, service_group: Union[ServiceGroup, Dict]):
        """
        Updates a registered service group configuration. There is no
        opportunity to only partially update the service group. Hence, your
        service group model should be complete.

        Args:
            service_group: Service group configuration

        Returns:
            None

        Raises:
            KeyError: if service group not yet registered
        """
        if isinstance(service_group, dict):
            service_group = ServiceGroup.model_validate(service_group)
        assert isinstance(
            service_group, ServiceGroup
        ), "Invalid content for service group"

        if self.service_groups.get(service_group.apikey, None) is None:
            raise KeyError(f"Service group not found! {service_group.apikey}")

        # replace the service group configuration in the service group list
        self.service_groups[service_group.apikey] = service_group

    def get_device(self, device_id: str) -> Device:
        """
        Returns the configuration of a registered device.

        Args:
            device_id: Id of the requested device

        Returns:
            Device: Device model of the requested device

        Raises:
            KeyError: if requested device is not registered with the client

        Example::

            from filip.clients.mqtt.client import IoTAMQTTClient

            mqttc = IoTAMQTTClient()
            device = mqttc.get_device(device_id="MyDeviceId")
            print(device.model_dump_json(indent=2))
            print(type(device))
        """
        return self._devices[device_id]

    def add_device(
        self, device: Union[Device, Dict], qos=0, options=None, properties=None
    ):
        """
        Registers a device config with the mqtt client. Subsequently,
        the client will subscribe to the command topic of the device if the
        device configuration contains commands.

        Note:
            To register the device config only with this client is not
            sufficient for data streaming the configuration also needs to be
            registered with IoTA-Agent.

        Args:
            device: Configuration of an IoT device
            qos: Quality of service can be 0, 1 or 2
            options: MQTT v5.0 subscribe options
            properties: MQTT v5.0 properties

        Returns:
            None

        Raises:
            ValueError: if device configuration already exists
        """
        device = self.__validate_device(device=device)

        if self._devices.get(device.device_id, None) is not None:
            raise ValueError(f"Device already exists! {device.device_id}")

        # add device configuration to the device list
        self._devices[device.device_id] = device
        # subscribes to the command topic
        self.__subscribe_commands(
            device=device, qos=qos, options=options, properties=properties
        )

    def delete_device(self, device_id: str):
        """
        Unregisters a device and removes its subscriptions and callbacks.
        An unknown device id is only logged as an error; no exception is
        raised.

        Args:
            device_id: id of an IoT device

        Returns:
            None
        """
        device = self._devices.pop(device_id, None)
        if not device:
            self.logger.error("Could not unregister device '%s'", device_id)
            return

        topic = self.__create_topic(device=device, topic_type=IoTAMQTTMessageType.CMD)
        self.unsubscribe(topic=topic)
        self.message_callback_remove(sub=topic)
        self.logger.info("Successfully unregistered Device '%s'!", device_id)

    def update_device(
        self, device: Union[Device, Dict], qos=0, options=None, properties=None
    ):
        """
        Updates a registered device configuration. There is no opportunity
        to only partially update the device. Hence, your device model should
        be complete.

        Args:
            device: Configuration of an IoT device
            qos: Quality of service can be 0, 1 or 2
            options: MQTT v5.0 subscribe options
            properties: MQTT v5.0 properties

        Returns:
            None

        Raises:
            KeyError: if device not yet registered
        """
        device = self.__validate_device(device=device)

        if self._devices.get(device.device_id, None) is None:
            raise KeyError(f"Device not found! {device.device_id}")

        # update device configuration in the device list
        self._devices[device.device_id] = device
        # subscribes to the command topic
        self.__subscribe_commands(
            device=device, qos=qos, options=options, properties=properties
        )

    def add_command_callback(self, device_id: str, callback: Callable):
        """
        Adds callback function for the command topic of a registered device.

        Args:
            device_id:
                id of an IoT device
            callback:
                function that will be called for incoming commands.
                This function should have the following format:

        Example::

            def on_command(client, obj, msg):
                apikey, device_id, payload = (
                    client.get_encoder("IoTA-JSON").decode_message(msg=msg)
                )

                # do_something with the message.
                # For instance write into a queue.

                # acknowledge a command. Here command are usually single
                # messages. The first key is equal to the commands name.
                client.publish(device_id=device_id,
                               command_name=next(iter(payload)),
                               payload=payload)

            mqttc.add_command_callback(device_id="MyDevice",
                                       callback=on_command)

        Returns:
            None

        Raises:
            KeyError: if the device is not registered with the client
        """
        device = self._devices.get(device_id, None)
        if device is None:
            raise KeyError(f"Device does not exist! {device_id}")
        self.__subscribe_commands(device=device)
        topic = self.__create_topic(device=device, topic_type=IoTAMQTTMessageType.CMD)
        self.message_callback_add(topic, callback)

    def __map_payload_keys(self, *, device: Device, payload: Dict) -> Dict:
        """
        Checks the keys of a multi-measurement payload against the device
        configuration and replaces attribute names by their object_id.

        Args:
            device: Configuration of the IoT device the payload belongs to
            payload: Multi-measurement payload; it is modified in place

        Returns:
            The given payload dictionary with mapped keys

        Raises:
            KeyError: if a key is neither 'timeInstant' nor the object_id
                or name of an attribute of the device
        """
        # iterate over a snapshot of the keys because keys may be renamed
        for key in list(payload):
            if key == "timeInstant":
                continue
            for attr in device.attributes:
                # key is already the object_id, nothing to map
                if attr.object_id is not None and key == attr.object_id:
                    break
                if key == attr.name:
                    if attr.object_id is not None:
                        payload[attr.object_id] = payload.pop(key)
                    break
            else:
                err_msg = (
                    f"Attribute key '{key}' is not allowed "
                    f"in the message payload for this "
                    f"device configuration with device_id "
                    f"'{device.device_id}'"
                )
                raise KeyError(err_msg)
        return payload

    def publish(
        self,
        topic=None,
        payload: Union[Dict, Any] = None,
        qos: int = 0,
        retain: bool = False,
        properties=None,
        device_id: str = None,
        attribute_name: str = None,
        command_name: str = None,
        timestamp: bool = False,
    ):
        """
        Publish an MQTT Message to a specified topic. If you want to publish
        a device specific message to a device use the device_id argument for
        multi-measurement. The function will then automatically validate
        against the registered device configuration if the payload keys are
        valid. If you want to publish a single measurement the attribute_name
        argument is required as well.

        Note:
            If the device_id argument is set, the topic argument will be
            ignored.

        Args:
            topic:
                The topic that the message should be published on.
            payload:
                The actual message to send. If not given, or set to None a
                zero length message will be used. Passing an int or float will
                result in the payload being converted to a string
                representing that number. If you wish to send a true
                int/float, use struct.pack() to create the
                payload you require. For publishing to a device use a dict
                containing the object_ids as keys. The passed dictionary is
                not modified.
            qos:
                The quality of service level to use.
            retain:
                If set to true, the message will be set as the "last known
                good"/retained message for the topic.
            properties:
                (MQTT v5.0 only) the MQTT v5.0 properties to be included.
                Use the Properties class.
            device_id:
                Id of the IoT device you want to publish for. The topics will
                be created automatically. If neither `attribute_name` nor
                `command_name` is set, the message type will be assumed to
                be multi measurement.
            attribute_name:
                Name of an attribute of the device. Do only use this for
                single measurements. If set, `command_name` must
                be omitted.
            command_name:
                Name of a command of the device that should be acknowledged. If
                set `attribute_name` must be omitted.
            timestamp:
                If `True` the client will generate a timestamp based on
                utc and add it to the multi measurement payload.
                If a `timeInstant` is already contained in the
                message payload it will not be overwritten.

        Returns:
            None

        Raises:
            KeyError: if device configuration is not registered with client
                or a payload key does not match the device configuration
            ValueError: if the passed arguments are inconsistent or a
                timestamp does not match the ISO 8601 format.
            AssertionError: if the message payload does not match the device
                configuration.
        """

        # TODO: time stamps are not tested yet

        if device_id:
            device = self.get_device(device_id=device_id)

            # create message for multi measurement payload
            if attribute_name is None and command_name is None:
                assert isinstance(payload, dict), "Payload must be a dictionary"

                # work on a copy so that the caller's dictionary stays untouched
                msg_payload = payload.copy()
                if timestamp and "timeInstant" not in msg_payload:
                    # NOTE(review): datetime.utcnow() is deprecated since
                    # Python 3.12. It is kept because the encoders define
                    # how the value is serialised - confirm before switching
                    # to a timezone-aware datetime.
                    msg_payload["timeInstant"] = datetime.utcnow()

                # validate if dict keys match device configuration
                msg_payload = self.__map_payload_keys(
                    device=device, payload=msg_payload
                )
                topic = self.__create_topic(
                    device=device, topic_type=IoTAMQTTMessageType.MULTI
                )
                payload = self._encoders[device.protocol].encode_msg(
                    device_id=device_id,
                    payload=msg_payload,
                    msg_type=IoTAMQTTMessageType.MULTI,
                )

            # create message for command acknowledgement
            elif attribute_name is None and command_name:
                assert isinstance(payload, dict), "Payload must be a dictionary"
                assert (
                    len(payload) == 1
                ), "Cannot acknowledge multiple commands simultaneously"
                acknowledged = next(iter(payload))
                assert any(
                    cmd.name == acknowledged for cmd in device.commands
                ), "Unknown command for this device!"
                topic = self.__create_topic(
                    device=device, topic_type=IoTAMQTTMessageType.CMDEXE
                )
                payload = self._encoders[device.protocol].encode_msg(
                    device_id=device_id,
                    payload=payload,
                    msg_type=IoTAMQTTMessageType.CMDEXE,
                )

            # create message for single measurement
            elif attribute_name and command_name is None:
                topic = self.__create_topic(
                    device=device,
                    topic_type=IoTAMQTTMessageType.SINGLE,
                    attribute=attribute_name,
                )
                payload = self._encoders[device.protocol].encode_msg(
                    device_id=device_id,
                    payload=payload,
                    msg_type=IoTAMQTTMessageType.SINGLE,
                )
            else:
                raise ValueError("Inconsistent arguments!")

        super().publish(
            topic=topic, payload=payload, qos=qos, retain=retain, properties=properties
        )

    def subscribe(self, topic=None, qos=0, options=None, properties=None):
        """
        Extends the normal subscribe function of the paho.mqtt.client.
        If the topic argument is omitted the client will subscribe to all
        registered device command topics.

        Args:
            topic:
                A string specifying the subscription topic to subscribe to.
            qos:
                The desired quality of service level for the subscription.
                Defaults to 0.
            options: MQTT v5.0 subscribe options, forwarded to paho
            properties: MQTT v5.0 properties, forwarded to paho

        Returns:
            None
        """
        if topic:
            super().subscribe(
                topic=topic, qos=qos, options=options, properties=properties
            )
            return

        for device in self._devices.values():
            self.__subscribe_commands(
                device=device, qos=qos, options=options, properties=properties
            )