Coverage for filip/clients/ngsi_v2/iota.py: 77%

231 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-03-10 13:43 +0000

1""" 

2IoT-Agent Module for API Client 

3""" 

4 

5from __future__ import annotations 

6 

7import json 

8from copy import deepcopy 

9from typing import List, Dict, Set, TYPE_CHECKING, Union, Optional 

10import warnings 

11from urllib.parse import urljoin 

12import requests 

13from pydantic import AnyHttpUrl 

14from pydantic.type_adapter import TypeAdapter 

15from filip.config import settings 

16from filip.clients.base_http_client import BaseHttpClient 

17from filip.models.base import FiwareHeader 

18from filip.models.ngsi_v2.iot import Device, ServiceGroup 

19 

20from filip.utils.filter import filter_device_list, filter_group_list 

21 

22if TYPE_CHECKING: 

23 from filip.clients.ngsi_v2.cb import ContextBrokerClient 

24 

25 

26class IoTAClient(BaseHttpClient): 

27 """ 

28 Client for FIWARE IoT-Agents. The implementation follows the API 

29 specifications from here: 

30 https://iotagent-node-lib.readthedocs.io/en/latest/ 

31 

32 Args: 

33 url: Url of IoT-Agent 

34 session (requests.Session): 

35 fiware_header (FiwareHeader): fiware service and fiware service path 

36 **kwargs (Optional): Optional arguments that ``request`` takes. 

37 """ 

38 

39 def __init__( 

40 self, 

41 url: str = None, 

42 *, 

43 session: requests.Session = None, 

44 fiware_header: FiwareHeader = None, 

45 **kwargs, 

46 ): 

47 # set service url 

48 url = url or settings.IOTA_URL 

49 super().__init__( 

50 url=url, session=session, fiware_header=fiware_header, **kwargs 

51 ) 

52 

53 # ABOUT API 

54 def get_version(self) -> Dict: 

55 """ 

56 Gets version of IoT Agent 

57 

58 Returns: 

59 Dictionary with response 

60 """ 

61 url = urljoin(self.base_url, "iot/about") 

62 try: 

63 res = self.get(url=url, headers=self.headers) 

64 if res.ok: 

65 return res.json() 

66 res.raise_for_status() 

67 except requests.RequestException as err: 

68 self.logger.error(err) 

69 raise 

70 

71 # SERVICE GROUP API 

72 def post_groups( 

73 self, 

74 service_groups: Union[ServiceGroup, List[ServiceGroup]], 

75 update: bool = False, 

76 ): 

77 """ 

78 Creates a set of service groups for the given service and service_path. 

79 The service_group and subservice information will taken from the 

80 headers, overwriting any preexisting values. 

81 

82 Args: 

83 service_groups (list of ServiceGroup): Service groups that will be 

84 posted to the agent's API 

85 update (bool): If service group already exists try to update its 

86 

87 Returns: 

88 None 

89 """ 

90 if not isinstance(service_groups, list): 

91 service_groups = [service_groups] 

92 for group in service_groups: 

93 if group.service: 

94 assert ( 

95 group.service == self.headers["fiware-service"] 

96 ), "Service group service does not math fiware service" 

97 if group.subservice: 

98 assert ( 

99 group.subservice == self.headers["fiware-servicepath"] 

100 ), "Service group subservice does not math fiware service path" 

101 

102 url = urljoin(self.base_url, "iot/services") 

103 headers = self.headers 

104 data = { 

105 "services": [ 

106 group.model_dump(exclude={"service", "subservice"}, exclude_none=True) 

107 for group in service_groups 

108 ] 

109 } 

110 try: 

111 res = self.post(url=url, headers=headers, json=data) 

112 if res.ok: 

113 self.logger.info("Services successfully posted") 

114 elif res.status_code == 409: 

115 self.logger.warning(res.text) 

116 if len(service_groups) > 1: 

117 self.logger.info( 

118 "Trying to split bulk operation into " "single operations" 

119 ) 

120 for group in service_groups: 

121 self.post_group(service_group=group, update=update) 

122 elif update is True: 

123 self.update_group(service_group=service_groups[0], fields=None) 

124 else: 

125 res.raise_for_status() 

126 else: 

127 res.raise_for_status() 

128 except requests.RequestException as err: 

129 self.log_error(err=err, msg=None) 

130 raise 

131 

132 def post_group(self, service_group: ServiceGroup, update: bool = False): 

133 """ 

134 Single service registration but using the bulk operation in background 

135 

136 Args: 

137 service_group (ServiceGroup): Service that will be posted to the 

138 agent's API 

139 update (bool): 

140 

141 Returns: 

142 None 

143 """ 

144 return self.post_groups(service_groups=[service_group], update=update) 

145 

146 def get_group_list(self) -> List[ServiceGroup]: 

147 r""" 

148 Retrieves service_group groups from the database. If the servicepath 

149 header has the wildcard expression, /\*, all the subservices for the 

150 service_group are returned. The specific subservice parameters are 

151 returned in any other case. 

152 

153 Returns: 

154 

155 """ 

156 url = urljoin(self.base_url, "iot/services") 

157 headers = self.headers 

158 try: 

159 res = self.get(url=url, headers=headers) 

160 if res.ok: 

161 ta = TypeAdapter(List[ServiceGroup]) 

162 return ta.validate_python(res.json()["services"]) 

163 res.raise_for_status() 

164 except requests.RequestException as err: 

165 self.log_error(err=err, msg=None) 

166 raise 

167 

168 def get_group(self, *, resource: str, apikey: str) -> ServiceGroup: 

169 """ 

170 Retrieves service_group groups from the database based on resource and 

171 apikey 

172 Args: 

173 resource: 

174 apikey: 

175 Returns: 

176 

177 """ 

178 groups = self.get_group_list() 

179 groups = filter_group_list( 

180 group_list=groups, resources=resource, apikeys=apikey 

181 ) 

182 if len(groups) == 1: 

183 group = groups[0] 

184 return group 

185 elif len(groups) == 0: 

186 raise KeyError( 

187 f"Service group with resource={resource} and apikey={apikey} was not found" 

188 ) 

189 else: 

190 raise NotImplementedError( 

191 "There is a wierd error, try get_group_list() for debugging" 

192 ) 

193 

194 def update_groups( 

195 self, 

196 *, 

197 service_groups: Union[ServiceGroup, List[ServiceGroup]], 

198 add: False, 

199 fields: Union[Set[str], List[str]] = None, 

200 ) -> None: 

201 """ 

202 Bulk operation for service group update. 

203 Args: 

204 fields: 

205 service_groups: 

206 add: 

207 

208 Returns: 

209 

210 """ 

211 if not isinstance(service_groups, list): 

212 service_groups = [service_groups] 

213 for group in service_groups: 

214 self.update_group(service_group=group, fields=fields, add=add) 

215 

216 def update_group( 

217 self, 

218 *, 

219 service_group: ServiceGroup, 

220 fields: Union[Set[str], List[str]] = None, 

221 add: bool = True, 

222 ): 

223 """ 

224 Modifies the information for a service group configuration, identified 

225 by the resource and apikey query parameters. Takes a service group body 

226 as the payload. The body does not have to be complete: for incomplete 

227 bodies, just the existing attributes will be updated 

228 

229 Args: 

230 service_group (ServiceGroup): Service to update. 

231 fields: Fields of the service_group to update. If 'None' all allowed 

232 fields will be updated 

233 add: 

234 Returns: 

235 None 

236 """ 

237 if fields: 

238 if isinstance(fields, list): 

239 fields = set(fields) 

240 else: 

241 fields = None 

242 url = urljoin(self.base_url, "iot/services") 

243 headers = self.headers 

244 params = service_group.model_dump(include={"resource", "apikey"}) 

245 try: 

246 res = self.put( 

247 url=url, 

248 headers=headers, 

249 params=params, 

250 json=service_group.model_dump( 

251 include=fields, exclude={"service", "subservice"}, exclude_none=True 

252 ), 

253 ) 

254 if res.ok: 

255 self.logger.info("ServiceGroup updated!") 

256 elif (res.status_code == 404) & (add is True): 

257 self.post_group(service_group=service_group) 

258 else: 

259 res.raise_for_status() 

260 except requests.RequestException as err: 

261 self.log_error(err=err, msg=None) 

262 raise 

263 

264 def delete_group(self, *, resource: str, apikey: str): 

265 """ 

266 Deletes a service group in in the IoT-Agent 

267 

268 Args: 

269 resource: 

270 apikey: 

271 

272 Returns: 

273 

274 """ 

275 url = urljoin(self.base_url, "iot/services") 

276 headers = self.headers 

277 params = {"resource": resource, "apikey": apikey} 

278 try: 

279 res = self.delete(url=url, headers=headers, params=params) 

280 if res.ok: 

281 self.logger.info( 

282 "ServiceGroup with resource: '%s' and " 

283 "apikey: '%s' successfully deleted!", 

284 resource, 

285 apikey, 

286 ) 

287 else: 

288 res.raise_for_status() 

289 except requests.RequestException as err: 

290 msg = ( 

291 f"Could not delete ServiceGroup with resource " 

292 f"'{resource}' and apikey '{apikey}'!" 

293 ) 

294 self.log_error(err=err, msg=msg) 

295 raise 

296 

297 # DEVICE API 

298 def post_devices( 

299 self, *, devices: Union[Device, List[Device]], update: bool = False 

300 ) -> None: 

301 """ 

302 Post a device from the device registry. No payload is required 

303 or received. 

304 If a device already exists in can be updated with update = True 

305 Args: 

306 devices (list of Devices): 

307 update (bool): Whether if the device is already existent it 

308 should be updated 

309 Returns: 

310 None 

311 """ 

312 if not isinstance(devices, list): 

313 devices = [devices] 

314 url = urljoin(self.base_url, "iot/devices") 

315 headers = self.headers 

316 

317 data = { 

318 "devices": [ 

319 json.loads(device.model_dump_json(exclude_none=True)) 

320 for device in devices 

321 ] 

322 } 

323 try: 

324 res = self.post(url=url, headers=headers, json=data) 

325 if res.ok: 

326 self.logger.info("Devices successfully posted!") 

327 else: 

328 res.raise_for_status() 

329 except requests.RequestException as err: 

330 if update: 

331 return self.update_devices(devices=devices, add=False) 

332 msg = "Could not post devices" 

333 self.log_error(err=err, msg=msg) 

334 raise 

335 

336 def post_device(self, *, device: Device, update: bool = False) -> None: 

337 """ 

338 Post a device configuration to the IoT-Agent 

339 

340 Args: 

341 device: IoT device configuration to send 

342 update: update device if configuration already exists 

343 

344 Returns: 

345 None 

346 """ 

347 return self.post_devices(devices=[device], update=update) 

348 

349 def get_device_list( 

350 self, 

351 *, 

352 limit: int = None, 

353 offset: int = None, 

354 device_ids: Union[str, List[str]] = None, 

355 entity_names: Union[str, List[str]] = None, 

356 entity_types: Union[str, List[str]] = None, 

357 ) -> List[Device]: 

358 """ 

359 Returns a list of all the devices in the device registry with all 

360 its data. The IoTAgent now only supports "limit" and "offset" as 

361 request parameters. 

362 

363 Args: 

364 limit: 

365 if present, limits the number of devices returned in the 

366 list. Must be a number between 1 and 1000. 

367 offset: 

368 if present, skip that number of devices from the original 

369 query. 

370 device_ids: 

371 List of device_ids. If given, only devices with matching ids 

372 will be returned 

373 entity_names: 

374 The entity_ids of the devices. If given, only the devices 

375 with the specified entity_id will be returned 

376 entity_types: 

377 The entity_type of the device. If given, only the devices 

378 with the specified entity_type will be returned 

379 

380 Returns: 

381 List of matching devices 

382 """ 

383 if limit: 

384 if not 1 < limit < 1000: 

385 self.logger.error("'limit' must be an integer between 1 and " "1000!") 

386 raise ValueError 

387 url = urljoin(self.base_url, "iot/devices") 

388 headers = self.headers 

389 params = {key: value for key, value in locals().items() if value is not None} 

390 try: 

391 res = self.get(url=url, headers=headers, params=params) 

392 if res.ok: 

393 ta = TypeAdapter(List[Device]) 

394 devices = ta.validate_python(res.json()["devices"]) 

395 # filter by device_ids, entity_names or entity_types 

396 devices = filter_device_list( 

397 devices, device_ids, entity_names, entity_types 

398 ) 

399 return devices 

400 res.raise_for_status() 

401 except requests.RequestException as err: 

402 self.log_error(err=err, msg=None) 

403 raise 

404 

405 def get_device(self, *, device_id: str) -> Device: 

406 """ 

407 Returns all the information about a particular device. 

408 

409 Args: 

410 device_id: 

411 Raises: 

412 requests.RequestException, if device does not exist 

413 Returns: 

414 Device 

415 

416 """ 

417 url = urljoin(self.base_url, f"iot/devices/{device_id}") 

418 headers = self.headers 

419 try: 

420 res = self.get(url=url, headers=headers) 

421 if res.ok: 

422 return Device.model_validate(res.json()) 

423 res.raise_for_status() 

424 except requests.RequestException as err: 

425 msg = f"Device {device_id} was not found" 

426 self.log_error(err=err, msg=msg) 

427 raise 

428 

429 def update_device(self, *, device: Device, add: bool = True) -> None: 

430 """ 

431 Updates a device from the device registry. 

432 Adds, removes attributes from the device entry and changes 

433 attributes values. 

434 It does not change device settings (endpoint,..) and only adds 

435 attributes to the corresponding entity, their it does not 

436 change any attribute value and does not delete removed attributes 

437 

438 Args: 

439 device: 

440 add (bool): If device not found add it 

441 Returns: 

442 None 

443 """ 

444 url = urljoin(self.base_url, f"iot/devices/{device.device_id}") 

445 headers = self.headers 

446 try: 

447 res = self.put( 

448 url=url, 

449 headers=headers, 

450 json=device.model_dump( 

451 include={"attributes", "lazy", "commands", "static_attributes"}, 

452 exclude_none=True, 

453 ), 

454 ) 

455 if res.ok: 

456 self.logger.info("Device '%s' successfully updated!", device.device_id) 

457 elif (res.status_code == 404) & (add is True): 

458 self.post_device(device=device, update=False) 

459 else: 

460 res.raise_for_status() 

461 except requests.RequestException as err: 

462 msg = f"Could not update device '{device.device_id}'" 

463 self.log_error(err=err, msg=msg) 

464 raise 

465 

466 def update_devices( 

467 self, *, devices: Union[Device, List[Device]], add: False 

468 ) -> None: 

469 """ 

470 Bulk operation for device update. 

471 Args: 

472 devices: 

473 add: 

474 

475 Returns: 

476 

477 """ 

478 if not isinstance(devices, list): 

479 devices = [devices] 

480 for device in devices: 

481 self.update_device(device=device, add=add) 

482 

483 def delete_device( 

484 self, 

485 *, 

486 device_id: str, 

487 cb_url: AnyHttpUrl = settings.CB_URL, 

488 delete_entity: bool = False, 

489 force_entity_deletion: bool = False, 

490 cb_client: ContextBrokerClient = None, 

491 ) -> None: 

492 """ 

493 Remove a device from the device registry. No payload is required 

494 or received. 

495 

496 Args: 

497 device_id: str, ID of Device 

498 delete_entity: False -> Only delete the device entry, 

499 the automatically created and linked 

500 context-entity will continue to 

501 exist in Fiware 

502 True -> Also delete the automatically 

503 created and linked context-entity 

504 If multiple devices are linked to this 

505 entity, this operation is not executed and 

506 an exception is raised 

507 force_entity_deletion: 

508 bool, if delete_entity is true and multiple devices are linked 

509 to the linked entity, delete it and do not raise an error 

510 cb_client (ContextBrokerClient): 

511 Corresponding ContextBrokerClient object for entity manipulation 

512 cb_url (AnyHttpUrl): 

513 Url of the ContextBroker where the entity is found. 

514 This will autogenerate an CB-Client, mirroring the information 

515 of the IoTA-Client, e.g. FiwareHeader, and other headers 

516 (not recommended!) 

517 

518 Returns: 

519 None 

520 """ 

521 url = urljoin( 

522 self.base_url, 

523 f"iot/devices/{device_id}", 

524 ) 

525 headers = self.headers 

526 

527 device = self.get_device(device_id=device_id) 

528 

529 try: 

530 res = self.delete(url=url, headers=headers) 

531 if res.ok: 

532 self.logger.info("Device '%s' successfully deleted!", device_id) 

533 else: 

534 res.raise_for_status() 

535 except requests.RequestException as err: 

536 msg = f"Could not delete device {device_id}!" 

537 self.log_error(err=err, msg=msg) 

538 raise 

539 

540 if delete_entity: 

541 # An entity can technically belong to multiple devices 

542 # Only delete the entity if 

543 devices = self.get_device_list(entity_names=[device.entity_name]) 

544 

545 # Zero because we count the remaining devices 

546 if len(devices) > 0 and not force_entity_deletion: 

547 raise Exception( 

548 f"The corresponding entity to the device " 

549 f"{device_id} was not deleted because it is " 

550 f"linked to multiple devices. " 

551 ) 

552 else: 

553 try: 

554 from filip.clients.ngsi_v2 import ContextBrokerClient 

555 

556 if cb_client: 

557 cb_client_local = deepcopy(cb_client) 

558 else: 

559 warnings.warn( 

560 "No `ContextBrokerClient` " 

561 "object providesd! Will try to generate " 

562 "one. This usage is not recommended." 

563 ) 

564 

565 cb_client_local = ContextBrokerClient( 

566 url=cb_url, 

567 fiware_header=self.fiware_headers, 

568 headers=headers, 

569 ) 

570 

571 cb_client_local.delete_entity( 

572 entity_id=device.entity_name, entity_type=device.entity_type 

573 ) 

574 

575 except requests.RequestException as err: 

576 # Do not throw an error 

577 # It is only important that the entity does not exist after 

578 # this methode, not if this methode actively deleted it 

579 pass 

580 

581 cb_client_local.close() 

582 

583 def patch_device( 

584 self, 

585 device: Device, 

586 patch_entity: bool = True, 

587 cb_client: ContextBrokerClient = None, 

588 cb_url: AnyHttpUrl = settings.CB_URL, 

589 ) -> None: 

590 """ 

591 Updates a device state in Fiware, if the device does not exists it 

592 is created, else its values are updated. 

593 If the device settings (endpoint,..) were changed the device and 

594 entity are deleted and re-added. 

595 

596 If patch_entity is true the corresponding entity in the ContextBroker is 

597 also correctly updated. Else only new attributes are added there. 

598 

599 Args: 

600 device (Device): Device to be posted to /updated in Fiware 

601 patch_entity (bool): If true the corresponding entity is 

602 completely synced 

603 cb_client (ContextBrokerClient): 

604 Corresponding ContextBrokerClient object for entity manipulation 

605 cb_url (AnyHttpUrl): 

606 Url of the ContextBroker where the entity is found. 

607 This will autogenerate an CB-Client, mirroring the information 

608 of the IoTA-Client, e.g. FiwareHeader, and other headers 

609 (not recommended!) 

610 

611 Returns: 

612 None 

613 """ 

614 try: 

615 live_device = self.get_device(device_id=device.device_id) 

616 except requests.RequestException: 

617 # device does not exist yet, post it 

618 self.post_device(device=device) 

619 return 

620 

621 # if the device settings were changed we need to delete the device 

622 # and repost it 

623 settings_dict = { 

624 "device_id", 

625 "service", 

626 "service_path", 

627 "entity_name", 

628 "entity_type", 

629 "timestamp", 

630 "apikey", 

631 "endpoint", 

632 "protocol", 

633 "transport", 

634 "expressionLanguage", 

635 } 

636 

637 live_settings = live_device.model_dump(include=settings_dict) 

638 new_settings = device.model_dump(include=settings_dict) 

639 

640 if not live_settings == new_settings: 

641 self.delete_device( 

642 device_id=device.device_id, 

643 delete_entity=True, 

644 force_entity_deletion=True, 

645 cb_client=cb_client, 

646 ) 

647 self.post_device(device=device) 

648 return 

649 

650 # We are at a state where the device exists, but only attributes were 

651 # changed. 

652 # we need to update the device, and the context entry separately, 

653 # as update device only takes over a part of the changes to the 

654 # ContextBroker. 

655 

656 # update device 

657 self.update_device(device=device) 

658 

659 # update context entry 

660 # 1. build context entity from information in device 

661 # 2. patch it 

662 from filip.models.ngsi_v2.context import ContextEntity, NamedContextAttribute 

663 

664 def build_context_entity_from_device(device: Device) -> ContextEntity: 

665 from filip.models.base import DataType 

666 

667 entity = ContextEntity(id=device.entity_name, type=device.entity_type) 

668 

669 for command in device.commands: 

670 entity.add_attributes( 

671 [ 

672 # Command attribute will be registered by the device_update 

673 NamedContextAttribute( 

674 name=f"{command.name}_info", type=DataType.COMMAND_RESULT 

675 ), 

676 NamedContextAttribute( 

677 name=f"{command.name}_status", type=DataType.COMMAND_STATUS 

678 ), 

679 ] 

680 ) 

681 for attribute in device.attributes: 

682 entity.add_attributes( 

683 [ 

684 NamedContextAttribute( 

685 name=attribute.name, 

686 type=DataType.STRUCTUREDVALUE, 

687 metadata=attribute.metadata, 

688 ) 

689 ] 

690 ) 

691 for static_attribute in device.static_attributes: 

692 entity.add_attributes( 

693 [ 

694 NamedContextAttribute( 

695 name=static_attribute.name, 

696 type=static_attribute.type, 

697 value=static_attribute.value, 

698 metadata=static_attribute.metadata, 

699 ) 

700 ] 

701 ) 

702 return entity 

703 

704 if patch_entity: 

705 from filip.clients.ngsi_v2 import ContextBrokerClient 

706 

707 if cb_client: 

708 cb_client_local = deepcopy(cb_client) 

709 else: 

710 warnings.warn( 

711 "No `ContextBrokerClient` object provided! " 

712 "Will try to generate one. " 

713 "This usage is not recommended." 

714 ) 

715 

716 cb_client_local = ContextBrokerClient( 

717 url=cb_url, fiware_header=self.fiware_headers, headers=self.headers 

718 ) 

719 

720 cb_client_local.patch_entity( 

721 entity=build_context_entity_from_device(device) 

722 ) 

723 cb_client_local.close() 

724 

725 def does_device_exists(self, device_id: str) -> bool: 

726 """ 

727 Test if a device with the given id exists in Fiware 

728 Args: 

729 device_id (str) 

730 Returns: 

731 bool 

732 """ 

733 try: 

734 self.get_device(device_id=device_id) 

735 return True 

736 except requests.RequestException as err: 

737 if err.response is None or not err.response.status_code == 404: 

738 self.log_error( 

739 err=err, 

740 msg=f"Error while checking existence for device {device_id}", 

741 ) 

742 raise 

743 return False 

744 

745 # LOG API 

746 def get_loglevel_of_agent(self): 

747 """ 

748 Get current loglevel of agent 

749 Returns: 

750 

751 """ 

752 url = urljoin(self.base_url, "admin/log") 

753 headers = self.headers.copy() 

754 del headers["fiware-service"] 

755 del headers["fiware-servicepath"] 

756 try: 

757 res = self.get(url=url, headers=headers) 

758 if res.ok: 

759 return res.json()["level"] 

760 res.raise_for_status() 

761 except requests.RequestException as err: 

762 self.log_error(err=err) 

763 raise 

764 

765 def change_loglevel_of_agent(self, level: str): 

766 """ 

767 Change current loglevel of agent 

768 

769 Args: 

770 level: 

771 

772 Returns: 

773 

774 """ 

775 level = level.upper() 

776 if level not in ["INFO", "ERROR", "FATAL", "DEBUG", "WARNING"]: 

777 raise KeyError("Given log level is not supported") 

778 

779 url = urljoin(self.base_url, "admin/log") 

780 headers = self.headers.copy() 

781 del headers["fiware-service"] 

782 del headers["fiware-servicepath"] 

783 try: 

784 res = self.put(url=url, headers=headers, params=level) 

785 if res.ok: 

786 self.logger.info( 

787 "Loglevel of agent at %s " "changed to '%s'", self.base_url, level 

788 ) 

789 else: 

790 res.raise_for_status() 

791 except requests.RequestException as err: 

792 self.log_error(err=err) 

793 raise