Coverage for filip/clients/mqtt/client.py: 90%

172 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-11-20 16:54 +0000

1""" 

2Implementation of an extended MQTT client that automatically handles the 

3topic subscription for FIWARE's IoT communication pattern. 

4""" 

5import itertools 

6import logging 

7import warnings 

8from datetime import datetime 

9from typing import Any, Callable, Dict, List, Tuple, Union 

10 

11import paho.mqtt.client as mqtt 

12 

13from filip.clients.mqtt.encoder import BaseEncoder, Json, Ultralight 

14from filip.models.mqtt import IoTAMQTTMessageType 

15from filip.models.ngsi_v2.iot import \ 

16 Device, \ 

17 PayloadProtocol, \ 

18 ServiceGroup, \ 

19 TransportProtocol 

20 

21 

22class IoTAMQTTClient(mqtt.Client): 

23 """ 

24 This class is an extension to the MQTT client from the well established 

25 Eclipse Paho™ MQTT Python Client. The official documentation is located 

26 here: https://github.com/eclipse/paho.mqtt.python 

27 

28 The class adds additional functions to facilitate the communication to 

29 FIWARE's IoT-Agent via MQTT. It magically generates and subscribes to all 

30 important topics that are necessary to establish a 

31 bi-directional communication with the IoT-Agent. 

32 

33 Note: 

34 The client does not sync the device configuration with the IoT-Agent. 

35 This is up to the user! 

36 

37 Note: 

38 The extension does not affect the normal workflow or any other

39 functionality known from the original client. 

40 

41 The client does not yet support the retrieval of command 

42 configurations via mqtt documented here: 

43 https://fiware-iotagent-json.readthedocs.io/en/latest/usermanual/index.html#api-overview 

44 

45 Example: 

46 This example shows the basic usage of the client. It does not 

47 demonstrate its whole capabilities. Please check the single methods 

48 for more details. Please also keep in mind that this still requires 

49 provisioning of the device in the IoT-Agent and sending the commands 

50 via the context broker. For more details check the additional example 

51 section:: 

52 

53 from filip.models.ngsi_v2.iot import Device, DeviceAttribute, DeviceCommand, ServiceGroup 

54 from filip.clients.mqtt.client import IoTAMQTTClient

55 from filip.clients.mqtt.encoder import Json

56 

57 # create a device configuration 

58 device_attr = DeviceAttribute(name='temperature', 

59 object_id='t', 

60 type="Number") 

61 device_command = DeviceCommand(name='heater', type="Boolean") 

62 device = Device(device_id='MyDevice', 

63 entity_name='MyDevice', 

64 entity_type='Thing', 

65 protocol='IoTA-JSON', 

66 transport='MQTT', 

67 apikey="YourApiKey",

68 attributes=[device_attr], 

69 commands=[device_command]) 

70 

71 service_group = ServiceGroup(apikey="YourApiKey", resource="/iot") 

72 

73 mqttc = IoTAMQTTClient(client_id="YourID",

74 userdata=None,

75 protocol=mqtt.MQTTv5,

76 transport="tcp",

77 devices=[device],

78 service_groups=[service_group])

79 

80 # create a callback function that will be called for incoming 

81 # commands and add it for a single device 

82 def on_command(client, obj, msg): 

83 apikey, device_id, payload = \ 

84 client.get_encoder('IoTA-JSON').decode_message(msg=msg)

85 

86 # do_something with the message. 

87 # For instance write into a queue. 

88 

89 # acknowledge a command 

90 client.publish(device_id=device_id, 

91 command_name=next(iter(payload)),

92 payload=payload) 

93 

94 mqttc.add_command_callback(device_id='MyDevice', callback=on_command)

95 

96 # create a non blocking loop 

97 mqttc.loop_start() 

98 

99 # publish a multi-measurement for a device 

100 mqttc.publish(device_id='MyDevice', payload={'t': 50}) 

101 

102 # publish a single measurement for a device 

103 mqttc.publish(device_id='MyDevice', 

104 attribute_name='temperature', 

105 payload=50) 

106 

107 # adding timestamps to measurements using the client 

108 mqttc.publish(device_id='MyDevice', payload={'t': 50},

109 timestamp=True)

110 # adding timestamps to measurements in payload 

111 from datetime import datetime 

112 

113 mqttc.publish(device_id='MyDevice', 

114 payload={'t': 50, 

115 'timeInstant': datetime.now().astimezone().isoformat()}, 

116 timestamp=True)

117 

118 # stop network loop and disconnect cleanly 

119 mqttc.loop_stop() 

120 mqttc.disconnect() 

121 

122 """ 

123 

124 def __init__(self, 

125 client_id="", 

126 clean_session=None, 

127 userdata=None, 

128 protocol=mqtt.MQTTv311, 

129 transport="tcp", 

130 callback_api_version=mqtt.CallbackAPIVersion.VERSION2, 

131 devices: List[Device] = None, 

132 service_groups: List[ServiceGroup] = None, 

133 custom_encoder: Dict[str, BaseEncoder] = None): 

134 """ 

135 Args: 

136 client_id: 

137 Unique client id string used when connecting 

138 to the broker. If client_id is zero length or None, then the 

139 behaviour is defined by which protocol version is in use. If 

140 using MQTT v3.1.1, then a zero length client id will be sent 

141 to the broker and the broker will generate a random for the 

142 client. If using MQTT v3.1 then an id will be randomly 

143 generated. In both cases, clean_session must be True. 

144 If this is not the case a ValueError will be raised. 

145 clean_session: 

146 boolean that determines the client type. If True, 

147 the broker will remove all information about this client when it 

148 disconnects. If False, the client is a persistent client and 

149 subscription information and queued messages will be retained 

150 when the client disconnects. 

151 Note that a client will never discard its own outgoing 

152 messages on disconnect. Calling connect() or reconnect() will 

153 cause the messages to be resent. Use reinitialise() to reset 

154 a client to its original state. The clean_session argument 

155 only applies to MQTT versions v3.1.1 and v3.1. It is not 

156 accepted if the MQTT version is v5.0 - use the clean_start 

157 argument on connect() instead. 

158 userdata: 

159 defined data of any type that is passed as the "userdata" 

160 parameter to callbacks. It may be updated at a later point 

161 with the user_data_set() function. 

162 protocol: 

163 explicit setting of the MQTT version to use for this client. 

164 Can be paho.mqtt.client.MQTTv311 (v3.1.1), 

165 paho.mqtt.client.MQTTv31 (v3.1) or paho.mqtt.client.MQTTv5 

166 (v5.0), with the default being v3.1.1. 

167 transport: 

168 Set to "websockets" to use WebSockets as the transport 

169 mechanism. Set to "tcp" to use raw TCP, which is the default. 

170 devices: 

171 List of device configurations that will be registered 

172 with the client. Consequently, the client will be able to 

173 subscribe to all registered device topics. Furthermore, 

174 after registration messages can simply published by the 

175 _devices id. 

176 service_groups: 

177 List of service group configurations that will be registered 

178 with the client. These should be known upon subscribing 

179 because the client will check for a matching service group if 

180 this is not known or registered with the IoT-Agent service 

181 the receiving of commands will fail. Please check the 

182 official documentation of the IoT-Agents API for more details. 

183 custom_encoder: 

184 Custom encoder class that will automatically parse the supported 

185 payload formats to a dictionary and vice versa. This 

186 essentially saves boiler plate code. 

187 """ 

188 # initialize parent client 

189 super().__init__(client_id=client_id, 

190 clean_session=clean_session, 

191 userdata=userdata, 

192 protocol=protocol, 

193 callback_api_version=callback_api_version, 

194 transport=transport) 

195 

196 # setup logging functionality 

197 self.logger = logging.getLogger( 

198 name=f"{self.__class__.__name__}") 

199 self.logger.addHandler(logging.NullHandler()) 

200 self.enable_logger(self.logger) 

201 

202 # create dictionary holding the registered service groups 

203 self.service_groups: Dict[Tuple[str, str], ServiceGroup] 

204 if service_groups: 

205 self.service_groups = {gr.apikey: gr for gr in service_groups} 

206 else: 

207 self.service_groups = {} 

208 

209 # create dictionary holding the registered device configurations 

210 # check if all _devices have the right transport protocol 

211 self._devices: Dict[str, Device] = {} 

212 if devices: 

213 self.devices = devices 

214 

215 # create dict with available encoders 

216 self._encoders = {'IoTA-JSON': Json(), 

217 'PDI-IoTA-UltraLight': Ultralight()} 

218 

219 # add custom encoder for message parsing 

220 if custom_encoder: 

221 self.add_encoder(custom_encoder) 

222 

223 @property 

224 def devices(self): 

225 """ 

226 Returns as list of all registered device configurations 

227 Returns: 

228 

229 """ 

230 return list(self._devices.values()) 

231 

232 @devices.setter 

233 def devices(self, devices: List[Device]): 

234 """ 

235 Sets list of device configurations 

236 

237 Args: 

238 devices: List of device configurations 

239 

240 Returns: 

241 None 

242 

243 Raises: 

244 ValueError: if duplicate device id was found 

245 """ 

246 for device in devices: 

247 try: 

248 self.add_device(device=device) 

249 except ValueError: 

250 raise ValueError(f"Duplicate device_id: {device.device_id}") 

251 

252 def get_encoder(self, encoder: Union[str, PayloadProtocol]): 

253 """ 

254 Returns the encoder by key 

255 

256 Args: 

257 encoder: encoder name 

258 

259 Returns: 

260 Subclass of Baseencoder 

261 """ 

262 return self._encoders.get(encoder) 

263 

264 def add_encoder(self, encoder: Dict[str, BaseEncoder]): 

265 for value in encoder.values(): 

266 assert isinstance(value, BaseEncoder), \ 

267 f"Encoder must be a subclass of {type(BaseEncoder)}" 

268 

269 self._encoders.update(encoder) 

270 

271 def __validate_device(self, device: Union[Device, Dict]) -> Device: 

272 """ 

273 Validates configuration of an IoT Device 

274 

275 Args: 

276 device: device model to check on 

277 

278 Returns: 

279 Device: validated model 

280 

281 Raises: 

282 AssertionError: for faulty configurations 

283 """ 

284 if isinstance(device, dict): 

285 device = Device.model_validate(device) 

286 

287 assert isinstance(device, Device), "Invalid device configuration!" 

288 

289 assert device.transport == TransportProtocol.MQTT, \ 

290 "Unsupported transport protocol found in device configuration!" 

291 

292 if device.apikey in self.service_groups.keys(): 

293 pass 

294 # check if matching service group is registered 

295 else: 

296 msg = "Could not find matching service group! " \ 

297 "Communication may not work correctly!" 

298 self.logger.warning(msg=msg) 

299 warnings.warn(message=msg) 

300 

301 return device 

302 

303 def __create_topic(self, 

304 *, 

305 topic_type: IoTAMQTTMessageType, 

306 device: Device, 

307 attribute: str = None) -> str: 

308 """ 

309 Creates a topic for a device configuration based on the requested 

310 topic type. 

311 

312 Args: 

313 device: 

314 Configuration of an IoT device 

315 topic_type: 

316 type of the topic to be created, 

317 'multi' for topics that the device is suppose to publish on. 

318 'single' for topics that the device is suppose to publish on. 

319 'cmd' for topic the device is expecting its commands on. 

320 'cmdexe' for topic the device can acknowledge its commands on. 

321 'configuration' for topic the device can request command 

322 configurations on 

323 attribute: 

324 attribute needs to be set for single measurements 

325 Returns: 

326 string with topic 

327 

328 Raises: 

329 KeyError: 

330 If unknown message type is used 

331 ValueError: 

332 If attribute name is missing for single measurements 

333 """ 

334 if topic_type == IoTAMQTTMessageType.MULTI: 

335 topic = '/'.join((self._encoders[device.protocol].prefix, 

336 device.apikey, 

337 device.device_id, 

338 'attrs')) 

339 elif topic_type == IoTAMQTTMessageType.SINGLE: 

340 if attribute: 

341 attr = next(attr for attr in device.attributes 

342 if attr.name == attribute) 

343 if attr.object_id: 

344 attr_suffix = attr.object_id 

345 else: 

346 attr_suffix = attr.name 

347 topic = '/'.join((self._encoders[device.protocol].prefix, 

348 device.apikey, 

349 device.device_id, 

350 'attrs', 

351 attr_suffix)) 

352 else: 

353 raise ValueError("Missing argument name for single measurement") 

354 elif topic_type == IoTAMQTTMessageType.CMD: 

355 topic = '/' + '/'.join((device.apikey, device.device_id, 'cmd')) 

356 elif topic_type == IoTAMQTTMessageType.CMDEXE: 

357 topic = '/'.join((self._encoders[device.protocol].prefix, 

358 device.apikey, 

359 device.device_id, 

360 'cmdexe')) 

361 elif topic_type == IoTAMQTTMessageType.CONFIG: 

362 topic = '/'.join((self._encoders[device.protocol].prefix, 

363 device.apikey, 

364 device.device_id, 

365 'configuration')) 

366 else: 

367 raise KeyError("topic_type not supported") 

368 return topic 

369 

370 def __subscribe_commands(self, *, 

371 device: Device = None, 

372 qos=0, 

373 options=None, 

374 properties=None): 

375 """ 

376 Subscribes commands based on device configuration. If device argument is 

377 omitted the function will subscribe to all topics of already registered 

378 _devices. 

379 Additionally, it will also check if a matching service group is 

380 registered with the client. If nor a warning will be raised. 

381 

382 Args: 

383 device: Configuration of an IoT device 

384 qos: Quality of service can be 0, 1 or 2 

385 options: MQTT v5.0 subscribe options 

386 properties: MQTT v5.0 properties 

387 

388 Returns: 

389 None 

390 """ 

391 if Device: 

392 if len(device.commands) > 0: 

393 topic = self.__create_topic(device=device, 

394 topic_type=IoTAMQTTMessageType.CMD) 

395 super().subscribe(topic=topic, 

396 qos=qos, 

397 options=options, 

398 properties=properties) 

399 else: 

400 # call itself but with device argument for all registered _devices 

401 for device in self._devices.values(): 

402 self.__subscribe_commands(device=device, 

403 qos=qos, 

404 options=options, 

405 properties=properties) 

406 

407 def get_service_group(self, apikey: str) -> ServiceGroup: 

408 """ 

409 Returns registered service group configuration 

410 

411 Args: 

412 apikey: Unique APIKey of the service group 

413 

414 Returns: 

415 ServiceGroup 

416 

417 Raises: 

418 KeyError: if service group not yet registered 

419 

420 Example:: 

421 

422 from filip.clients.mqtt import MQTTClient 

423 

424 mqttc = MQTTClient() 

425 group = mqttc.get_service_group(apikey="MyAPIKEY") 

426 print(group.json(indent=2)) 

427 print(type(group)) 

428 """ 

429 group = self.service_groups.get(apikey, None) 

430 if group is None: 

431 raise KeyError("Service group with apikey %s not found!", apikey) 

432 return group 

433 

434 def add_service_group(self, service_group: Union[ServiceGroup, Dict]): 

435 """ 

436 Registers a device service group with the client 

437 

438 Args: 

439 service_group: Service group configuration 

440 

441 Returns: 

442 None 

443 

444 Raises: 

445 ValueError: if service group already exists 

446 """ 

447 if isinstance(service_group, dict): 

448 service_group = ServiceGroup.model_validate(service_group) 

449 assert isinstance(service_group, ServiceGroup), \ 

450 "Invalid content for service group!" 

451 

452 if self.service_groups.get(service_group.apikey, None) is None: 

453 pass 

454 else: 

455 raise ValueError("Service group already exists! %s", 

456 service_group.apikey) 

457 # add service group configuration to the service group list 

458 self.service_groups[service_group.apikey] = service_group 

459 

460 def delete_service_group(self, apikey): 

461 """ 

462 Unregisters a service group and removes 

463 

464 Args: 

465 apikey: Unique APIKey of the service group 

466 

467 Returns: 

468 None 

469 """ 

470 group = self.service_groups.pop(apikey, None) 

471 if group: 

472 self.logger.info("Successfully unregistered Service Group '%s'!", 

473 apikey) 

474 else: 

475 self.logger.error("Could not unregister service group '%s'!", 

476 apikey) 

477 raise KeyError("Device not found!") 

478 

479 def update_service_group(self, service_group: Union[ServiceGroup, Dict]): 

480 """ 

481 Updates a registered service group configuration. There is no 

482 opportunity to only partially update the device. Hence, your service 

483 group model should be complete. 

484 

485 Args: 

486 service_group: Service group configuration 

487 

488 Returns: 

489 None 

490 

491 Raises: 

492 KeyError: if service group not yet registered 

493 """ 

494 if isinstance(service_group, dict): 

495 service_group = ServiceGroup.model_validate(service_group) 

496 assert isinstance(service_group, ServiceGroup), \ 

497 "Invalid content for service group" 

498 

499 if self.service_groups.get(service_group.apikey, None) is None: 

500 raise KeyError("Service group not found! %s", 

501 service_group.apikey) 

502 # add service group configuration to the service group list 

503 self.service_groups[service_group.apikey] = service_group 

504 

505 def get_device(self, device_id: str) -> Device: 

506 """ 

507 Returns the configuration of a registered device. 

508 

509 Args: 

510 device_id: Id of the requested device 

511 

512 Returns: 

513 Device: Device model of the requested device 

514 

515 Raises: 

516 KeyError: if requested device is not registered with the client 

517 

518 Example:: 

519 

520 from filip.clients.mqtt import MQTTClient 

521 

522 mqttc = MQTTClient() 

523 device = mqttc.get_device(device_id="MyDeviceId") 

524 print(device.json(indent=2)) 

525 print(type(device)) 

526 """ 

527 return self._devices[device_id] 

528 

529 def add_device(self, 

530 device: Union[Device, Dict], 

531 qos=0, 

532 options=None, 

533 properties=None): 

534 """ 

535 Registers a device config with the mqtt client. Subsequently, 

536 the client will magically subscribe to the corresponding topics based 

537 on the device config and any matching registered service group config 

538 if exists. 

539 

540 Note: 

541 To register the device config only with this client is not 

542 sufficient for data streaming the configuration also needs to be 

543 registered with IoTA-Agent. 

544 

545 Args: 

546 device: Configuration of an IoT device 

547 qos: Quality of service can be 0, 1 or 2 

548 options: MQTT v5.0 subscribe options 

549 properties: MQTT v5.0 properties 

550 

551 Returns: 

552 None 

553 

554 Raises: 

555 ValueError: if device configuration already exists 

556 """ 

557 device = self.__validate_device(device=device) 

558 

559 if self._devices.get(device.device_id, None) is None: 

560 pass 

561 else: 

562 raise ValueError("Device already exists! %s", device.device_id) 

563 # add device configuration to the device list 

564 self._devices[device.device_id] = device 

565 # subscribes to the command topic 

566 self.__subscribe_commands(device=device, 

567 qos=qos, 

568 options=options, 

569 properties=properties) 

570 

571 def delete_device(self, device_id: str): 

572 """ 

573 Unregisters a device and removes its subscriptions and callbacks 

574 

575 Args: 

576 device_id: id of and IoT device 

577 

578 Returns: 

579 None 

580 """ 

581 device = self._devices.pop(device_id, None) 

582 if device: 

583 topic = self.__create_topic(device=device, 

584 topic_type=IoTAMQTTMessageType.CMD) 

585 self.unsubscribe(topic=topic) 

586 self.message_callback_remove(sub=topic) 

587 self.logger.info("Successfully unregistered Device '%s'!", 

588 device_id) 

589 else: 

590 self.logger.error("Could not unregister device '%s'", device_id) 

591 

592 def update_device(self, 

593 device: Union[Device, Dict], 

594 qos=0, 

595 options=None, 

596 properties=None): 

597 """ 

598 Updates a registered device configuration. There is no opportunity 

599 to only partially update the device. Hence, your device model should 

600 be complete. 

601 

602 Args: 

603 device: Configuration of an IoT device 

604 qos: Quality of service can be 0, 1 or 2 

605 options: MQTT v5.0 subscribe options 

606 properties: MQTT v5.0 properties 

607 

608 Returns: 

609 None 

610 

611 Raises: 

612 KeyError: if device not yet registered 

613 """ 

614 device = self.__validate_device(device=device) 

615 

616 if self._devices.get(device.device_id, None) is None: 

617 raise KeyError("Device not found! %s", device.device_id) 

618 

619 # update device configuration in the device list 

620 self._devices[device.device_id] = device 

621 # subscribes to the command topic 

622 self.__subscribe_commands(device=device, 

623 qos=qos, 

624 options=options, 

625 properties=properties) 

626 

627 def add_command_callback(self, device_id: str, callback: Callable): 

628 """ 

629 Adds callback function for a device configuration. 

630 

631 Args: 

632 device_id: 

633 id of and IoT device 

634 callback: 

635 function that will be called for incoming commands. 

636 This function should have the following format: 

637 

638 Example:: 

639 

640 def on_command(client, obj, msg): 

641 apikey, device_id, payload = \ 

642 client.encoder.decode_message(msg=msg) 

643 

644 # do_something with the message. 

645 # For instance write into a queue. 

646 

647 # acknowledge a command. Here command are usually single 

648 # messages. The first key is equal to the commands name. 

649 client.publish(device_id=device_id, 

650 command_name=next(iter(payload)), 

651 payload=payload) 

652 

653 mqttc.add_command_callback(device_id="MyDevice", 

654 callback=on_command) 

655 

656 Returns: 

657 None 

658 """ 

659 device = self._devices.get(device_id, None) 

660 if device is None: 

661 raise KeyError("Device does not exist! %s", device_id) 

662 self.__subscribe_commands(device=device) 

663 topic = self.__create_topic(device=device, 

664 topic_type=IoTAMQTTMessageType.CMD) 

665 self.message_callback_add(topic, callback) 

666 

667 def publish(self, 

668 topic=None, 

669 payload: Union[Dict, Any] = None, 

670 qos: int = 0, 

671 retain: bool = False, 

672 properties=None, 

673 device_id: str = None, 

674 attribute_name: str = None, 

675 command_name: str = None, 

676 timestamp: bool = False 

677 ): 

678 """ 

679 Publish an MQTT Message to a specified topic. If you want to publish 

680 a device specific message to a device use the device_id argument for 

681 multi-measurement. The function will then automatically validate 

682 against the registered device configuration if the payload keys are 

683 valid. If you want to publish a single measurement the attribute_name 

684 argument is required as well. 

685 

686 Note: 

687 If the device_id argument is set, the topic argument will be 

688 ignored. 

689 

690 Args: 

691 topic: 

692 The topic that the message should be published on. 

693 payload: 

694 The actual message to send. If not given, or set to None a 

695 zero length message will be used. Passing an int or float will 

696 result in the payload being converted to a string 

697 representing that number. If you wish to send a true 

698 int/float, use struct.pack() to create the 

699 payload you require. For publishing to a device use a dict 

700 containing the object_ids as keys. 

701 qos: 

702 The quality of service level to use. 

703 retain: 

704 If set to true, the message will be set as the "last known 

705 good"/retained message for the topic. 

706 properties: 

707 (MQTT v5.0 only) the MQTT v5.0 properties to be included. 

708 Use the Properties class. 

709 device_id: 

710 Id of the IoT device you want to publish for. The topics will 

711 automatically created. If set, the message type will be 

712 assumed to be multi measurement. 

713 attribute_name: 

714 Name of an attribute of the device. Do only use this for 

715 single measurements. If set, `command_name` must 

716 be omitted. 

717 command_name: 

718 Name of a command of the device that should be acknowledged. If 

719 set `attribute_name` must be omitted. 

720 timestamp: 

721 If `true` the client will generate a valid timestamp based on 

722 utc and added to the multi measurement payload. 

723 If a `timeInstant` is already contained in the 

724 message payload it will not overwritten. 

725 

726 Returns: 

727 None 

728 

729 Raises: 

730 KeyError: if device configuration is not registered with client 

731 ValueError: if the passed arguments are inconsistent or a 

732 timestamp does not match the ISO 8601 format. 

733 AssertionError: if the message payload does not match the device 

734 configuration. 

735 """ 

736 

737 # TODO: time stamps are not tested yet 

738 

739 if device_id: 

740 device = self.get_device(device_id=device_id) 

741 

742 # create message for multi measurement payload 

743 if attribute_name is None and command_name is None: 

744 assert isinstance(payload, dict), \ 

745 "Payload must be a dictionary" 

746 

747 if timestamp and 'timeInstant' not in payload.keys(): 

748 payload["timeInstant"] = datetime.utcnow() 

749 # validate if dict keys match device configuration 

750 

751 msg_payload = payload.copy() 

752 for key in payload.keys(): 

753 for attr in device.attributes: 

754 key_constraint = key == "timeInstant" 

755 def elif_action(msg): None 

756 

757 if attr.object_id is not None: 

758 key_constraint = key_constraint or (key in attr.object_id) 

759 def elif_action(msg): msg[attr.object_id] = msg.pop(key) 

760 

761 #could be made more compact by pulling up the second condition 

762 #but would probably make the code less readable... 

763 if key_constraint: 

764 break 

765 

766 elif key == attr.name: 

767 elif_action(msg_payload) 

768 break 

769 

770 else: 

771 err_msg = f"Attribute key '{key}' is not allowed " \ 

772 f"in the message payload for this " \ 

773 f"device configuration with device_id " \ 

774 f"'{device_id}'" 

775 raise KeyError(err_msg) 

776 topic = self.__create_topic( 

777 device=device, 

778 topic_type=IoTAMQTTMessageType.MULTI) 

779 payload = self._encoders[device.protocol].encode_msg( 

780 device_id=device_id, 

781 payload=msg_payload, 

782 msg_type=IoTAMQTTMessageType.MULTI) 

783 

784 # create message for command acknowledgement 

785 elif attribute_name is None and command_name: 

786 assert isinstance(payload, Dict), "Payload must be a dictionary" 

787 assert len(payload.keys()) == 1, \ 

788 "Cannot acknowledge multiple commands simultaneously" 

789 assert next(iter(payload.keys())) in \ 

790 [cmd.name for cmd in device.commands], \ 

791 "Unknown command for this device!" 

792 topic = self.__create_topic( 

793 device=device, 

794 topic_type=IoTAMQTTMessageType.CMDEXE) 

795 payload = self._encoders[device.protocol].encode_msg( 

796 device_id=device_id, 

797 payload=payload, 

798 msg_type=IoTAMQTTMessageType.CMDEXE) 

799 

800 # create message for single measurement 

801 elif attribute_name and command_name is None: 

802 topic = self.__create_topic( 

803 device=device, 

804 topic_type=IoTAMQTTMessageType.SINGLE, 

805 attribute=attribute_name) 

806 payload = self._encoders[device.protocol].encode_msg( 

807 device_id=device_id, 

808 payload=payload, 

809 msg_type=IoTAMQTTMessageType.SINGLE) 

810 else: 

811 raise ValueError("Inconsistent arguments!") 

812 

813 super().publish(topic=topic, 

814 payload=payload, 

815 qos=qos, 

816 retain=retain, 

817 properties=properties) 

818 

819 def subscribe(self, topic=None, qos=0, options=None, properties=None): 

820 """ 

821 Extends the normal subscribe function of the paho.mqtt.client. 

822 If the topic argument is omitted the client will subscribe to all 

823 registered device command topics. 

824 

825 Args: 

826 topic: 

827 A string specifying the subscription topic to subscribe to. 

828 qos: 

829 The desired quality of service level for the subscription. 

830 Defaults to 0. 

831 options: Not used. 

832 properties: Not used. 

833 

834 Returns: 

835 None 

836 """ 

837 if topic: 

838 super().subscribe(topic=topic, 

839 qos=qos, 

840 options=options, 

841 properties=properties) 

842 else: 

843 for device in self._devices.values(): 

844 self.__subscribe_commands(device=device, 

845 qos=qos, 

846 options=options, 

847 properties=properties)