Coverage for filip/clients/ngsi_v2/iota.py: 77%

231 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-11-20 16:54 +0000

1""" 

2IoT-Agent Module for API Client 

3""" 

4from __future__ import annotations 

5 

6import json 

7from copy import deepcopy 

8from typing import List, Dict, Set, TYPE_CHECKING, Union, Optional 

9import warnings 

10from urllib.parse import urljoin 

11import requests 

12from pydantic import AnyHttpUrl 

13from pydantic.type_adapter import TypeAdapter 

14from filip.config import settings 

15from filip.clients.base_http_client import BaseHttpClient 

16from filip.models.base import FiwareHeader 

17from filip.models.ngsi_v2.iot import Device, ServiceGroup 

18 

19from filip.utils.filter import filter_device_list, filter_group_list 

20 

21if TYPE_CHECKING: 

22 from filip.clients.ngsi_v2.cb import ContextBrokerClient 

23 

24 

25class IoTAClient(BaseHttpClient): 

26 """ 

27 Client for FIWARE IoT-Agents. The implementation follows the API 

28 specifications from here: 

29 https://iotagent-node-lib.readthedocs.io/en/latest/ 

30 

31 Args: 

32 url: Url of IoT-Agent 

33 session (requests.Session): 

34 fiware_header (FiwareHeader): fiware service and fiware service path 

35 **kwargs (Optional): Optional arguments that ``request`` takes. 

36 """ 

37 

def __init__(self,
             url: str = None,
             *,
             session: requests.Session = None,
             fiware_header: FiwareHeader = None,
             **kwargs):
    """
    Set up the client for one IoT-Agent instance.

    Args:
        url: Url of the IoT-Agent. When empty or omitted,
            ``settings.IOTA_URL`` is used instead.
        session (requests.Session): Forwarded to the base client.
        fiware_header (FiwareHeader): Forwarded to the base client.
        **kwargs (Optional): Any further keyword arguments are forwarded
            to the base client unchanged.
    """
    # fall back to the configured service url
    if not url:
        url = settings.IOTA_URL
    super().__init__(url=url,
                     session=session,
                     fiware_header=fiware_header,
                     **kwargs)

50 

51 # ABOUT API 

def get_version(self) -> Dict:
    """
    Query the ``iot/about`` endpoint of the IoT-Agent.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.RequestException: Logged and re-raised when the request
            fails or the agent answers with an error status.
    """
    about_url = urljoin(self.base_url, 'iot/about')
    try:
        response = self.get(url=about_url, headers=self.headers)
        if not response.ok:
            response.raise_for_status()
            return None
        return response.json()
    except requests.RequestException as err:
        self.logger.error(err)
        raise

68 

69 # SERVICE GROUP API 

def post_groups(self,
                service_groups: Union[ServiceGroup, List[ServiceGroup]],
                update: bool = False):
    """
    Creates a set of service groups for the given service and service_path.
    The service and subservice information is taken from the client's
    headers; the groups' own ``service``/``subservice`` fields are excluded
    from the payload.

    Args:
        service_groups (list of ServiceGroup): Service groups that will be
            posted to the agent's API. A single group is accepted as well.
        update (bool): If a single service group already exists (the agent
            answers with status 409), try to update it via
            ``update_group`` instead of raising.

    Returns:
        None

    Raises:
        AssertionError: If a group's service or subservice is set and does
            not match the corresponding header of this client.
        requests.RequestException: Logged and re-raised on request errors.
    """
    if not isinstance(service_groups, list):
        service_groups = [service_groups]

    # NOTE: plain asserts are kept so that callers still receive an
    # AssertionError; be aware that they are skipped under ``python -O``.
    for group in service_groups:
        if group.service:
            assert group.service == self.headers['fiware-service'], \
                "Service group service does not match fiware service"
        if group.subservice:
            assert group.subservice == self.headers['fiware-servicepath'], \
                "Service group subservice does not match fiware service path"

    url = urljoin(self.base_url, 'iot/services')
    headers = self.headers
    data = {'services': [group.model_dump(exclude={'service', 'subservice'},
                                          exclude_none=True)
                         for group in service_groups]}
    try:
        res = self.post(url=url, headers=headers, json=data)
        if res.ok:
            self.logger.info("Services successfully posted")
        elif res.status_code == 409:
            self.logger.warning(res.text)
            if len(service_groups) > 1:
                # retry every group on its own so that only the
                # conflicting ones are affected
                self.logger.info("Trying to split bulk operation into "
                                 "single operations")
                for group in service_groups:
                    self.post_group(service_group=group, update=update)
            elif update is True:
                self.update_group(service_group=service_groups[0],
                                  fields=None)
            else:
                res.raise_for_status()
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        self.log_error(err=err, msg=None)
        raise

122 

def post_group(self, service_group: ServiceGroup, update: bool = False):
    """
    Register a single service group by delegating to the bulk operation
    ``post_groups`` with a one-element list.

    Args:
        service_group (ServiceGroup): Service group that will be posted to
            the agent's API
        update (bool): Forwarded unchanged to ``post_groups``

    Returns:
        Whatever ``post_groups`` returns (None)
    """
    single_group_batch = [service_group]
    return self.post_groups(service_groups=single_group_batch,
                            update=update)

137 

def get_group_list(self) -> List[ServiceGroup]:
    r"""
    Retrieves service groups from the agent. If the servicepath
    header has the wildcard expression, /\*, all the subservices for the
    service are returned. The specific subservice parameters are
    returned in any other case.

    Returns:
        The entries of the ``services`` field of the response, validated
        as a list of ServiceGroup objects.

    Raises:
        requests.RequestException: Logged and re-raised on request errors.
    """
    endpoint = urljoin(self.base_url, 'iot/services')
    request_headers = self.headers
    try:
        response = self.get(url=endpoint, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
            return None
        adapter = TypeAdapter(List[ServiceGroup])
        raw_groups = response.json()['services']
        return adapter.validate_python(raw_groups)
    except requests.RequestException as err:
        self.log_error(err=err, msg=None)
        raise

159 

def get_group(self, *, resource: str, apikey: str) -> ServiceGroup:
    """
    Retrieves exactly one service group, selected by resource and apikey.

    All groups are fetched with ``get_group_list`` and then narrowed down
    with ``filter_group_list``.

    Args:
        resource: Resource used as filter criterion
        apikey: Apikey used as filter criterion
    Returns:
        The single matching ServiceGroup
    Raises:
        KeyError: If no group matches
        NotImplementedError: If more than one group matches
    """
    matches = filter_group_list(group_list=self.get_group_list(),
                                resources=resource,
                                apikeys=apikey)
    match_count = len(matches)
    if match_count == 0:
        raise KeyError(f"Service group with resource={resource} and apikey={apikey} was not found")
    if match_count != 1:
        raise NotImplementedError("There is a wierd error, try get_group_list() for debugging")
    return matches[0]

179 

def update_groups(self, *,
                  service_groups: Union[ServiceGroup, List[ServiceGroup]],
                  add: bool = False,
                  fields: Union[Set[str], List[str]] = None) -> None:
    """
    Bulk operation for service group update. Every group is updated
    individually through ``update_group``.

    Args:
        service_groups: A single service group or a list of service groups
            to update
        add: Forwarded to ``update_group``; defaults to False
        fields: Forwarded to ``update_group`` to restrict the fields that
            are sent

    Returns:
        None
    """
    if not isinstance(service_groups, list):
        service_groups = [service_groups]
    for group in service_groups:
        self.update_group(service_group=group, fields=fields, add=add)

198 

def update_group(self, *, service_group: ServiceGroup,
                 fields: Union[Set[str], List[str]] = None,
                 add: bool = True):
    """
    Modifies the information for a service group configuration, identified
    by the resource and apikey query parameters. Takes a service group body
    as the payload. The body does not have to be complete: for incomplete
    bodies, just the existing attributes will be updated

    Args:
        service_group (ServiceGroup): Service group to update.
        fields: Fields of the service_group to send. If empty or 'None',
            no include filter is applied to the payload
        add: If True and the agent answers with status 404, the group is
            posted via ``post_group`` instead
    Returns:
        None
    Raises:
        requests.RequestException: Logged and re-raised on request errors.
    """
    # normalise the include filter: nothing selected -> None, list -> set
    if not fields:
        fields = None
    elif isinstance(fields, list):
        fields = set(fields)

    endpoint = urljoin(self.base_url, 'iot/services')
    request_headers = self.headers
    # resource and apikey identify the group on the agent's side
    query = service_group.model_dump(include={'resource', 'apikey'})
    try:
        payload = service_group.model_dump(
            include=fields,
            exclude={'service', 'subservice'},
            exclude_none=True)
        response = self.put(url=endpoint,
                            headers=request_headers,
                            params=query,
                            json=payload)
        if response.ok:
            self.logger.info("ServiceGroup updated!")
        elif response.status_code == 404 and add is True:
            self.post_group(service_group=service_group)
        else:
            response.raise_for_status()
    except requests.RequestException as err:
        self.log_error(err=err, msg=None)
        raise

241 

def delete_group(self, *, resource: str, apikey: str):
    """
    Deletes a service group in the IoT-Agent

    Args:
        resource: Resource of the group, sent as query parameter
        apikey: Apikey of the group, sent as query parameter

    Returns:
        None

    Raises:
        requests.RequestException: Logged and re-raised on request errors.
    """
    endpoint = urljoin(self.base_url, 'iot/services')
    request_headers = self.headers
    query = dict(resource=resource, apikey=apikey)
    try:
        response = self.delete(url=endpoint,
                               headers=request_headers,
                               params=query)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info("ServiceGroup with resource: '%s' and "
                             "apikey: '%s' successfully deleted!",
                             resource, apikey)
    except requests.RequestException as err:
        failure = (f"Could not delete ServiceGroup with resource "
                   f"'{resource}' and apikey '{apikey}'!")
        self.log_error(err=err, msg=failure)
        raise

270 

271 # DEVICE API 

def post_devices(self, *, devices: Union[Device, List[Device]],
                 update: bool = False) -> None:
    """
    Post one or several devices to the device registry of the IoT-Agent.

    If the request fails and ``update`` is True, the devices are updated
    via ``update_devices`` (with ``add=False``) instead of raising.
    Args:
        devices (list of Devices): A single device or a list of devices
        update (bool): Whether the devices should be updated when posting
            them fails
    Returns:
        None
    Raises:
        requests.RequestException: Logged and re-raised when posting fails
            and ``update`` is False.
    """
    if not isinstance(devices, list):
        devices = [devices]
    endpoint = urljoin(self.base_url, 'iot/devices')
    request_headers = self.headers

    # round-trip through JSON so the payload only holds plain JSON types
    serialized = []
    for device in devices:
        serialized.append(
            json.loads(device.model_dump_json(exclude_none=True)))
    payload = {"devices": serialized}
    try:
        response = self.post(url=endpoint,
                             headers=request_headers,
                             json=payload)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info("Devices successfully posted!")
    except requests.RequestException as err:
        if update:
            return self.update_devices(devices=devices, add=False)
        failure = "Could not post devices"
        self.log_error(err=err, msg=failure)
        raise

304 

def post_device(self, *, device: Device, update: bool = False) -> None:
    """
    Post a single device configuration to the IoT-Agent by delegating to
    ``post_devices`` with a one-element list.

    Args:
        device: IoT device configuration to send
        update: Forwarded to ``post_devices``; update the device if posting
            it fails

    Returns:
        None
    """
    single_device_batch = [device]
    return self.post_devices(devices=single_device_batch, update=update)

317 

def get_device_list(self, *,
                    limit: int = None,
                    offset: int = None,
                    device_ids: Union[str, List[str]] = None,
                    entity_names: Union[str, List[str]] = None,
                    entity_types: Union[str, List[str]] = None) -> List[Device]:
    """
    Returns a list of all the devices in the device registry with all
    its data. The IoTAgent now only supports "limit" and "offset" as
    request parameters, so only these two are sent; all other criteria
    are applied to the response on the client side.

    Args:
        limit:
            if present, limits the number of devices returned in the
            list. Must be a number between 1 and 1000 (both inclusive).
        offset:
            if present, skip that number of devices from the original
            query.
        device_ids:
            List of device_ids. If given, only devices with matching ids
            will be returned
        entity_names:
            The entity_ids of the devices. If given, only the devices
            with the specified entity_id will be returned
        entity_types:
            The entity_type of the device. If given, only the devices
            with the specified entity_type will be returned

    Returns:
        List of matching devices

    Raises:
        ValueError: If ``limit`` is given but not within 1..1000.
        requests.RequestException: Logged and re-raised on request errors.
    """
    if limit is not None and not 1 <= limit <= 1000:
        msg = "'limit' must be an integer between 1 and 1000!"
        self.logger.error(msg)
        raise ValueError(msg)
    url = urljoin(self.base_url, 'iot/devices')
    headers = self.headers
    # Build the query explicitly: harvesting locals() would also send
    # ``self``, ``url``, ``headers`` and the client-side filter arguments.
    params = {key: value
              for key, value in (('limit', limit), ('offset', offset))
              if value is not None}
    try:
        res = self.get(url=url, headers=headers, params=params)
        if res.ok:
            ta = TypeAdapter(List[Device])
            devices = ta.validate_python(res.json()['devices'])
            # filter by device_ids, entity_names or entity_types
            devices = filter_device_list(devices,
                                         device_ids,
                                         entity_names,
                                         entity_types)
            return devices
        res.raise_for_status()
    except requests.RequestException as err:
        self.log_error(err=err, msg=None)
        raise

373 

def get_device(self, *, device_id: str) -> Device:
    """
    Returns all the information about a particular device.

    Args:
        device_id: Id of the device; appended to the ``iot/devices/`` path
    Raises:
        requests.RequestException: Logged and re-raised, e.g. if the device
            does not exist
    Returns:
        Device built from the JSON body of the response

    """
    endpoint = urljoin(self.base_url, f'iot/devices/{device_id}')
    request_headers = self.headers
    try:
        response = self.get(url=endpoint, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
            return None
        return Device.model_validate(response.json())
    except requests.RequestException as err:
        failure = f"Device {device_id} was not found"
        self.log_error(err=err, msg=failure)
        raise

397 

def update_device(self, *, device: Device, add: bool = True) -> None:
    """
    Updates a device in the device registry.

    Only the fields ``attributes``, ``lazy``, ``commands`` and
    ``static_attributes`` of the device are sent to the agent; other
    device settings (endpoint,..) are not part of the request.

    Args:
        device: Device carrying the new attribute configuration
        add (bool): If the agent answers with status 404 (device not
            found), post the device instead
    Returns:
        None
    Raises:
        requests.RequestException: Logged and re-raised on request errors.
    """
    endpoint = urljoin(self.base_url, f'iot/devices/{device.device_id}')
    request_headers = self.headers
    updatable_fields = {'attributes', 'lazy', 'commands',
                        'static_attributes'}
    try:
        payload = device.model_dump(include=updatable_fields,
                                    exclude_none=True)
        response = self.put(url=endpoint,
                            headers=request_headers,
                            json=payload)
        if response.ok:
            self.logger.info("Device '%s' successfully updated!",
                             device.device_id)
        elif response.status_code == 404 and add is True:
            self.post_device(device=device, update=False)
        else:
            response.raise_for_status()
    except requests.RequestException as err:
        failure = f"Could not update device '{device.device_id}'"
        self.log_error(err=err, msg=failure)
        raise

430 

def update_devices(self, *, devices: Union[Device, List[Device]],
                   add: bool = False) -> None:
    """
    Bulk operation for device update. Every device is updated individually
    through ``update_device``.

    Args:
        devices: A single device or a list of devices to update
        add: Forwarded to ``update_device``; defaults to False

    Returns:
        None
    """
    if not isinstance(devices, list):
        devices = [devices]
    for device in devices:
        self.update_device(device=device, add=add)

446 

def delete_device(self, *, device_id: str,
                  cb_url: AnyHttpUrl = settings.CB_URL,
                  delete_entity: bool = False,
                  force_entity_deletion: bool = False,
                  cb_client: ContextBrokerClient = None,
                  ) -> None:
    """
    Remove a device from the device registry. No payload is required
    or received.

    Args:
        device_id: str, ID of Device
        delete_entity: False -> Only delete the device entry,
                                the automatically created and linked
                                context-entity will continue to
                                exist in Fiware
                       True -> Also delete the automatically
                               created and linked context-entity
                               If multiple devices are linked to this
                               entity, this operation is not executed and
                               an exception is raised
        force_entity_deletion:
            bool, if delete_entity is true and multiple devices are linked
            to the linked entity, delete it and do not raise an error
        cb_client (ContextBrokerClient):
            Corresponding ContextBrokerClient object for entity manipulation.
            A deep copy of it is used and closed afterwards.
        cb_url (AnyHttpUrl):
            Url of the ContextBroker where the entity is found.
            This will autogenerate an CB-Client, mirroring the information
            of the IoTA-Client, e.g. FiwareHeader, and other headers
            (not recommended!)

    Returns:
        None

    Raises:
        requests.RequestException: If the device cannot be retrieved or
            deleted.
        Exception: If ``delete_entity`` is set, other devices still use the
            entity and ``force_entity_deletion`` is False.
    """
    url = urljoin(self.base_url, f'iot/devices/{device_id}')
    headers = self.headers

    # Fetch the device before deleting it: its entity name and type are
    # still needed afterwards for the optional entity deletion.
    device = self.get_device(device_id=device_id)

    try:
        res = self.delete(url=url, headers=headers)
        if res.ok:
            self.logger.info("Device '%s' successfully deleted!", device_id)
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not delete device {device_id}!"
        self.log_error(err=err, msg=msg)
        raise

    if not delete_entity:
        return

    # An entity can technically belong to multiple devices.
    # The device itself is already gone, so any hit here is another
    # device that is still linked to the same entity.
    devices = self.get_device_list(entity_names=[device.entity_name])
    if len(devices) > 0 and not force_entity_deletion:
        raise Exception(f"The corresponding entity to the device "
                        f"{device_id} was not deleted because it is "
                        f"linked to multiple devices. ")

    cb_client_local = None
    try:
        from filip.clients.ngsi_v2 import ContextBrokerClient

        if cb_client:
            cb_client_local = deepcopy(cb_client)
        else:
            warnings.warn("No `ContextBrokerClient` "
                          "object provided! Will try to generate "
                          "one. This usage is not recommended.")

            cb_client_local = ContextBrokerClient(
                url=cb_url,
                fiware_header=self.fiware_headers,
                headers=headers)

        cb_client_local.delete_entity(
            entity_id=device.entity_name,
            entity_type=device.entity_type)

    except requests.RequestException:
        # Do not throw an error
        # It is only important that the entity does not exist after
        # this method, not if this method actively deleted it
        pass
    finally:
        # always release the temporary client, also on unexpected errors;
        # it may still be None if its creation failed
        if cb_client_local is not None:
            cb_client_local.close()

535 

def patch_device(self,
                 device: Device,
                 patch_entity: bool = True,
                 cb_client: ContextBrokerClient = None,
                 cb_url: AnyHttpUrl = settings.CB_URL) -> None:
    """
    Updates a device state in Fiware, if the device does not exists it
    is created, else its values are updated.
    If the device settings (endpoint,..) were changed the device and
    entity are deleted and re-added.

    If patch_entity is true the corresponding entity in the ContextBroker
    is additionally patched with an entity built from the device's
    commands, attributes and static attributes.

    Args:
        device (Device): Device to be posted to /updated in Fiware
        patch_entity (bool): If true the corresponding entity is
                patched in the ContextBroker as well
        cb_client (ContextBrokerClient):
            Corresponding ContextBrokerClient object for entity manipulation.
            A deep copy of it is used and closed afterwards.
        cb_url (AnyHttpUrl):
            Url of the ContextBroker where the entity is found.
            This will autogenerate an CB-Client, mirroring the information
            of the IoTA-Client, e.g. FiwareHeader, and other headers
            (not recommended!)

    Returns:
        None
    """
    try:
        live_device = self.get_device(device_id=device.device_id)
    except requests.RequestException:
        # device does not exist yet, post it
        self.post_device(device=device)
        return

    # if the device settings were changed we need to delete the device
    # and repost it
    settings_dict = {"device_id", "service", "service_path",
                     "entity_name", "entity_type",
                     "timestamp", "apikey", "endpoint",
                     "protocol", "transport",
                     "expressionLanguage"}

    live_settings = live_device.model_dump(include=settings_dict)
    new_settings = device.model_dump(include=settings_dict)

    if live_settings != new_settings:
        # forward cb_url as well, otherwise the deletion would fall back
        # to the default broker url when no cb_client is given
        self.delete_device(device_id=device.device_id,
                           cb_url=cb_url,
                           delete_entity=True,
                           force_entity_deletion=True,
                           cb_client=cb_client)
        self.post_device(device=device)
        return

    # We are at a state where the device exists, but only attributes were
    # changed.
    # we need to update the device, and the context entry separately,
    # as update device only takes over a part of the changes to the
    # ContextBroker.

    # update device
    self.update_device(device=device)

    # update context entry
    # 1. build context entity from information in device
    # 2. patch it
    from filip.models.ngsi_v2.context import \
        ContextEntity, NamedContextAttribute

    def build_context_entity_from_device(device: Device) -> ContextEntity:
        """Build the context entity that mirrors the given device."""
        from filip.models.base import DataType
        entity = ContextEntity(id=device.entity_name,
                               type=device.entity_type)

        for command in device.commands:
            entity.add_attributes([
                # Command attribute will be registered by the device_update
                NamedContextAttribute(
                    name=f"{command.name}_info",
                    type=DataType.COMMAND_RESULT
                ),
                NamedContextAttribute(
                    name=f"{command.name}_status",
                    type=DataType.COMMAND_STATUS
                )
            ])
        for attribute in device.attributes:
            entity.add_attributes([
                NamedContextAttribute(
                    name=attribute.name,
                    type=DataType.STRUCTUREDVALUE,
                    metadata=attribute.metadata
                )
            ])
        for static_attribute in device.static_attributes:
            entity.add_attributes([
                NamedContextAttribute(
                    name=static_attribute.name,
                    type=static_attribute.type,
                    value=static_attribute.value,
                    metadata=static_attribute.metadata
                )
            ])
        return entity

    if patch_entity:
        from filip.clients.ngsi_v2 import ContextBrokerClient
        if cb_client:
            cb_client_local = deepcopy(cb_client)
        else:
            warnings.warn("No `ContextBrokerClient` object provided! "
                          "Will try to generate one. "
                          "This usage is not recommended.")

            cb_client_local = ContextBrokerClient(
                url=cb_url,
                fiware_header=self.fiware_headers,
                headers=self.headers)

        try:
            cb_client_local.patch_entity(
                entity=build_context_entity_from_device(device))
        finally:
            # release the temporary client even if patching fails
            cb_client_local.close()

659 

def does_device_exists(self, device_id: str) -> bool:
    """
    Test if a device with the given id exists in Fiware
    Args:
        device_id (str): Id of the device to look up
    Returns:
        bool: True if ``get_device`` succeeds, False if the agent answers
        with status 404
    Raises:
        requests.RequestException: Re-raised for every failure other than
            a 404 response, including errors that carry no response at
            all.
    """
    try:
        self.get_device(device_id=device_id)
    except requests.RequestException as err:
        # ``response`` is None when no answer was received at all; such
        # errors must surface as they are instead of an AttributeError.
        response = getattr(err, 'response', None)
        if response is None or response.status_code != 404:
            raise
        return False
    return True

675 

676 # LOG API 

def get_loglevel_of_agent(self):
    """
    Get current loglevel of agent

    The request goes to ``admin/log`` and is sent with a copy of the
    client's headers from which ``fiware-service`` and
    ``fiware-servicepath`` have been removed.

    Returns:
        The ``level`` entry of the JSON response
    Raises:
        requests.RequestException: Logged and re-raised on request errors.
    """
    endpoint = urljoin(self.base_url, 'admin/log')
    # work on a copy so the client's own headers stay untouched
    admin_headers = self.headers.copy()
    for tenant_key in ('fiware-service', 'fiware-servicepath'):
        del admin_headers[tenant_key]
    try:
        response = self.get(url=endpoint, headers=admin_headers)
        if not response.ok:
            response.raise_for_status()
            return None
        return response.json()['level']
    except requests.RequestException as err:
        self.log_error(err=err)
        raise

695 

def change_loglevel_of_agent(self, level: str):
    """
    Change current loglevel of agent

    The request goes to ``admin/log`` with the new level as ``level``
    query parameter. It is sent with a copy of the client's headers from
    which ``fiware-service`` and ``fiware-servicepath`` have been removed.

    Args:
        level: New log level (case-insensitive). One of 'INFO', 'ERROR',
            'FATAL', 'DEBUG', 'WARNING'

    Returns:
        None

    Raises:
        KeyError: If the given level is not supported
        requests.RequestException: Logged and re-raised on request errors.
    """
    level = level.upper()
    if level not in ('INFO', 'ERROR', 'FATAL', 'DEBUG', 'WARNING'):
        raise KeyError("Given log level is not supported")

    url = urljoin(self.base_url, 'admin/log')
    headers = self.headers.copy()
    del headers['fiware-service']
    del headers['fiware-servicepath']
    try:
        # the level has to be a named query parameter; a bare string
        # would be appended to the url without any key
        res = self.put(url=url, headers=headers, params={'level': level})
        if res.ok:
            self.logger.info("Loglevel of agent at %s "
                             "changed to '%s'", self.base_url, level)
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        self.log_error(err=err)
        raise