Coverage for filip/clients/mqtt/client.py: 90%
172 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-20 16:54 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-20 16:54 +0000
1"""
2Implementation of an extended MQTT client that automatically handles the
3topic subscription for FIWARE's IoT communication pattern.
4"""
5import itertools
6import logging
7import warnings
8from datetime import datetime
9from typing import Any, Callable, Dict, List, Tuple, Union
11import paho.mqtt.client as mqtt
13from filip.clients.mqtt.encoder import BaseEncoder, Json, Ultralight
14from filip.models.mqtt import IoTAMQTTMessageType
15from filip.models.ngsi_v2.iot import \
16 Device, \
17 PayloadProtocol, \
18 ServiceGroup, \
19 TransportProtocol
22class IoTAMQTTClient(mqtt.Client):
23 """
24 This class is an extension to the MQTT client from the well established
25 Eclipse Paho™ MQTT Python Client. The official documentation is located
26 here: https://github.com/eclipse/paho.mqtt.python
28 The class adds additional functions to facilitate the communication to
29 FIWARE's IoT-Agent via MQTT. It magically generates and subscribes to all
30 important topics that are necessary to establish a
31 bi-directional communication with the IoT-Agent.
33 Note:
34 The client does not sync the device configuration with the IoT-Agent.
35 This is up to the user!
37 Note:
38 The extension does not affect the normal workflow or any other
39 functionality known from the original client.
41 The client does not yet support the retrieval of command
42 configurations via mqtt documented here:
43 https://fiware-iotagent-json.readthedocs.io/en/latest/usermanual/index.html#api-overview
45 Example:
46 This example shows the basic usage of the client. It does not
47 demonstrate its whole capabilities. Please check the single methods
48 for more details. Please also keep in mind that this still requires
49 provisioning of the device in the IoT-Agent and sending the commands
50 via the context broker. For more details check the additional example
51 section::
53 from filip.models.ngsi_v2.iot import Device, DeviceAttribute, DeviceCommand, ServiceGroup
54 from filip.clients.mqtt import MQTTClient
55 from filip.clients.mqtt.encoder import IoTA_Json
57 # create a device configuration
58 device_attr = DeviceAttribute(name='temperature',
59 object_id='t',
60 type="Number")
61 device_command = DeviceCommand(name='heater', type="Boolean")
62 device = Device(device_id='MyDevice',
63 entity_name='MyDevice',
64 entity_type='Thing',
65 protocol='IoTA-JSON',
66 transport='MQTT',
67 apikey="YourApiKey",
68 attributes=[device_attr],
69 commands=[device_command])
71 service_group = ServiceGroup(apikey="YourApiKey", resource="/iot")
73 mqttc = MQTTClient(client_id="YourID",
74 userdata=None,
75 protocol=mqtt.MQTTv5,
76 transport="tcp",
77 devices=[device],
78 service_groups = [service_group])
80 # create a callback function that will be called for incoming
81 # commands and add it for a single device
82 def on_command(client, obj, msg):
83 apikey, device_id, payload = \
84 client.get_encoder().decode_message(msg=msg)
86 # do_something with the message.
87 # For instance write into a queue.
89 # acknowledge a command
90 client.publish(device_id=device_id,
91 command_name=next(iter(payload)),
92 payload=payload)
94 mqttc.add_command_callback(device_id='MyDevice', callback=on_command)
96 # create a non blocking loop
97 mqttc.loop_start()
99 # publish a multi-measurement for a device
100 mqttc.publish(device_id='MyDevice', payload={'t': 50})
102 # publish a single measurement for a device
103 mqttc.publish(device_id='MyDevice',
104 attribute_name='temperature',
105 payload=50)
107 # adding timestamps to measurements using the client
110 # adding timestamps to measurements in payload
111 from datetime import datetime
113 mqttc.publish(device_id='MyDevice',
114 payload={'t': 50,
115 'timeInstant': datetime.now().astimezone().isoformat()},
116 timestamp=True)
118 # stop network loop and disconnect cleanly
119 mqttc.loop_stop()
120 mqttc.disconnect()
122 """
def __init__(self,
             client_id="",
             clean_session=None,
             userdata=None,
             protocol=mqtt.MQTTv311,
             transport="tcp",
             callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
             devices: List[Device] = None,
             service_groups: List[ServiceGroup] = None,
             custom_encoder: Dict[str, BaseEncoder] = None):
    """
    Creates the client, registers the given service groups, encoders and
    devices and wires the client's logger into the parent MQTT client.

    Args:
        client_id:
            Unique client id string used when connecting
            to the broker. If client_id is zero length or None, then the
            behaviour is defined by which protocol version is in use. If
            using MQTT v3.1.1, then a zero length client id will be sent
            to the broker and the broker will generate a random for the
            client. If using MQTT v3.1 then an id will be randomly
            generated. In both cases, clean_session must be True.
            If this is not the case a ValueError will be raised.
        clean_session:
            boolean that determines the client type. If True,
            the broker will remove all information about this client when
            it disconnects. If False, the client is a persistent client
            and subscription information and queued messages will be
            retained when the client disconnects.
            Note that a client will never discard its own outgoing
            messages on disconnect. Calling connect() or reconnect() will
            cause the messages to be resent. Use reinitialise() to reset
            a client to its original state. The clean_session argument
            only applies to MQTT versions v3.1.1 and v3.1. It is not
            accepted if the MQTT version is v5.0 - use the clean_start
            argument on connect() instead.
        userdata:
            defined data of any type that is passed as the "userdata"
            parameter to callbacks. It may be updated at a later point
            with the user_data_set() function.
        protocol:
            explicit setting of the MQTT version to use for this client.
            Can be paho.mqtt.client.MQTTv311 (v3.1.1),
            paho.mqtt.client.MQTTv31 (v3.1) or paho.mqtt.client.MQTTv5
            (v5.0), with the default being v3.1.1.
        transport:
            Set to "websockets" to use WebSockets as the transport
            mechanism. Set to "tcp" to use raw TCP, which is the default.
        callback_api_version:
            Callback API version that is forwarded unchanged to the
            parent client. Defaults to ``CallbackAPIVersion.VERSION2``.
        devices:
            List of device configurations that will be registered
            with the client. Consequently, the client will be able to
            subscribe to all registered device topics. Furthermore,
            after registration messages can simply be published by the
            device id.
        service_groups:
            List of service group configurations that will be registered
            with the client. These should be known upon subscribing
            because the client will check for a matching service group.
            If it is not known or not registered with the IoT-Agent
            service, the receiving of commands will fail. Please check
            the official documentation of the IoT-Agents API for more
            details.
        custom_encoder:
            Mapping of protocol name to encoder instance. The encoders
            are added to (or replace) the built-in ones and are used to
            convert between payload formats and dictionaries.
    """
    # initialize parent client
    super().__init__(client_id=client_id,
                     clean_session=clean_session,
                     userdata=userdata,
                     protocol=protocol,
                     callback_api_version=callback_api_version,
                     transport=transport)

    # setup logging functionality; the logger is shared by name between
    # all instances, hence only attach a NullHandler if none is present
    self.logger = logging.getLogger(
        name=f"{self.__class__.__name__}")
    if not any(isinstance(handler, logging.NullHandler)
               for handler in self.logger.handlers):
        self.logger.addHandler(logging.NullHandler())
    self.enable_logger(self.logger)

    # registered service groups, keyed by their apikey
    self.service_groups: Dict[str, ServiceGroup] = {
        group.apikey: group for group in service_groups or []}

    # available encoders, keyed by payload protocol; they are set up
    # before any device is registered so that the client is fully
    # initialised when the device registration runs
    self._encoders = {'IoTA-JSON': Json(),
                      'PDI-IoTA-UltraLight': Ultralight()}
    if custom_encoder:
        self.add_encoder(custom_encoder)

    # registered device configurations, keyed by device_id; the setter
    # validates every device (e.g. its transport protocol)
    self._devices: Dict[str, Device] = {}
    if devices:
        self.devices = devices
@property
def devices(self):
    """
    All device configurations currently registered with the client.

    Returns:
        list: newly created list holding the registered device
        configurations
    """
    return [*self._devices.values()]
@devices.setter
def devices(self, devices: List[Device]):
    """
    Registers every device configuration of the given list via
    `add_device`.

    Args:
        devices: List of device configurations

    Returns:
        None

    Raises:
        ValueError: if duplicate device id was found. The original error
            is attached as the cause.
    """
    for device in devices:
        try:
            self.add_device(device=device)
        except ValueError as err:
            # add_device also accepts plain dicts, so the id must be
            # read accordingly for the error message
            if isinstance(device, dict):
                device_id = device.get("device_id")
            else:
                device_id = device.device_id
            raise ValueError(f"Duplicate device_id: {device_id}") from err
def get_encoder(self, encoder: Union[str, PayloadProtocol]):
    """
    Looks up a registered encoder by its key.

    Args:
        encoder: key (protocol name) the encoder was registered with

    Returns:
        The registered encoder instance or None if the key is unknown
    """
    registry = self._encoders
    return registry.get(encoder)
def add_encoder(self, encoder: Dict[str, BaseEncoder]):
    """
    Registers additional encoders with the client. Existing encoders with
    the same key are replaced.

    Args:
        encoder: Mapping of encoder key (protocol name) to encoder instance

    Returns:
        None

    Raises:
        AssertionError: if a value is not an instance of BaseEncoder. In
            that case no encoder of the mapping is registered.
    """
    # validate all entries first so that a faulty mapping is not
    # registered partially
    for key, value in encoder.items():
        assert isinstance(value, BaseEncoder), \
            f"Encoder '{key}' must be an instance of a subclass of " \
            f"{BaseEncoder.__name__}"

    self._encoders.update(encoder)
def __validate_device(self, device: Union[Device, Dict]) -> Device:
    """
    Checks a device configuration before it is used by the client.

    Args:
        device: device model or dictionary that is parsed into one

    Returns:
        Device: validated model

    Raises:
        AssertionError: if the configuration is not a Device or does not
            use MQTT as transport protocol
    """
    candidate = Device.model_validate(device) \
        if isinstance(device, dict) else device

    assert isinstance(candidate, Device), "Invalid device configuration!"

    assert candidate.transport == TransportProtocol.MQTT, \
        "Unsupported transport protocol found in device configuration!"

    # a missing service group is only reported, it does not abort
    if candidate.apikey not in self.service_groups:
        msg = "Could not find matching service group! " \
              "Communication may not work correctly!"
        self.logger.warning(msg=msg)
        warnings.warn(message=msg)

    return candidate
def __create_topic(self,
                   *,
                   topic_type: IoTAMQTTMessageType,
                   device: Device,
                   attribute: str = None) -> str:
    """
    Creates a topic for a device configuration based on the requested
    topic type.

    Args:
        device:
            Configuration of an IoT device
        topic_type:
            type of the topic to be created,
            'multi' for the topic the device publishes multiple
            measurements on.
            'single' for the topic the device publishes a single
            measurement of one attribute on.
            'cmd' for topic the device is expecting its commands on.
            'cmdexe' for topic the device can acknowledge its commands on.
            'configuration' for topic the device can request command
            configurations on
        attribute:
            name of the attribute, needs to be set for single measurements

    Returns:
        string with topic

    Raises:
        KeyError:
            If unknown message type is used or if the attribute is not
            part of the device configuration
        ValueError:
            If attribute name is missing for single measurements
    """
    # the command topic is the only one without the protocol prefix
    if topic_type == IoTAMQTTMessageType.CMD:
        return '/' + '/'.join((device.apikey, device.device_id, 'cmd'))

    if topic_type == IoTAMQTTMessageType.MULTI:
        suffix = ('attrs',)
    elif topic_type == IoTAMQTTMessageType.SINGLE:
        if not attribute:
            raise ValueError("Missing argument name for single measurement")
        attr = next((attr for attr in device.attributes
                     if attr.name == attribute), None)
        if attr is None:
            # do not leak a bare StopIteration for unknown attributes
            raise KeyError(f"Attribute '{attribute}' is not configured "
                           f"for device '{device.device_id}'")
        # the object_id takes precedence over the attribute name
        suffix = ('attrs', attr.object_id or attr.name)
    elif topic_type == IoTAMQTTMessageType.CMDEXE:
        suffix = ('cmdexe',)
    elif topic_type == IoTAMQTTMessageType.CONFIG:
        suffix = ('configuration',)
    else:
        raise KeyError("topic_type not supported")

    return '/'.join((self._encoders[device.protocol].prefix,
                     device.apikey,
                     device.device_id,
                     *suffix))
def __subscribe_commands(self, *,
                         device: Device = None,
                         qos=0,
                         options=None,
                         properties=None):
    """
    Subscribes to the command topic of a device. If the device argument
    is omitted the function subscribes to the command topics of all
    already registered devices. Devices without commands are skipped.

    Args:
        device: Configuration of an IoT device
        qos: Quality of service can be 0, 1 or 2
        options: MQTT v5.0 subscribe options
        properties: MQTT v5.0 properties

    Returns:
        None
    """
    if device is None:
        # call itself with the device argument for all registered devices
        for registered in self._devices.values():
            self.__subscribe_commands(device=registered,
                                      qos=qos,
                                      options=options,
                                      properties=properties)
        return

    # only devices with commands need a command subscription
    if device.commands:
        topic = self.__create_topic(device=device,
                                    topic_type=IoTAMQTTMessageType.CMD)
        super().subscribe(topic=topic,
                          qos=qos,
                          options=options,
                          properties=properties)
def get_service_group(self, apikey: str) -> ServiceGroup:
    """
    Returns registered service group configuration

    Args:
        apikey: Unique APIKey of the service group

    Returns:
        ServiceGroup

    Raises:
        KeyError: if service group not yet registered

    Example::

        from filip.clients.mqtt import MQTTClient

        mqttc = MQTTClient()
        group = mqttc.get_service_group(apikey="MyAPIKEY")
        print(group.json(indent=2))
        print(type(group))
    """
    group = self.service_groups.get(apikey)
    if group is None:
        # exceptions do not apply %-style arguments, so the message is
        # formatted here
        raise KeyError(f"Service group with apikey '{apikey}' not found!")
    return group
def add_service_group(self, service_group: Union[ServiceGroup, Dict]):
    """
    Registers a device service group with the client

    Args:
        service_group: Service group configuration, either as model or as
            dictionary that is parsed into one

    Returns:
        None

    Raises:
        AssertionError: if the content is not a service group
        ValueError: if service group already exists
    """
    if isinstance(service_group, dict):
        service_group = ServiceGroup.model_validate(service_group)
    assert isinstance(service_group, ServiceGroup), \
        "Invalid content for service group!"

    if self.service_groups.get(service_group.apikey) is not None:
        # exceptions do not apply %-style arguments, so the message is
        # formatted here
        raise ValueError(f"Service group with apikey "
                         f"'{service_group.apikey}' already exists!")

    # add service group configuration to the registered service groups
    self.service_groups[service_group.apikey] = service_group
def delete_service_group(self, apikey):
    """
    Unregisters a service group and removes it from the client

    Args:
        apikey: Unique APIKey of the service group

    Returns:
        None

    Raises:
        KeyError: if no service group is registered for the apikey
    """
    group = self.service_groups.pop(apikey, None)
    if group is None:
        self.logger.error("Could not unregister service group '%s'!",
                          apikey)
        raise KeyError(f"Service group with apikey '{apikey}' not found!")

    self.logger.info("Successfully unregistered Service Group '%s'!",
                     apikey)
def update_service_group(self, service_group: Union[ServiceGroup, Dict]):
    """
    Updates a registered service group configuration. There is no
    opportunity to only partially update the service group. Hence, your
    service group model should be complete.

    Args:
        service_group: Service group configuration, either as model or as
            dictionary that is parsed into one

    Returns:
        None

    Raises:
        AssertionError: if the content is not a service group
        KeyError: if service group not yet registered
    """
    if isinstance(service_group, dict):
        service_group = ServiceGroup.model_validate(service_group)
    assert isinstance(service_group, ServiceGroup), \
        "Invalid content for service group"

    if self.service_groups.get(service_group.apikey) is None:
        # exceptions do not apply %-style arguments, so the message is
        # formatted here
        raise KeyError(f"Service group with apikey "
                       f"'{service_group.apikey}' not found!")

    # replace the registered service group configuration
    self.service_groups[service_group.apikey] = service_group
def get_device(self, device_id: str) -> Device:
    """
    Looks up the configuration of a registered device.

    Args:
        device_id: Id of the requested device

    Returns:
        Device: Device model of the requested device

    Raises:
        KeyError: if requested device is not registered with the client

    Example::

        from filip.clients.mqtt import MQTTClient

        mqttc = MQTTClient()
        device = mqttc.get_device(device_id="MyDeviceId")
        print(device.json(indent=2))
        print(type(device))
    """
    registry = self._devices
    return registry[device_id]
def add_device(self,
               device: Union[Device, Dict],
               qos=0,
               options=None,
               properties=None):
    """
    Registers a device config with the mqtt client. Subsequently,
    the client subscribes to the command topic of the device if the
    device has commands configured.

    Note:
        To register the device config only with this client is not
        sufficient for data streaming the configuration also needs to be
        registered with IoTA-Agent.

    Args:
        device: Configuration of an IoT device
        qos: Quality of service can be 0, 1 or 2
        options: MQTT v5.0 subscribe options
        properties: MQTT v5.0 properties

    Returns:
        None

    Raises:
        AssertionError: if the device configuration is invalid
        ValueError: if device configuration already exists
    """
    device = self.__validate_device(device=device)

    if self._devices.get(device.device_id) is not None:
        # exceptions do not apply %-style arguments, so the message is
        # formatted here
        raise ValueError(f"Device with device_id '{device.device_id}' "
                         f"already exists!")

    # add device configuration to the registered devices
    self._devices[device.device_id] = device
    # subscribes to the command topic
    self.__subscribe_commands(device=device,
                              qos=qos,
                              options=options,
                              properties=properties)
def delete_device(self, device_id: str):
    """
    Removes a device from the client together with the subscription and
    the callback of its command topic. An unknown device id is only
    logged as an error.

    Args:
        device_id: id of an IoT device

    Returns:
        None
    """
    removed = self._devices.pop(device_id, None)
    if not removed:
        self.logger.error("Could not unregister device '%s'", device_id)
        return

    cmd_topic = self.__create_topic(device=removed,
                                    topic_type=IoTAMQTTMessageType.CMD)
    self.unsubscribe(topic=cmd_topic)
    self.message_callback_remove(sub=cmd_topic)
    self.logger.info("Successfully unregistered Device '%s'!",
                     device_id)
def update_device(self,
                  device: Union[Device, Dict],
                  qos=0,
                  options=None,
                  properties=None):
    """
    Updates a registered device configuration. There is no opportunity
    to only partially update the device. Hence, your device model should
    be complete.

    Args:
        device: Configuration of an IoT device
        qos: Quality of service can be 0, 1 or 2
        options: MQTT v5.0 subscribe options
        properties: MQTT v5.0 properties

    Returns:
        None

    Raises:
        AssertionError: if the device configuration is invalid
        KeyError: if device not yet registered
    """
    device = self.__validate_device(device=device)

    if self._devices.get(device.device_id) is None:
        # exceptions do not apply %-style arguments, so the message is
        # formatted here
        raise KeyError(f"Device with device_id '{device.device_id}' "
                       f"not found!")

    # update device configuration in the registered devices
    self._devices[device.device_id] = device
    # subscribes to the command topic
    self.__subscribe_commands(device=device,
                              qos=qos,
                              options=options,
                              properties=properties)
def add_command_callback(self, device_id: str, callback: Callable):
    """
    Adds callback function for the command topic of a registered device.
    The command topic of the device is (re-)subscribed beforehand.

    Args:
        device_id:
            id of an IoT device
        callback:
            function that will be called for incoming commands.
            This function should have the following format:

    Example::

        def on_command(client, obj, msg):
            apikey, device_id, payload = \
                client.encoder.decode_message(msg=msg)

            # do_something with the message.
            # For instance write into a queue.

            # acknowledge a command. Here command are usually single
            # messages. The first key is equal to the commands name.
            client.publish(device_id=device_id,
                           command_name=next(iter(payload)),
                           payload=payload)

        mqttc.add_command_callback(device_id="MyDevice",
                                   callback=on_command)

    Returns:
        None

    Raises:
        KeyError: if the device is not registered with the client
    """
    device = self._devices.get(device_id)
    if device is None:
        # exceptions do not apply %-style arguments, so the message is
        # formatted here
        raise KeyError(f"Device with device_id '{device_id}' "
                       f"does not exist!")
    self.__subscribe_commands(device=device)
    topic = self.__create_topic(device=device,
                                topic_type=IoTAMQTTMessageType.CMD)
    self.message_callback_add(topic, callback)
def publish(self,
            topic=None,
            payload: Union[Dict, Any] = None,
            qos: int = 0,
            retain: bool = False,
            properties=None,
            device_id: str = None,
            attribute_name: str = None,
            command_name: str = None,
            timestamp: bool = False
            ):
    """
    Publish an MQTT Message to a specified topic. If you want to publish
    a device specific message to a device use the device_id argument for
    multi-measurement. The function will then automatically validate
    against the registered device configuration if the payload keys are
    valid. If you want to publish a single measurement the attribute_name
    argument is required as well.

    Note:
        If the device_id argument is set, the topic argument will be
        ignored.

    Args:
        topic:
            The topic that the message should be published on.
        payload:
            The actual message to send. If not given, or set to None a
            zero length message will be used. Passing an int or float will
            result in the payload being converted to a string
            representing that number. If you wish to send a true
            int/float, use struct.pack() to create the
            payload you require. For publishing to a device use a dict
            containing the object_ids or the attribute names as keys.
            Attribute names are replaced by the object_id of the
            attribute if one is configured. The passed dict itself is
            not modified.
        qos:
            The quality of service level to use.
        retain:
            If set to true, the message will be set as the "last known
            good"/retained message for the topic.
        properties:
            (MQTT v5.0 only) the MQTT v5.0 properties to be included.
            Use the Properties class.
        device_id:
            Id of the IoT device you want to publish for. The topics will
            automatically be created. If set, the message type will be
            assumed to be multi measurement.
        attribute_name:
            Name of an attribute of the device. Do only use this for
            single measurements. If set, `command_name` must
            be omitted.
        command_name:
            Name of a command of the device that should be acknowledged. If
            set `attribute_name` must be omitted.
        timestamp:
            If `true` the client will generate a timestamp based on
            utc and add it to the multi measurement payload.
            If a `timeInstant` is already contained in the
            message payload it will not be overwritten.

    Returns:
        None

    Raises:
        KeyError: if device configuration is not registered with client
            or a payload key does not match the device configuration
        ValueError: if the passed arguments are inconsistent
        AssertionError: if the message payload does not match the
            expected format
    """
    # TODO: time stamps are not tested yet

    if device_id:
        device = self.get_device(device_id=device_id)

        # create message for multi measurement payload
        if attribute_name is None and command_name is None:
            assert isinstance(payload, dict), \
                "Payload must be a dictionary"

            # work on a copy so that the caller's dict stays untouched
            msg_payload = payload.copy()
            if timestamp and 'timeInstant' not in msg_payload:
                # NOTE(review): naive UTC datetime object as before;
                # presumably serialised by the encoder - confirm
                msg_payload["timeInstant"] = datetime.utcnow()

            # validate if dict keys match device configuration
            for key in payload:
                # timestamps are always allowed, independent of the
                # configured attributes
                if key == "timeInstant":
                    continue
                for attr in device.attributes:
                    # exact match on the object_id, no renaming required
                    if attr.object_id is not None and \
                            key == attr.object_id:
                        break
                    # match on the attribute name, the key is replaced
                    # by the object_id if one is configured
                    if key == attr.name:
                        if attr.object_id is not None:
                            msg_payload[attr.object_id] = \
                                msg_payload.pop(key)
                        break
                else:
                    err_msg = f"Attribute key '{key}' is not allowed " \
                              f"in the message payload for this " \
                              f"device configuration with device_id " \
                              f"'{device_id}'"
                    raise KeyError(err_msg)
            topic = self.__create_topic(
                device=device,
                topic_type=IoTAMQTTMessageType.MULTI)
            payload = self._encoders[device.protocol].encode_msg(
                device_id=device_id,
                payload=msg_payload,
                msg_type=IoTAMQTTMessageType.MULTI)

        # create message for command acknowledgement
        elif attribute_name is None and command_name:
            assert isinstance(payload, dict), "Payload must be a dictionary"
            assert len(payload.keys()) == 1, \
                "Cannot acknowledge multiple commands simultaneously"
            assert next(iter(payload.keys())) in \
                   [cmd.name for cmd in device.commands], \
                "Unknown command for this device!"
            topic = self.__create_topic(
                device=device,
                topic_type=IoTAMQTTMessageType.CMDEXE)
            payload = self._encoders[device.protocol].encode_msg(
                device_id=device_id,
                payload=payload,
                msg_type=IoTAMQTTMessageType.CMDEXE)

        # create message for single measurement
        elif attribute_name and command_name is None:
            topic = self.__create_topic(
                device=device,
                topic_type=IoTAMQTTMessageType.SINGLE,
                attribute=attribute_name)
            payload = self._encoders[device.protocol].encode_msg(
                device_id=device_id,
                payload=payload,
                msg_type=IoTAMQTTMessageType.SINGLE)
        else:
            raise ValueError("Inconsistent arguments!")

    super().publish(topic=topic,
                    payload=payload,
                    qos=qos,
                    retain=retain,
                    properties=properties)
def subscribe(self, topic=None, qos=0, options=None, properties=None):
    """
    Extends the subscribe function of the parent MQTT client. Without a
    topic the client subscribes to the command topics of all registered
    devices.

    Args:
        topic:
            A string specifying the subscription topic to subscribe to.
        qos:
            The desired quality of service level for the subscription.
            Defaults to 0.
        options: forwarded unchanged to the underlying subscribe call
        properties: forwarded unchanged to the underlying subscribe call

    Returns:
        None
    """
    if not topic:
        # no explicit topic: subscribe for every registered device
        for registered in self._devices.values():
            self.__subscribe_commands(device=registered,
                                      qos=qos,
                                      options=options,
                                      properties=properties)
        return

    super().subscribe(topic=topic,
                      qos=qos,
                      options=options,
                      properties=properties)