Coverage for filip/clients/mqtt/client.py: 89%

174 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-03-10 13:43 +0000

1""" 

2Implementation of an extended MQTT client that automatically handles the 

3topic subscription for FIWARE's IoT communication pattern. 

4""" 

5 

6import itertools 

7import logging 

8import warnings 

9from datetime import datetime 

10from typing import Any, Callable, Dict, List, Tuple, Union 

11 

12import paho.mqtt.client as mqtt 

13 

14from filip.clients.mqtt.encoder import BaseEncoder, Json, Ultralight 

15from filip.models.mqtt import IoTAMQTTMessageType 

16from filip.models.ngsi_v2.iot import ( 

17 Device, 

18 PayloadProtocol, 

19 ServiceGroup, 

20 TransportProtocol, 

21) 

22 

23 

class IoTAMQTTClient(mqtt.Client):
    """
    This class is an extension to the MQTT client from the well established
    Eclipse Paho™ MQTT Python Client. The official documentation is located
    here: https://github.com/eclipse/paho.mqtt.python

    The class adds additional functions to facilitate the communication to
    FIWARE's IoT-Agent via MQTT. It automatically generates and subscribes to
    all important topics that are necessary to establish a
    bi-directional communication with the IoT-Agent.

    Note:
        The client does not sync the device configuration with the IoT-Agent.
        This is up to the user!

    Note:
        The extension does not affect the normal workflow or any other
        functionality known from the original client.

        The client does not yet support the retrieval of command
        configurations via mqtt documented here:
        https://fiware-iotagent-json.readthedocs.io/en/latest/usermanual/index.html#api-overview

    Example:
        This example shows the basic usage of the client. It does not
        demonstrate its whole capabilities. Please check the single methods
        for more details. Please also keep in mind that this still requires
        provisioning of the device in the IoT-Agent and sending the commands
        via the context broker. For more details check the additional example
        section::

            import paho.mqtt.client as mqtt

            from filip.models.ngsi_v2.iot import Device, DeviceAttribute, DeviceCommand, ServiceGroup
            from filip.clients.mqtt.client import IoTAMQTTClient

            # create a device configuration
            device_attr = DeviceAttribute(name='temperature',
                                          object_id='t',
                                          type="Number")
            device_command = DeviceCommand(name='heater', type="Boolean")
            device = Device(device_id='MyDevice',
                            entity_name='MyDevice',
                            entity_type='Thing',
                            protocol='IoTA-JSON',
                            transport='MQTT',
                            apikey="YourApiKey",
                            attributes=[device_attr],
                            commands=[device_command])

            service_group = ServiceGroup(apikey="YourApiKey", resource="/iot")

            mqttc = IoTAMQTTClient(client_id="YourID",
                                   userdata=None,
                                   protocol=mqtt.MQTTv5,
                                   transport="tcp",
                                   devices=[device],
                                   service_groups=[service_group])

            # create a callback function that will be called for incoming
            # commands and add it for a single device
            def on_command(client, obj, msg):
                apikey, device_id, payload = (
                    client.get_encoder("IoTA-JSON").decode_message(msg=msg)
                )

                # do_something with the message.
                # For instance write into a queue.

                # acknowledge a command
                client.publish(device_id=device_id,
                               command_name=next(iter(payload)),
                               payload=payload)

            mqttc.add_command_callback(device_id='MyDevice',
                                       callback=on_command)

            # create a non blocking loop
            mqttc.loop_start()

            # publish a multi-measurement for a device
            mqttc.publish(device_id='MyDevice', payload={'t': 50})

            # publish a single measurement for a device
            mqttc.publish(device_id='MyDevice',
                          attribute_name='temperature',
                          payload=50)

            # adding timestamps to measurements using the client
            mqttc.publish(device_id='MyDevice',
                          payload={'t': 50},
                          timestamp=True)

            # adding timestamps to measurements in payload
            from datetime import datetime

            mqttc.publish(device_id='MyDevice',
                          payload={'t': 50,
                                   'timeInstant': datetime.now().astimezone().isoformat()})

            # stop network loop and disconnect cleanly
            mqttc.loop_stop()
            mqttc.disconnect()

    """

    def __init__(
        self,
        client_id="",
        clean_session=None,
        userdata=None,
        protocol=mqtt.MQTTv311,
        transport="tcp",
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
        devices: List[Device] = None,
        service_groups: List[ServiceGroup] = None,
        custom_encoder: Dict[str, BaseEncoder] = None,
    ):
        """
        Args:
            client_id:
                Unique client id string used when connecting
                to the broker. If client_id is zero length or None, then the
                behaviour is defined by which protocol version is in use. If
                using MQTT v3.1.1, then a zero length client id will be sent
                to the broker and the broker will generate a random for the
                client. If using MQTT v3.1 then an id will be randomly
                generated. In both cases, clean_session must be True.
                If this is not the case a ValueError will be raised.
            clean_session:
                boolean that determines the client type. If True,
                the broker will remove all information about this client when it
                disconnects. If False, the client is a persistent client and
                subscription information and queued messages will be retained
                when the client disconnects.
                Note that a client will never discard its own outgoing
                messages on disconnect. Calling connect() or reconnect() will
                cause the messages to be resent. Use reinitialise() to reset
                a client to its original state. The clean_session argument
                only applies to MQTT versions v3.1.1 and v3.1. It is not
                accepted if the MQTT version is v5.0 - use the clean_start
                argument on connect() instead.
            userdata:
                defined data of any type that is passed as the "userdata"
                parameter to callbacks. It may be updated at a later point
                with the user_data_set() function.
            protocol:
                explicit setting of the MQTT version to use for this client.
                Can be paho.mqtt.client.MQTTv311 (v3.1.1),
                paho.mqtt.client.MQTTv31 (v3.1) or paho.mqtt.client.MQTTv5
                (v5.0), with the default being v3.1.1.
            transport:
                Set to "websockets" to use WebSockets as the transport
                mechanism. Set to "tcp" to use raw TCP, which is the default.
            callback_api_version:
                Callback API version that is forwarded unchanged to the
                paho client constructor.
            devices:
                List of device configurations that will be registered
                with the client. Consequently, the client will be able to
                subscribe to all registered device topics. Furthermore,
                after registration messages can simply be published by the
                device's id.
            service_groups:
                List of service group configurations that will be registered
                with the client. These should be known upon subscribing
                because the client will check for a matching service group. If
                this is not known or registered with the IoT-Agent service
                the receiving of commands will fail. Please check the
                official documentation of the IoT-Agents API for more details.
            custom_encoder:
                Mapping of protocol name to encoder instance. The encoders
                automatically parse the supported payload formats to a
                dictionary and vice versa. This essentially saves boiler
                plate code.
        """
        # initialize parent client
        super().__init__(
            client_id=client_id,
            clean_session=clean_session,
            userdata=userdata,
            protocol=protocol,
            callback_api_version=callback_api_version,
            transport=transport,
        )

        # setup logging functionality
        self.logger = logging.getLogger(name=f"{self.__class__.__name__}")
        self.logger.addHandler(logging.NullHandler())
        self.enable_logger(self.logger)

        # registered service groups, keyed by their apikey
        self.service_groups: Dict[str, ServiceGroup] = (
            {group.apikey: group for group in service_groups}
            if service_groups
            else {}
        )

        # available encoders, keyed by payload protocol name; created before
        # any device is registered so topic creation can always rely on them
        self._encoders = {"IoTA-JSON": Json(), "PDI-IoTA-UltraLight": Ultralight()}
        if custom_encoder:
            self.add_encoder(custom_encoder)

        # registered device configurations, keyed by device_id; the setter
        # validates every device (transport protocol, service group)
        self._devices: Dict[str, Device] = {}
        if devices:
            self.devices = devices

    @property
    def devices(self):
        """
        Returns a list of all registered device configurations

        Returns:
            List of the registered Device models
        """
        return list(self._devices.values())

    @devices.setter
    def devices(self, devices: List[Device]):
        """
        Registers a list of device configurations. The devices are added to
        the already registered ones via `add_device`.

        Args:
            devices: List of device configurations

        Returns:
            None

        Raises:
            ValueError: if duplicate device id was found
        """
        for device in devices:
            try:
                self.add_device(device=device)
            except ValueError as err:
                raise ValueError(f"Duplicate device_id: {device.device_id}") from err

    def get_encoder(self, encoder: Union[str, PayloadProtocol]):
        """
        Returns the encoder by key

        Args:
            encoder: encoder name

        Returns:
            Instance of a BaseEncoder subclass, or None if no encoder is
            registered under the given name
        """
        return self._encoders.get(encoder)

    def add_encoder(self, encoder: Dict[str, BaseEncoder]):
        """
        Registers additional encoders. Existing entries with the same key
        are replaced.

        Args:
            encoder: Mapping of protocol name to encoder instance

        Returns:
            None

        Raises:
            AssertionError: if a value is not an instance of BaseEncoder
        """
        for value in encoder.values():
            assert isinstance(
                value, BaseEncoder
            ), f"Encoder must be an instance of a subclass of {BaseEncoder.__name__}"

        self._encoders.update(encoder)

    def __validate_device(self, device: Union[Device, Dict]) -> Device:
        """
        Validates configuration of an IoT Device

        Args:
            device: device model to check on

        Returns:
            Device: validated model

        Raises:
            AssertionError: for faulty configurations
        """
        if isinstance(device, dict):
            device = Device.model_validate(device)

        assert isinstance(device, Device), "Invalid device configuration!"

        assert (
            device.transport == TransportProtocol.MQTT
        ), "Unsupported transport protocol found in device configuration!"

        # a missing service group is not fatal for the client itself, hence
        # only warn the user
        if device.apikey not in self.service_groups:
            msg = (
                "Could not find matching service group! "
                "Communication may not work correctly!"
            )
            self.logger.warning(msg=msg)
            warnings.warn(message=msg)

        return device

    def __create_topic(
        self, *, topic_type: IoTAMQTTMessageType, device: Device, attribute: str = None
    ) -> str:
        """
        Creates a topic for a device configuration based on the requested
        topic type.

        Args:
            device:
                Configuration of an IoT device
            topic_type:
                type of the topic to be created,
                'multi' for the topic the device publishes multiple
                measurements on.
                'single' for the topic the device publishes a single
                measurement on.
                'cmd' for topic the device is expecting its commands on.
                'cmdexe' for topic the device can acknowledge its commands on.
                'configuration' for topic the device can request command
                configurations on
            attribute:
                attribute needs to be set for single measurements
        Returns:
            string with topic

        Raises:
            KeyError:
                If unknown message type is used or the attribute is not part
                of the device configuration
            ValueError:
                If attribute name is missing for single measurements
        """
        # the command topic is the only one without the encoder prefix
        if topic_type == IoTAMQTTMessageType.CMD:
            return "/" + "/".join((device.apikey, device.device_id, "cmd"))

        if topic_type == IoTAMQTTMessageType.MULTI:
            suffix = ("attrs",)
        elif topic_type == IoTAMQTTMessageType.SINGLE:
            if not attribute:
                raise ValueError("Missing argument name for single measurement")
            attr = next(
                (attr for attr in device.attributes if attr.name == attribute), None
            )
            if attr is None:
                raise KeyError(
                    f"Attribute '{attribute}' is not part of the device "
                    f"configuration with device_id '{device.device_id}'"
                )
            # the object_id takes precedence over the attribute name
            suffix = ("attrs", attr.object_id or attr.name)
        elif topic_type == IoTAMQTTMessageType.CMDEXE:
            suffix = ("cmdexe",)
        elif topic_type == IoTAMQTTMessageType.CONFIG:
            suffix = ("configuration",)
        else:
            raise KeyError("topic_type not supported")

        prefix = self._encoders[device.protocol].prefix
        return "/".join((prefix, device.apikey, device.device_id, *suffix))

    def __subscribe_commands(
        self, *, device: Device = None, qos=0, options=None, properties=None
    ):
        """
        Subscribes commands based on device configuration. If device argument is
        omitted the function will subscribe to all topics of already registered
        devices.

        Args:
            device: Configuration of an IoT device
            qos: Quality of service can be 0, 1 or 2
            options: MQTT v5.0 subscribe options
            properties: MQTT v5.0 properties

        Returns:
            None
        """
        if device is None:
            # call itself but with device argument for all registered devices
            for registered in self._devices.values():
                self.__subscribe_commands(
                    device=registered, qos=qos, options=options, properties=properties
                )
            return

        # devices without commands have no command topic to listen on
        if len(device.commands) > 0:
            topic = self.__create_topic(
                device=device, topic_type=IoTAMQTTMessageType.CMD
            )
            super().subscribe(
                topic=topic, qos=qos, options=options, properties=properties
            )

    def get_service_group(self, apikey: str) -> ServiceGroup:
        """
        Returns registered service group configuration

        Args:
            apikey: Unique APIKey of the service group

        Returns:
            ServiceGroup

        Raises:
            KeyError: if service group not yet registered

        Example::

            from filip.clients.mqtt.client import IoTAMQTTClient

            mqttc = IoTAMQTTClient()
            group = mqttc.get_service_group(apikey="MyAPIKEY")
            print(group.model_dump_json(indent=2))
            print(type(group))
        """
        group = self.service_groups.get(apikey, None)
        if group is None:
            raise KeyError(f"Service group with apikey {apikey} not found!")
        return group

    def add_service_group(self, service_group: Union[ServiceGroup, Dict]):
        """
        Registers a device service group with the client

        Args:
            service_group: Service group configuration

        Returns:
            None

        Raises:
            ValueError: if service group already exists
        """
        if isinstance(service_group, dict):
            service_group = ServiceGroup.model_validate(service_group)
        assert isinstance(
            service_group, ServiceGroup
        ), "Invalid content for service group!"

        if self.service_groups.get(service_group.apikey, None) is not None:
            raise ValueError(f"Service group already exists! {service_group.apikey}")
        # add service group configuration to the service group list
        self.service_groups[service_group.apikey] = service_group

    def delete_service_group(self, apikey):
        """
        Unregisters a service group and removes it from the client

        Args:
            apikey: Unique APIKey of the service group

        Returns:
            None

        Raises:
            KeyError: if service group is not registered
        """
        group = self.service_groups.pop(apikey, None)
        if group is None:
            self.logger.error("Could not unregister service group '%s'!", apikey)
            raise KeyError(f"Service group with apikey {apikey} not found!")
        self.logger.info("Successfully unregistered Service Group '%s'!", apikey)

    def update_service_group(self, service_group: Union[ServiceGroup, Dict]):
        """
        Updates a registered service group configuration. There is no
        opportunity to only partially update the service group. Hence, your
        service group model should be complete.

        Args:
            service_group: Service group configuration

        Returns:
            None

        Raises:
            KeyError: if service group not yet registered
        """
        if isinstance(service_group, dict):
            service_group = ServiceGroup.model_validate(service_group)
        assert isinstance(
            service_group, ServiceGroup
        ), "Invalid content for service group"

        if self.service_groups.get(service_group.apikey, None) is None:
            raise KeyError(f"Service group not found! {service_group.apikey}")
        # replace service group configuration in the service group list
        self.service_groups[service_group.apikey] = service_group

    def get_device(self, device_id: str) -> Device:
        """
        Returns the configuration of a registered device.

        Args:
            device_id: Id of the requested device

        Returns:
            Device: Device model of the requested device

        Raises:
            KeyError: if requested device is not registered with the client

        Example::

            from filip.clients.mqtt.client import IoTAMQTTClient

            mqttc = IoTAMQTTClient()
            device = mqttc.get_device(device_id="MyDeviceId")
            print(device.model_dump_json(indent=2))
            print(type(device))
        """
        return self._devices[device_id]

    def add_device(
        self, device: Union[Device, Dict], qos=0, options=None, properties=None
    ):
        """
        Registers a device config with the mqtt client. Subsequently,
        the client will automatically subscribe to the corresponding command
        topic based on the device config.

        Note:
            To register the device config only with this client is not
            sufficient for data streaming the configuration also needs to be
            registered with IoTA-Agent.

        Args:
            device: Configuration of an IoT device
            qos: Quality of service can be 0, 1 or 2
            options: MQTT v5.0 subscribe options
            properties: MQTT v5.0 properties

        Returns:
            None

        Raises:
            ValueError: if device configuration already exists
        """
        device = self.__validate_device(device=device)

        if self._devices.get(device.device_id, None) is not None:
            raise ValueError(f"Device already exists! {device.device_id}")
        # add device configuration to the device list
        self._devices[device.device_id] = device
        # subscribes to the command topic
        self.__subscribe_commands(
            device=device, qos=qos, options=options, properties=properties
        )

    def delete_device(self, device_id: str):
        """
        Unregisters a device and removes its subscriptions and callbacks

        Args:
            device_id: id of an IoT device

        Returns:
            None
        """
        device = self._devices.pop(device_id, None)
        if device is None:
            # unknown devices are only logged, no exception is raised
            self.logger.error("Could not unregister device '%s'", device_id)
            return

        topic = self.__create_topic(device=device, topic_type=IoTAMQTTMessageType.CMD)
        self.unsubscribe(topic=topic)
        self.message_callback_remove(sub=topic)
        self.logger.info("Successfully unregistered Device '%s'!", device_id)

    def update_device(
        self, device: Union[Device, Dict], qos=0, options=None, properties=None
    ):
        """
        Updates a registered device configuration. There is no opportunity
        to only partially update the device. Hence, your device model should
        be complete.

        Args:
            device: Configuration of an IoT device
            qos: Quality of service can be 0, 1 or 2
            options: MQTT v5.0 subscribe options
            properties: MQTT v5.0 properties

        Returns:
            None

        Raises:
            KeyError: if device not yet registered
        """
        device = self.__validate_device(device=device)

        if self._devices.get(device.device_id, None) is None:
            raise KeyError(f"Device not found! {device.device_id}")

        # NOTE(review): a subscription to the previous command topic is not
        # removed here; if the apikey changes the old topic stays subscribed.
        # update device configuration in the device list
        self._devices[device.device_id] = device
        # subscribes to the command topic
        self.__subscribe_commands(
            device=device, qos=qos, options=options, properties=properties
        )

    def add_command_callback(self, device_id: str, callback: Callable):
        """
        Adds callback function for a device configuration.

        Args:
            device_id:
                id of an IoT device
            callback:
                function that will be called for incoming commands.
                This function should have the following format:

        Example::

            def on_command(client, obj, msg):
                apikey, device_id, payload = (
                    client.get_encoder("IoTA-JSON").decode_message(msg=msg)
                )

                # do_something with the message.
                # For instance write into a queue.

                # acknowledge a command. Here command are usually single
                # messages. The first key is equal to the commands name.
                client.publish(device_id=device_id,
                               command_name=next(iter(payload)),
                               payload=payload)

            mqttc.add_command_callback(device_id="MyDevice",
                                       callback=on_command)

        Returns:
            None

        Raises:
            KeyError: if the device is not registered with the client
        """
        device = self._devices.get(device_id, None)
        if device is None:
            raise KeyError(f"Device does not exist! {device_id}")
        self.__subscribe_commands(device=device)
        topic = self.__create_topic(device=device, topic_type=IoTAMQTTMessageType.CMD)
        self.message_callback_add(topic, callback)

    @staticmethod
    def __map_measurement_keys(device: Device, payload: Dict) -> Dict:
        """
        Returns a copy of a multi-measurement payload in which attribute
        names are replaced by the object_id of the matching device attribute.

        Args:
            device: Configuration of an IoT device
            payload: Measurements keyed by attribute name or object_id

        Returns:
            New dictionary; the passed payload is left untouched

        Raises:
            KeyError: if a key matches neither an object_id nor an attribute
                name of the device (`timeInstant` is always accepted)
        """
        mapped = dict(payload)
        for key in payload:
            if key == "timeInstant":
                continue
            for attr in device.attributes:
                # key already is the object_id, nothing to translate
                if attr.object_id is not None and key == attr.object_id:
                    break
                if key == attr.name:
                    if attr.object_id is not None:
                        mapped[attr.object_id] = mapped.pop(key)
                    break
            else:
                err_msg = (
                    f"Attribute key '{key}' is not allowed "
                    f"in the message payload for this "
                    f"device configuration with device_id "
                    f"'{device.device_id}'"
                )
                raise KeyError(err_msg)
        return mapped

    def publish(
        self,
        topic=None,
        payload: Union[Dict, Any] = None,
        qos: int = 0,
        retain: bool = False,
        properties=None,
        device_id: str = None,
        attribute_name: str = None,
        command_name: str = None,
        timestamp: bool = False,
    ):
        """
        Publish an MQTT Message to a specified topic. If you want to publish
        a device specific message to a device use the device_id argument for
        multi-measurement. The function will then automatically validate
        against the registered device configuration if the payload keys are
        valid. If you want to publish a single measurement the attribute_name
        argument is required as well.

        Note:
            If the device_id argument is set, the topic argument will be
            ignored.

        Args:
            topic:
                The topic that the message should be published on.
            payload:
                The actual message to send. If not given, or set to None a
                zero length message will be used. Passing an int or float will
                result in the payload being converted to a string
                representing that number. If you wish to send a true
                int/float, use struct.pack() to create the
                payload you require. For publishing to a device use a dict
                containing the object_ids as keys.
            qos:
                The quality of service level to use.
            retain:
                If set to true, the message will be set as the "last known
                good"/retained message for the topic.
            properties:
                (MQTT v5.0 only) the MQTT v5.0 properties to be included.
                Use the Properties class.
            device_id:
                Id of the IoT device you want to publish for. The topics will
                automatically be created. If set, the message type will be
                assumed to be multi measurement.
            attribute_name:
                Name of an attribute of the device. Do only use this for
                single measurements. If set, `command_name` must
                be omitted.
            command_name:
                Name of a command of the device that should be acknowledged. If
                set `attribute_name` must be omitted.
            timestamp:
                If `True` the client will generate a timestamp based on
                utc and add it to the multi measurement payload.
                If a `timeInstant` is already contained in the
                message payload it will not be overwritten. The dictionary
                passed by the caller is never modified.

        Returns:
            None

        Raises:
            KeyError: if device configuration is not registered with client
                or a payload key / attribute name does not match the device
                configuration
            ValueError: if the passed arguments are inconsistent
            AssertionError: if the message payload has the wrong type or
                does not match the commands of the device configuration.
        """

        # TODO: time stamps are not tested yet

        if device_id:
            device = self.get_device(device_id=device_id)

            # create message for multi measurement payload
            if attribute_name is None and command_name is None:
                assert isinstance(payload, dict), "Payload must be a dictionary"

                # validate keys and translate attribute names to object_ids
                msg_payload = self.__map_measurement_keys(device, payload)
                if timestamp and "timeInstant" not in msg_payload:
                    # NOTE(review): naive UTC timestamp kept as in the
                    # original implementation
                    msg_payload["timeInstant"] = datetime.utcnow()

                topic = self.__create_topic(
                    device=device, topic_type=IoTAMQTTMessageType.MULTI
                )
                payload = self._encoders[device.protocol].encode_msg(
                    device_id=device_id,
                    payload=msg_payload,
                    msg_type=IoTAMQTTMessageType.MULTI,
                )

            # create message for command acknowledgement
            elif attribute_name is None and command_name:
                assert isinstance(payload, dict), "Payload must be a dictionary"
                assert (
                    len(payload.keys()) == 1
                ), "Cannot acknowledge multiple commands simultaneously"
                assert next(iter(payload.keys())) in [
                    cmd.name for cmd in device.commands
                ], "Unknown command for this device!"
                topic = self.__create_topic(
                    device=device, topic_type=IoTAMQTTMessageType.CMDEXE
                )
                payload = self._encoders[device.protocol].encode_msg(
                    device_id=device_id,
                    payload=payload,
                    msg_type=IoTAMQTTMessageType.CMDEXE,
                )

            # create message for single measurement
            elif attribute_name and command_name is None:
                topic = self.__create_topic(
                    device=device,
                    topic_type=IoTAMQTTMessageType.SINGLE,
                    attribute=attribute_name,
                )
                payload = self._encoders[device.protocol].encode_msg(
                    device_id=device_id,
                    payload=payload,
                    msg_type=IoTAMQTTMessageType.SINGLE,
                )
            else:
                raise ValueError("Inconsistent arguments!")

        super().publish(
            topic=topic, payload=payload, qos=qos, retain=retain, properties=properties
        )

    def subscribe(self, topic=None, qos=0, options=None, properties=None):
        """
        Extends the normal subscribe function of the paho.mqtt.client.
        If the topic argument is omitted the client will subscribe to all
        registered device command topics.

        Args:
            topic:
                A string specifying the subscription topic to subscribe to.
            qos:
                The desired quality of service level for the subscription.
                Defaults to 0.
            options: MQTT v5.0 subscribe options, forwarded to the parent
                client.
            properties: MQTT v5.0 properties, forwarded to the parent client.

        Returns:
            None
        """
        if topic:
            super().subscribe(
                topic=topic, qos=qos, options=options, properties=properties
            )
        else:
            # no explicit topic: (re-)subscribe the command topics of all
            # registered devices
            self.__subscribe_commands(
                device=None, qos=qos, options=options, properties=properties
            )