Coverage for filip/clients/ngsi_v2/iota.py: 83%

247 statements  

« prev     ^ index     » next       coverage.py v7.10.5, created at 2025-08-26 14:36 +0000

1""" 

2IoT-Agent Module for API Client 

3""" 

4 

5from __future__ import annotations 

6 

7import json 

8from copy import deepcopy 

9from typing import List, Dict, Set, TYPE_CHECKING, Union, Optional 

10import warnings 

11from urllib.parse import urljoin 

12import requests 

13from pydantic import AnyHttpUrl, ValidationError 

14from pydantic.type_adapter import TypeAdapter 

15from filip.config import settings 

16from filip.clients.base_http_client import BaseHttpClient 

17from filip.clients.exceptions import BaseHttpClientException 

18from filip.models.base import FiwareHeader 

19from filip.models.ngsi_v2.iot import ( 

20 Device, 

21 ServiceGroup, 

22 DeviceValidationList, 

23 DeviceList, 

24) 

25 

26from filip.utils.filter import filter_device_list, filter_group_list 

27 

28if TYPE_CHECKING: 

29 from filip.clients.ngsi_v2.cb import ContextBrokerClient 

30 

31 

32class IoTAClient(BaseHttpClient): 

33 """ 

34 Client for FIWARE IoT-Agents. The implementation follows the API 

35 specifications from here: 

36 https://iotagent-node-lib.readthedocs.io/en/latest/ 

37 

38 Args: 

39 url: Url of IoT-Agent 

40 session (requests.Session): 

41 fiware_header (FiwareHeader): fiware service and fiware service path 

42 **kwargs (Optional): Optional arguments that ``request`` takes. 

43 """ 

44 

def __init__(
    self,
    url: Optional[str] = None,
    *,
    session: Optional[requests.Session] = None,
    fiware_header: Optional[FiwareHeader] = None,
    **kwargs,
):
    """
    Set up the IoT-Agent client.

    Args:
        url: Base url of the IoT-Agent. When omitted or empty,
            ``settings.IOTA_URL`` is used instead.
        session: Forwarded unchanged to the base class constructor.
        fiware_header: Forwarded unchanged to the base class constructor.
        **kwargs: Forwarded unchanged to the base class constructor.
    """
    # Any falsy url (None, "") selects the configured default service url.
    url = url or settings.IOTA_URL
    super().__init__(
        url=url, session=session, fiware_header=fiware_header, **kwargs
    )

58 

59 # ABOUT API 

def get_version(self) -> Dict:
    """
    Query the ``iot/about`` endpoint of the IoT-Agent.

    Returns:
        The decoded JSON body of the response.

    Raises:
        BaseHttpClientException: If the request raises a
            ``requests.RequestException`` (including an error status).
    """
    about_url = urljoin(self.base_url, "iot/about")
    try:
        response = self.get(url=about_url, headers=self.headers)
        if not response.ok:
            response.raise_for_status()
        else:
            return response.json()
    except requests.RequestException as err:
        self.logger.error(err)
        reason = str(err.args[0])
        msg = "Could not retrieve version because of following reason: " + reason
        raise BaseHttpClientException(message=msg, response=err.response) from err

79 

80 # SERVICE GROUP API 

def post_groups(
    self,
    service_groups: Union[ServiceGroup, List[ServiceGroup]],
    update: bool = False,
):
    """
    Creates a set of service groups for the given service and service_path.
    The service and subservice fields are excluded from the posted payload;
    if a group sets them, they must match the client's FIWARE headers.

    Args:
        service_groups (list of ServiceGroup): Service groups that will be
            posted to the agent's API. A single group is also accepted.
        update (bool): If a single group is rejected with status 409, try
            to update it via ``update_group`` instead of raising.

    Returns:
        None

    Raises:
        AssertionError: If a group's service or subservice differs from
            the corresponding FIWARE header.
        BaseHttpClientException: If the request fails.
    """
    if not isinstance(service_groups, list):
        service_groups = [service_groups]
    for group in service_groups:
        # Raised explicitly (not via ``assert``) so the checks still run
        # under ``python -O``; the exception type is kept for callers.
        if group.service and group.service != self.headers["fiware-service"]:
            raise AssertionError(
                "Service group service does not match fiware service"
            )
        if (
            group.subservice
            and group.subservice != self.headers["fiware-servicepath"]
        ):
            raise AssertionError(
                "Service group subservice does not match fiware service path"
            )

    url = urljoin(self.base_url, "iot/services")
    headers = self.headers
    data = {
        "services": [
            group.model_dump(exclude={"service", "subservice"}, exclude_none=True)
            for group in service_groups
        ]
    }
    try:
        res = self.post(url=url, headers=headers, json=data)
        if res.ok:
            self.logger.info("Services successfully posted")
        elif res.status_code == 409:
            self.logger.warning(res.text)
            if len(service_groups) > 1:
                # A bulk request conflicted: retry each group on its own.
                self.logger.info(
                    "Trying to split bulk operation into single operations"
                )
                for group in service_groups:
                    self.post_group(service_group=group, update=update)
            elif update is True:
                self.update_group(service_group=service_groups[0], fields=None)
            else:
                res.raise_for_status()
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        self.logger.error(err)
        msg = "Could not post group because of following reason: " + str(
            err.args[0]
        )
        raise BaseHttpClientException(message=msg, response=err.response) from err

143 

def post_group(self, service_group: ServiceGroup, update: bool = False):
    """
    Register one service group by delegating to the bulk ``post_groups``.

    Args:
        service_group (ServiceGroup): Service group to post to the
            agent's API
        update (bool): Passed through unchanged to ``post_groups``

    Returns:
        Whatever ``post_groups`` returns
    """
    single_item_batch = [service_group]
    return self.post_groups(service_groups=single_item_batch, update=update)

157 

def get_group_list(self) -> List[ServiceGroup]:
    r"""
    Fetch the service groups from ``iot/services`` using the client's
    headers. Per the original documentation, a servicepath header with the
    wildcard expression, /\*, returns all subservices of the service.

    Returns:
        The ``"services"`` entry of the response validated as a list of
        ServiceGroup

    Raises:
        BaseHttpClientException: If the request raises a
            ``requests.RequestException`` (including an error status).
    """
    endpoint = urljoin(self.base_url, "iot/services")
    try:
        response = self.get(url=endpoint, headers=self.headers)
        if not response.ok:
            response.raise_for_status()
        else:
            adapter = TypeAdapter(List[ServiceGroup])
            return adapter.validate_python(response.json()["services"])
    except requests.RequestException as err:
        self.logger.error(err)
        reason = str(err.args[0])
        msg = "Could not retrieve group list because of following reason: " + reason
        raise BaseHttpClientException(message=msg, response=err.response) from err

182 

def get_group(self, *, resource: str, apikey: str) -> ServiceGroup:
    """
    Return the single service group matching resource and apikey.

    The full group list is fetched with ``get_group_list`` and narrowed
    with ``filter_group_list``.

    Args:
        resource: Resource used as filter
        apikey: Apikey used as filter
    Returns:
        The one matching ServiceGroup
    Raises:
        KeyError: If no group matches
        NotImplementedError: If more than one group matches
    """
    candidates = filter_group_list(
        group_list=self.get_group_list(), resources=resource, apikeys=apikey
    )
    match_count = len(candidates)
    if match_count == 0:
        raise KeyError(
            f"Service group with resource={resource} and apikey={apikey} was not found"
        )
    if match_count != 1:
        raise NotImplementedError(
            "There is a wierd error, try get_group_list() for debugging"
        )
    return candidates[0]

208 

def update_groups(
    self,
    *,
    service_groups: Union[ServiceGroup, List[ServiceGroup]],
    add: bool = False,
    fields: Union[Set[str], List[str]] = None,
) -> None:
    """
    Bulk operation for service group update. Calls ``update_group`` once
    per group, in order.

    Args:
        service_groups: One service group or a list of them.
        add: Forwarded to ``update_group`` for every group. Defaults to
            False.
        fields: Forwarded to ``update_group`` for every group.

    Returns:
        None
    """
    if not isinstance(service_groups, list):
        service_groups = [service_groups]
    for group in service_groups:
        self.update_group(service_group=group, fields=fields, add=add)

230 

def update_group(
    self,
    *,
    service_group: ServiceGroup,
    fields: Union[Set[str], List[str]] = None,
    add: bool = True,
):
    """
    Modifies the information for a service group configuration, identified
    by the resource and apikey query parameters. Takes a service group body
    as the payload. The body does not have to be complete: for incomplete
    bodies, just the existing attributes will be updated

    Args:
        service_group (ServiceGroup): Service to update.
        fields: Fields of the service_group to update. If 'None' (or
            empty) all allowed fields will be updated
        add: If exactly ``True`` and the agent answers 404, the group is
            posted via ``post_group`` instead of raising.
    Returns:
        None
    Raises:
        BaseHttpClientException: If the request fails.
    """
    # Empty/None means "no include filter"; a list is converted to a set
    # before being handed to ``model_dump``.
    if fields:
        if isinstance(fields, list):
            fields = set(fields)
    else:
        fields = None
    url = urljoin(self.base_url, "iot/services")
    headers = self.headers
    params = service_group.model_dump(include={"resource", "apikey"})
    try:
        res = self.put(
            url=url,
            headers=headers,
            params=params,
            json=service_group.model_dump(
                include=fields, exclude={"service", "subservice"}, exclude_none=True
            ),
        )
        if res.ok:
            self.logger.info("ServiceGroup updated!")
        elif res.status_code == 404 and add is True:
            # Group unknown to the agent: create it instead.
            self.post_group(service_group=service_group)
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        self.logger.error(err)
        msg = "Could not update group because of following reason: " + str(
            err.args[0]
        )
        raise BaseHttpClientException(message=msg, response=err.response) from err

281 

def delete_group(self, *, resource: str, apikey: str):
    """
    Delete a service group in the IoT-Agent.

    Args:
        resource: Sent as the ``resource`` query parameter
        apikey: Sent as the ``apikey`` query parameter

    Returns:
        None

    Raises:
        BaseHttpClientException: If the request raises a
            ``requests.RequestException`` (including an error status).
    """
    endpoint = urljoin(self.base_url, "iot/services")
    query = {"resource": resource, "apikey": apikey}
    try:
        response = self.delete(url=endpoint, headers=self.headers, params=query)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info(
                "ServiceGroup with resource: '%s' and apikey: '%s' successfully deleted!",
                resource,
                apikey,
            )
    except requests.RequestException as err:
        self.logger.error(err)
        msg = (
            f"Could not delete ServiceGroup with resource "
            f"'{resource}' and apikey '{apikey}' because of following reason: {str(err.args[0])}"
        )
        raise BaseHttpClientException(message=msg, response=err.response) from err

314 

315 # DEVICE API 

def post_devices(
    self, *, devices: Union[Device, List[Device]], update: bool = False
) -> None:
    """
    Post one or more devices to ``iot/devices`` in a single request.

    If the request raises a ``requests.RequestException`` and ``update``
    is set, the devices are handed to ``update_devices`` (with
    ``add=False``) instead of raising.

    Args:
        devices (list of Devices): A single device or a list of devices
        update (bool): Whether to fall back to updating the devices when
            posting fails
    Returns:
        None
    Raises:
        BaseHttpClientException: If posting fails and ``update`` is not set
    """
    if not isinstance(devices, list):
        devices = [devices]
    endpoint = urljoin(self.base_url, "iot/devices")

    # Each device is serialised to JSON text and parsed back to plain data.
    serialised = []
    for item in devices:
        serialised.append(json.loads(item.model_dump_json(exclude_none=True)))
    payload = {"devices": serialised}
    try:
        response = self.post(url=endpoint, headers=self.headers, json=payload)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info("Devices successfully posted!")
    except requests.RequestException as err:
        if update:
            return self.update_devices(devices=devices, add=False)
        self.logger.error(err)
        reason = str(err.args[0])
        msg = "Could not post devices because of following reason: " + reason
        raise BaseHttpClientException(message=msg, response=err.response) from err

355 

def post_device(self, *, device: Device, update: bool = False) -> None:
    """
    Post a single device configuration by delegating to ``post_devices``.

    Args:
        device: IoT device configuration to send
        update: Passed through unchanged to ``post_devices``

    Returns:
        Whatever ``post_devices`` returns
    """
    single_item_batch = [device]
    return self.post_devices(devices=single_item_batch, update=update)

368 

def get_device_list(
    self,
    *,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    device_ids: Union[str, List[str]] = None,
    entity_names: Union[str, List[str]] = None,
    entity_types: Union[str, List[str]] = None,
    include_invalid: bool = False,
) -> Union[List[Union[Device, DeviceList]], DeviceValidationList]:
    """
    Returns a list of all the devices in the device registry with all
    its data. Only "limit" and "offset" are sent as request parameters;
    the remaining filters are applied to the response with
    ``filter_device_list``.

    Args:
        limit:
            if present, limits the number of devices returned in the
            list. Must be a number between 1 and 1000 (both inclusive).
            A falsy value (None, 0) is not sent.
        offset:
            if present, skip that number of devices from the original
            query. A falsy value (None, 0) is not sent.
        device_ids:
            List of device_ids. If given, only devices with matching ids
            will be returned
        entity_names:
            The entity_ids of the devices. If given, only the devices
            with the specified entity_id will be returned
        entity_types:
            The entity_type of the device. If given, only the devices
            with the specified entity_type will be returned
        include_invalid: Specify if the returned list should also
            contain a list of invalid device IDs or not.
    Returns:
        List of matching devices, or a DeviceValidationList when
        ``include_invalid`` is set
    Raises:
        ValueError: If ``limit`` is out of range or ``offset`` is not an
            integer
        BaseHttpClientException: If the request fails
    """
    params = {}
    if limit:
        # Inclusive bounds, as documented: 1 and 1000 are both valid.
        if not 1 <= limit <= 1000:
            msg = "'limit' must be an integer between 1 and 1000!"
            self.logger.error(msg)
            raise ValueError(msg)
        params["limit"] = limit
    if offset:
        if not isinstance(offset, int):
            msg = "'offset' must be an integer!"
            self.logger.error(msg)
            raise ValueError(msg)
        params["offset"] = offset
    url = urljoin(self.base_url, "iot/devices")
    headers = self.headers
    try:
        res = self.get(url=url, headers=headers, params=params)
        if res.ok:
            raw_devices = res.json()["devices"]
            if include_invalid:
                valid_devices = []
                invalid_devices = []
                ta = TypeAdapter(Device)
                # Validate one by one so a single bad entry does not
                # discard the whole response; only its id is recorded.
                for device in raw_devices:
                    try:
                        valid_devices.append(ta.validate_python(device))
                    except ValidationError:
                        invalid_devices.append(device.get("device_id"))
                valid_devices = filter_device_list(
                    valid_devices, device_ids, entity_names, entity_types
                )
                # NOTE(review): invalid_devices holds plain device ids, not
                # Device objects - confirm filter_device_list accepts them
                # when a filter is actually given.
                invalid_devices = filter_device_list(
                    invalid_devices, device_ids, entity_names, entity_types
                )
                return DeviceValidationList.model_validate(
                    {
                        "devices": valid_devices,
                        "invalid_devices": invalid_devices,
                    }
                )
            return filter_device_list(
                devices=DeviceList.model_validate({"devices": raw_devices}).devices,
                device_ids=device_ids,
                entity_names=entity_names,
                entity_types=entity_types,
            )
        res.raise_for_status()
    except requests.RequestException as err:
        self.logger.error(err)
        msg = (
            "Not able to retrieve the device list because of the following reason:"
            + str(err.args[0])
        )
        raise BaseHttpClientException(message=msg, response=err.response) from err

461 

def get_device(self, *, device_id: str) -> Device:
    """
    Fetch a single device from ``iot/devices/<device_id>``.

    Args:
        device_id: Id of the device, appended to the request path
    Raises:
        BaseHttpClientException: If the request raises a
            ``requests.RequestException`` (including an error status)
    Returns:
        Device validated from the response body

    """
    endpoint = urljoin(self.base_url, f"iot/devices/{device_id}")
    try:
        response = self.get(url=endpoint, headers=self.headers)
        if not response.ok:
            response.raise_for_status()
        else:
            return Device.model_validate(response.json())
    except requests.RequestException as err:
        self.logger.error(err)

        msg = f"Device '{device_id}' was not found because of the following reason: {str(err.args[0])}"
        raise BaseHttpClientException(message=msg, response=err.response) from err

486 

def update_device(self, *, device: Device, add: bool = True) -> None:
    """
    Updates a device from the device registry.

    Only the device's ``attributes``, ``lazy``, ``commands`` and
    ``static_attributes`` fields are sent; other device settings
    (endpoint, ...) are not part of the request.

    Args:
        device: Device whose fields are sent; its ``device_id`` selects
            the registry entry
        add (bool): If exactly ``True`` and the agent answers 404, the
            device is posted via ``post_device`` instead of raising
    Returns:
        None
    Raises:
        BaseHttpClientException: If the request fails
    """
    url = urljoin(self.base_url, f"iot/devices/{device.device_id}")
    headers = self.headers
    try:
        res = self.put(
            url=url,
            headers=headers,
            json=device.model_dump(
                include={"attributes", "lazy", "commands", "static_attributes"},
                exclude_none=True,
            ),
        )
        if res.ok:
            self.logger.info("Device '%s' successfully updated!", device.device_id)
        elif res.status_code == 404 and add is True:
            # Device unknown to the agent: create it instead.
            self.post_device(device=device, update=False)
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        self.logger.error(err)
        msg = f"Could not update device '{device.device_id}' because of the following reason: {str(err.args[0])} "
        raise BaseHttpClientException(message=msg, response=err.response) from err

523 

def update_devices(
    self, *, devices: Union[Device, List[Device]], add: bool = False
) -> None:
    """
    Bulk operation for device update. Calls ``update_device`` once per
    device, in order.

    Args:
        devices: One device or a list of devices.
        add: Forwarded to ``update_device`` for every device. Defaults to
            False.

    Returns:
        None
    """
    if not isinstance(devices, list):
        devices = [devices]
    for device in devices:
        self.update_device(device=device, add=add)

540 

def delete_device(
    self,
    *,
    device_id: str,
    cb_url: AnyHttpUrl = settings.CB_URL,
    delete_entity: bool = False,
    force_entity_deletion: bool = False,
    cb_client: ContextBrokerClient = None,
) -> None:
    """
    Remove a device from the device registry. No payload is required
    or received.

    Args:
        device_id: str, ID of Device
        delete_entity: False -> Only delete the device entry,
                                the automatically created and linked
                                context-entity will continue to
                                exist in Fiware
                       True -> Also delete the automatically
                               created and linked context-entity
                               If other devices are still linked to this
                               entity, this operation is not executed and
                               an exception is raised
        force_entity_deletion:
            bool, if delete_entity is true and other devices are still
            linked to the entity, delete it and do not raise an error
        cb_client (ContextBrokerClient):
            Corresponding ContextBrokerClient object for entity
            manipulation. A deep copy of it is used and closed afterwards.
        cb_url (AnyHttpUrl):
            Url of the ContextBroker where the entity is found.
            Only used when no ``cb_client`` is given: a CB-Client is then
            generated, mirroring the information of the IoTA-Client,
            e.g. FiwareHeader, and other headers (not recommended!)

    Returns:
        None

    Raises:
        BaseHttpClientException: If the device cannot be retrieved or
            deleted
        Exception: If the entity is still linked to other devices and
            ``force_entity_deletion`` is not set
    """
    url = urljoin(
        self.base_url,
        f"iot/devices/{device_id}",
    )
    headers = self.headers

    # Fetched before deletion: entity name/type are needed afterwards.
    device = self.get_device(device_id=device_id)

    try:
        res = self.delete(url=url, headers=headers)
        if res.ok:
            self.logger.info("Device '%s' successfully deleted!", device_id)
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        self.logger.error(err)
        msg = f"Could not delete device {device_id}!"
        raise BaseHttpClientException(message=msg, response=err.response) from err

    if not delete_entity:
        return

    # An entity can technically belong to multiple devices. The device
    # itself is already gone, so any hit here is another linked device.
    devices = self.get_device_list(entity_names=[device.entity_name])
    if len(devices) > 0 and not force_entity_deletion:
        raise Exception(
            f"The corresponding entity to the device "
            f"{device_id} was not deleted because it is "
            f"linked to multiple devices. "
        )

    cb_client_local = None
    try:
        from filip.clients.ngsi_v2 import ContextBrokerClient

        if cb_client:
            cb_client_local = deepcopy(cb_client)
        else:
            warnings.warn(
                "No `ContextBrokerClient` "
                "object provided! Will try to generate "
                "one. This usage is not recommended."
            )

            cb_client_local = ContextBrokerClient(
                url=cb_url,
                fiware_header=self.fiware_headers,
                headers=headers,
            )

        cb_client_local.delete_entity(
            entity_id=device.entity_name, entity_type=device.entity_type
        )
    except requests.RequestException as err:
        # Deliberately best-effort: it only matters that the entity does
        # not exist afterwards, not that this method deleted it. The
        # failure is logged instead of being dropped silently.
        self.logger.warning(
            "Entity '%s' of device '%s' was not deleted: %s",
            device.entity_name,
            device_id,
            err,
        )
    finally:
        # Close the local client on every exit path, including errors
        # that are not request exceptions.
        if cb_client_local is not None:
            cb_client_local.close()

641 

def patch_device(
    self,
    device: Device,
    patch_entity: bool = True,
    cb_client: ContextBrokerClient = None,
    cb_url: AnyHttpUrl = settings.CB_URL,
) -> None:
    """
    Updates a device state in Fiware, if the device does not exist it
    is created, else its values are updated.
    If the device settings were changed the device and
    entity are deleted and re-added.

    If patch_entity is true the corresponding entity in the ContextBroker
    is overridden with an entity built from the device.

    Args:
        device (Device): Device to be posted to /updated in Fiware
        patch_entity (bool): If true the corresponding entity is
            completely synced
        cb_client (ContextBrokerClient):
            Corresponding ContextBrokerClient object for entity
            manipulation. A deep copy of it is used and closed afterwards.
        cb_url (AnyHttpUrl):
            Url of the ContextBroker where the entity is found.
            Only used when no ``cb_client`` is given: a CB-Client is then
            generated, mirroring the information of the IoTA-Client,
            e.g. FiwareHeader, and other headers (not recommended!)

    Returns:
        None
    """
    try:
        live_device = self.get_device(device_id=device.device_id)
    except requests.RequestException:
        # device could not be retrieved, treat it as new and post it
        self.post_device(device=device)
        return

    # if the device settings were changed we need to delete the device
    # and repost it
    settings_fields = {
        "device_id",
        "service",
        "service_path",
        "entity_name",
        "entity_type",
        "timestamp",
        "apikey",
        "endpoint",
        "protocol",
        "transport",
        "expressionLanguage",
    }

    live_settings = live_device.model_dump(include=settings_fields)
    new_settings = device.model_dump(include=settings_fields)

    if live_settings != new_settings:
        # cb_url is forwarded so the entity is removed from the same
        # ContextBroker the caller asked for when no cb_client is given.
        self.delete_device(
            device_id=device.device_id,
            cb_url=cb_url,
            delete_entity=True,
            force_entity_deletion=True,
            cb_client=cb_client,
        )
        self.post_device(device=device)
        return

    # We are at a state where the device exists, but only attributes were
    # changed.
    # we need to update the device, and the context entry separately,
    # as update device only takes over a part of the changes to the
    # ContextBroker.

    # update device
    self.update_device(device=device)

    if not patch_entity:
        return

    # update context entry
    # 1. build context entity from information in device
    # 2. patch it
    from filip.models.ngsi_v2.context import ContextEntity, NamedContextAttribute

    def build_context_entity_from_device(device: Device) -> ContextEntity:
        """Build the context entity that mirrors the given device."""
        from filip.models.base import DataType

        entity = ContextEntity(id=device.entity_name, type=device.entity_type)

        for command in device.commands:
            entity.add_attributes(
                [
                    # Command attribute will be registered by the device_update
                    NamedContextAttribute(
                        name=f"{command.name}_info", type=DataType.COMMAND_RESULT
                    ),
                    NamedContextAttribute(
                        name=f"{command.name}_status", type=DataType.COMMAND_STATUS
                    ),
                ]
            )
        for attribute in device.attributes:
            entity.add_attributes(
                [
                    NamedContextAttribute(
                        name=attribute.name,
                        type=DataType.STRUCTUREDVALUE,
                        metadata=attribute.metadata,
                    )
                ]
            )
        for static_attribute in device.static_attributes:
            entity.add_attributes(
                [
                    NamedContextAttribute(
                        name=static_attribute.name,
                        type=static_attribute.type,
                        value=static_attribute.value,
                        metadata=static_attribute.metadata,
                    )
                ]
            )
        return entity

    from filip.clients.ngsi_v2 import ContextBrokerClient

    if cb_client:
        cb_client_local = deepcopy(cb_client)
    else:
        warnings.warn(
            "No `ContextBrokerClient` object provided! "
            "Will try to generate one. "
            "This usage is not recommended."
        )

        cb_client_local = ContextBrokerClient(
            url=cb_url, fiware_header=self.fiware_headers, headers=self.headers
        )

    try:
        cb_client_local.override_entity(
            entity=build_context_entity_from_device(device)
        )
    finally:
        # Close the local client even if overriding the entity fails.
        cb_client_local.close()

783 

def does_device_exists(self, device_id: str) -> bool:
    """
    Check whether ``get_device`` can retrieve a device with the given id.

    Args:
        device_id (str): Id of the device to look up
    Returns:
        bool: True if the lookup succeeds, False if it fails with a
        response whose status code is 404
    Raises:
        BaseHttpClientException: For any other request failure
    """
    try:
        self.get_device(device_id=device_id)
    except requests.RequestException as err:
        response = err.response
        if response is not None and response.status_code == 404:
            return False
        self.logger.error(err)
        msg = f"Could not check device status because of the following reason: {str(err.args[0])}"
        raise BaseHttpClientException(
            message=msg, response=err.response
        ) from err
    return True

803 

804 # LOG API 

def get_loglevel_of_agent(self):
    """
    Get current loglevel of agent from the ``admin/log`` endpoint.

    The request is sent with a copy of the client's headers from which
    the ``fiware-service`` and ``fiware-servicepath`` entries are removed.

    Returns:
        The ``"level"`` entry of the JSON response

    Raises:
        BaseHttpClientException: If the request fails
    """
    url = urljoin(self.base_url, "admin/log")
    headers = self.headers.copy()
    # pop() instead of del: a client without FIWARE headers must not fail
    # with a KeyError before the request is even sent.
    headers.pop("fiware-service", None)
    headers.pop("fiware-servicepath", None)
    try:
        res = self.get(url=url, headers=headers)
        if res.ok:
            return res.json()["level"]
        res.raise_for_status()
    except requests.RequestException as err:
        self.log_error(err=err)
        msg = f"Could not check for loglevel status because of the following reason: {str(err.args[0])}"
        raise BaseHttpClientException(message=msg, response=err.response) from err

824 

def change_loglevel_of_agent(self, level: str):
    """
    Change current loglevel of agent via the ``admin/log`` endpoint.

    The request is sent with a copy of the client's headers from which
    the ``fiware-service`` and ``fiware-servicepath`` entries are removed.

    Args:
        level: New log level, case-insensitive. One of INFO, ERROR,
            FATAL, DEBUG, WARNING.

    Returns:
        None

    Raises:
        KeyError: If the level is not one of the supported values
        BaseHttpClientException: If the request fails
    """
    level = level.upper()
    if level not in ("INFO", "ERROR", "FATAL", "DEBUG", "WARNING"):
        raise KeyError("Given log level is not supported")

    url = urljoin(self.base_url, "admin/log")
    headers = self.headers.copy()
    new_loglevel = {"level": level}
    # pop() instead of del: a client without FIWARE headers must not fail
    # with a KeyError before the request is even sent.
    headers.pop("fiware-service", None)
    headers.pop("fiware-servicepath", None)
    try:
        res = self.put(url=url, headers=headers, params=new_loglevel)
        if res.ok:
            # Log the level itself, not the query-parameter dict.
            self.logger.info(
                "Loglevel of agent at %s changed to '%s'",
                self.base_url,
                level,
            )
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        self.log_error(err=err)
        msg = f"Could not change loglevel because of the following reason: {str(err.args[0])}"
        raise BaseHttpClientException(message=msg, response=err.response) from err