Coverage for filip/clients/ngsi_v2/iota.py: 82%

239 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-05 11:07 +0000

1""" 

2IoT-Agent Module for API Client 

3""" 

4 

5from __future__ import annotations 

6 

7import json 

8from copy import deepcopy 

9from typing import List, Dict, Set, TYPE_CHECKING, Union, Optional 

10import warnings 

11from urllib.parse import urljoin 

12import requests 

13from pydantic import AnyHttpUrl 

14from pydantic.type_adapter import TypeAdapter 

15from filip.config import settings 

16from filip.clients.base_http_client import BaseHttpClient 

17from filip.clients.exceptions import BaseHttpClientException 

18from filip.models.base import FiwareHeader 

19from filip.models.ngsi_v2.iot import Device, ServiceGroup 

20 

21from filip.utils.filter import filter_device_list, filter_group_list 

22 

23if TYPE_CHECKING: 

24 from filip.clients.ngsi_v2.cb import ContextBrokerClient 

25 

26 

class IoTAClient(BaseHttpClient):
    """
    Client for FIWARE IoT-Agents. The implementation follows the API
    specifications from here:
    https://iotagent-node-lib.readthedocs.io/en/latest/

    Args:
        url: Url of IoT-Agent. If omitted, ``settings.IOTA_URL`` is used.
        session (requests.Session): Session forwarded to the base client.
        fiware_header (FiwareHeader): fiware service and fiware service path
        **kwargs (Optional): Optional arguments that ``request`` takes.
    """

    def __init__(
        self,
        url: Optional[str] = None,
        *,
        session: Optional[requests.Session] = None,
        fiware_header: Optional[FiwareHeader] = None,
        **kwargs,
    ):
        # set service url, falling back to the configured default
        url = url or settings.IOTA_URL
        super().__init__(
            url=url, session=session, fiware_header=fiware_header, **kwargs
        )

    # ABOUT API
    def get_version(self) -> Dict:
        """
        Gets version of IoT Agent

        Returns:
            Dictionary with the decoded JSON response of ``iot/about``

        Raises:
            BaseHttpClientException: if the request fails
        """
        url = urljoin(self.base_url, "iot/about")
        try:
            res = self.get(url=url, headers=self.headers)
            if res.ok:
                return res.json()
            res.raise_for_status()
        except requests.RequestException as err:
            self.logger.error(err)
            msg = "Could not retrieve version because of following reason: " + str(
                err.args[0]
            )
            raise BaseHttpClientException(message=msg, response=err.response) from err

    # SERVICE GROUP API
    def post_groups(
        self,
        service_groups: Union[ServiceGroup, List[ServiceGroup]],
        update: bool = False,
    ):
        """
        Creates a set of service groups for the given service and service_path.
        The service_group and subservice information will taken from the
        headers, overwriting any preexisting values.

        Args:
            service_groups (list of ServiceGroup): Service groups that will be
                posted to the agent's API. A single group is also accepted.
            update (bool): If a service group already exists (HTTP 409) try
                to update it instead of raising

        Returns:
            None

        Raises:
            AssertionError: if a group's service / subservice does not match
                the client's fiware headers
            BaseHttpClientException: if the request fails
        """
        if not isinstance(service_groups, list):
            service_groups = [service_groups]
        for group in service_groups:
            if group.service:
                assert (
                    group.service == self.headers["fiware-service"]
                ), "Service group service does not math fiware service"
            if group.subservice:
                assert (
                    group.subservice == self.headers["fiware-servicepath"]
                ), "Service group subservice does not match fiware service path"

        url = urljoin(self.base_url, "iot/services")
        headers = self.headers
        # service / subservice are carried by the headers, not by the payload
        data = {
            "services": [
                group.model_dump(exclude={"service", "subservice"}, exclude_none=True)
                for group in service_groups
            ]
        }
        try:
            res = self.post(url=url, headers=headers, json=data)
            if res.ok:
                self.logger.info("Services successfully posted")
            elif res.status_code == 409:
                self.logger.warning(res.text)
                if len(service_groups) > 1:
                    # retry one by one so that each group is handled
                    # individually (post or, if requested, update)
                    self.logger.info(
                        "Trying to split bulk operation into " "single operations"
                    )
                    for group in service_groups:
                        self.post_group(service_group=group, update=update)
                elif update is True:
                    self.update_group(service_group=service_groups[0], fields=None)
                else:
                    res.raise_for_status()
            else:
                res.raise_for_status()
        except requests.RequestException as err:
            self.logger.error(err)
            msg = "Could not post group because of following reason: " + str(
                err.args[0]
            )
            raise BaseHttpClientException(message=msg, response=err.response) from err

    def post_group(self, service_group: ServiceGroup, update: bool = False):
        """
        Single service registration but using the bulk operation in background

        Args:
            service_group (ServiceGroup): Service that will be posted to the
                agent's API
            update (bool): Forwarded to ``post_groups``

        Returns:
            None
        """
        return self.post_groups(service_groups=[service_group], update=update)

    def get_group_list(self) -> List[ServiceGroup]:
        r"""
        Retrieves service_group groups from the database. If the servicepath
        header has the wildcard expression, /\*, all the subservices for the
        service_group are returned. The specific subservice parameters are
        returned in any other case.

        Returns:
            List of ServiceGroup objects validated from the ``services`` entry
            of the response

        Raises:
            BaseHttpClientException: if the request fails
        """
        url = urljoin(self.base_url, "iot/services")
        headers = self.headers
        try:
            res = self.get(url=url, headers=headers)
            if res.ok:
                ta = TypeAdapter(List[ServiceGroup])
                return ta.validate_python(res.json()["services"])
            res.raise_for_status()
        except requests.RequestException as err:
            self.logger.error(err)
            msg = "Could not retrieve group list because of following reason: " + str(
                err.args[0]
            )
            raise BaseHttpClientException(message=msg, response=err.response) from err

    def get_group(self, *, resource: str, apikey: str) -> ServiceGroup:
        """
        Retrieves service_group groups from the database based on resource and
        apikey

        Args:
            resource: resource used to filter the retrieved group list
            apikey: apikey used to filter the retrieved group list

        Returns:
            The single ServiceGroup matching both filters

        Raises:
            KeyError: if no group matches
            NotImplementedError: if more than one group matches
        """
        groups = self.get_group_list()
        groups = filter_group_list(
            group_list=groups, resources=resource, apikeys=apikey
        )
        if len(groups) == 1:
            return groups[0]
        if len(groups) == 0:
            raise KeyError(
                f"Service group with resource={resource} and apikey={apikey} was not found"
            )
        raise NotImplementedError(
            "There is a wierd error, try get_group_list() for debugging"
        )

    def update_groups(
        self,
        *,
        service_groups: Union[ServiceGroup, List[ServiceGroup]],
        add: bool = False,
        fields: Union[Set[str], List[str]] = None,
    ) -> None:
        """
        Bulk operation for service group update. Each group is updated with
        a separate call to ``update_group``.

        Args:
            service_groups: Single group or list of groups to update
            add: Forwarded to ``update_group``
            fields: Forwarded to ``update_group``

        Returns:
            None
        """
        if not isinstance(service_groups, list):
            service_groups = [service_groups]
        for group in service_groups:
            self.update_group(service_group=group, fields=fields, add=add)

    def update_group(
        self,
        *,
        service_group: ServiceGroup,
        fields: Union[Set[str], List[str]] = None,
        add: bool = True,
    ):
        """
        Modifies the information for a service group configuration, identified
        by the resource and apikey query parameters. Takes a service group body
        as the payload. The body does not have to be complete: for incomplete
        bodies, just the existing attributes will be updated

        Args:
            service_group (ServiceGroup): Service to update.
            fields: Fields of the service_group to update. If 'None' all allowed
                fields will be updated
            add: If True and the agent answers with 404, the group is posted
                instead

        Returns:
            None

        Raises:
            BaseHttpClientException: if the request fails
        """
        if fields:
            if isinstance(fields, list):
                fields = set(fields)
        else:
            # empty collections are treated like "no restriction"
            fields = None
        url = urljoin(self.base_url, "iot/services")
        headers = self.headers
        params = service_group.model_dump(include={"resource", "apikey"})
        try:
            res = self.put(
                url=url,
                headers=headers,
                params=params,
                json=service_group.model_dump(
                    include=fields, exclude={"service", "subservice"}, exclude_none=True
                ),
            )
            if res.ok:
                self.logger.info("ServiceGroup updated!")
            elif res.status_code == 404 and add is True:
                self.post_group(service_group=service_group)
            else:
                res.raise_for_status()
        except requests.RequestException as err:
            self.logger.error(err)
            msg = "Could not update group because of following reason: " + str(
                err.args[0]
            )
            raise BaseHttpClientException(message=msg, response=err.response) from err

    def delete_group(self, *, resource: str, apikey: str):
        """
        Deletes a service group in the IoT-Agent

        Args:
            resource: resource query parameter identifying the group
            apikey: apikey query parameter identifying the group

        Returns:
            None

        Raises:
            BaseHttpClientException: if the request fails
        """
        url = urljoin(self.base_url, "iot/services")
        headers = self.headers
        params = {"resource": resource, "apikey": apikey}
        try:
            res = self.delete(url=url, headers=headers, params=params)
            if res.ok:
                self.logger.info(
                    "ServiceGroup with resource: '%s' and "
                    "apikey: '%s' successfully deleted!",
                    resource,
                    apikey,
                )
            else:
                res.raise_for_status()
        except requests.RequestException as err:
            self.logger.error(err)
            msg = (
                f"Could not delete ServiceGroup with resource "
                f"'{resource}' and apikey '{apikey}' because of following reason: {str(err.args[0])}"
            )
            raise BaseHttpClientException(message=msg, response=err.response) from err

    # DEVICE API
    def post_devices(
        self, *, devices: Union[Device, List[Device]], update: bool = False
    ) -> None:
        """
        Post one or more devices to the device registry.
        If a device already exists it can be updated with update = True

        Args:
            devices (list of Devices): Single device or list of devices
            update (bool): Whether if the device is already existent it
                should be updated. When set, any failed post falls back to
                ``update_devices``.

        Returns:
            None

        Raises:
            BaseHttpClientException: if the request fails and ``update`` is
                False
        """
        if not isinstance(devices, list):
            devices = [devices]
        url = urljoin(self.base_url, "iot/devices")
        headers = self.headers

        # round-trip through JSON so that the payload only holds plain
        # JSON-compatible values
        data = {
            "devices": [
                json.loads(device.model_dump_json(exclude_none=True))
                for device in devices
            ]
        }
        try:
            res = self.post(url=url, headers=headers, json=data)
            if res.ok:
                self.logger.info("Devices successfully posted!")
            else:
                res.raise_for_status()
        except requests.RequestException as err:
            if update:
                return self.update_devices(devices=devices, add=False)
            self.logger.error(err)
            msg = "Could not post devices because of following reason: " + str(
                err.args[0]
            )
            raise BaseHttpClientException(message=msg, response=err.response) from err

    def post_device(self, *, device: Device, update: bool = False) -> None:
        """
        Post a device configuration to the IoT-Agent

        Args:
            device: IoT device configuration to send
            update: update device if configuration already exists

        Returns:
            None
        """
        return self.post_devices(devices=[device], update=update)

    def get_device_list(
        self,
        *,
        limit: int = None,
        offset: int = None,
        device_ids: Union[str, List[str]] = None,
        entity_names: Union[str, List[str]] = None,
        entity_types: Union[str, List[str]] = None,
    ) -> List[Device]:
        """
        Returns a list of all the devices in the device registry with all
        its data. The IoTAgent now only supports "limit" and "offset" as
        request parameters; the remaining filters are applied to the
        retrieved list on the client side.

        Args:
            limit:
                if present, limits the number of devices returned in the
                list. Must be a number between 1 and 1000 (inclusive).
            offset:
                if present, skip that number of devices from the original
                query.
            device_ids:
                List of device_ids. If given, only devices with matching ids
                will be returned
            entity_names:
                The entity_ids of the devices. If given, only the devices
                with the specified entity_id will be returned
            entity_types:
                The entity_type of the device. If given, only the devices
                with the specified entity_type will be returned

        Returns:
            List of matching devices

        Raises:
            ValueError: if ``limit`` is out of range or ``offset`` is not an
                integer
            BaseHttpClientException: if the request fails
        """
        params = {}
        if limit is not None:
            # bounds are inclusive, as documented above
            if not 1 <= limit <= 1000:
                msg = "'limit' must be an integer between 1 and 1000!"
                self.logger.error(msg)
                raise ValueError(msg)
            params["limit"] = limit
        if offset:
            if not isinstance(offset, int):
                msg = "'offset' must be an integer!"
                self.logger.error(msg)
                raise ValueError(msg)
            params["offset"] = offset
        url = urljoin(self.base_url, "iot/devices")
        headers = self.headers
        try:
            res = self.get(url=url, headers=headers, params=params)
            if res.ok:
                ta = TypeAdapter(List[Device])
                devices = ta.validate_python(res.json()["devices"])
                # filter by device_ids, entity_names or entity_types
                devices = filter_device_list(
                    devices, device_ids, entity_names, entity_types
                )
                return devices
            res.raise_for_status()
        except requests.RequestException as err:
            self.logger.error(err)
            msg = (
                "Not able to retrieve the device list because of the following reason:"
                + str(err.args[0])
            )
            raise BaseHttpClientException(message=msg, response=err.response) from err

    def get_device(self, *, device_id: str) -> Device:
        """
        Returns all the information about a particular device.

        Args:
            device_id: ID of the device to retrieve

        Raises:
            BaseHttpClientException: if the request fails, e.g. because the
                device does not exist

        Returns:
            Device
        """
        url = urljoin(self.base_url, f"iot/devices/{device_id}")
        headers = self.headers
        try:
            res = self.get(url=url, headers=headers)
            if res.ok:
                return Device.model_validate(res.json())
            res.raise_for_status()
        except requests.RequestException as err:
            self.logger.error(err)

            msg = f"Device '{device_id}' was not found because of the following reason: {str(err.args[0])}"
            raise BaseHttpClientException(message=msg, response=err.response) from err

    def update_device(self, *, device: Device, add: bool = True) -> None:
        """
        Updates a device from the device registry.
        Adds, removes attributes from the device entry and changes
        attributes values.
        It does not change device settings (endpoint,..) and only adds
        attributes to the corresponding entity, there it does not
        change any attribute value and does not delete removed attributes

        Args:
            device: Device whose attributes, lazy attributes, commands and
                static attributes are sent to the agent
            add (bool): If device not found add it

        Returns:
            None

        Raises:
            BaseHttpClientException: if the request fails
        """
        url = urljoin(self.base_url, f"iot/devices/{device.device_id}")
        headers = self.headers
        try:
            res = self.put(
                url=url,
                headers=headers,
                json=device.model_dump(
                    include={"attributes", "lazy", "commands", "static_attributes"},
                    exclude_none=True,
                ),
            )
            if res.ok:
                self.logger.info("Device '%s' successfully updated!", device.device_id)
            elif res.status_code == 404 and add is True:
                self.post_device(device=device, update=False)
            else:
                res.raise_for_status()
        except requests.RequestException as err:
            self.logger.error(err)
            msg = f"Could not update device '{device.device_id}' because of the following reason: {str(err.args[0])} "
            raise BaseHttpClientException(message=msg, response=err.response) from err

    def update_devices(
        self, *, devices: Union[Device, List[Device]], add: bool = False
    ) -> None:
        """
        Bulk operation for device update. Each device is updated with a
        separate call to ``update_device``.

        Args:
            devices: Single device or list of devices to update
            add: Forwarded to ``update_device``

        Returns:
            None
        """
        if not isinstance(devices, list):
            devices = [devices]
        for device in devices:
            self.update_device(device=device, add=add)

    def delete_device(
        self,
        *,
        device_id: str,
        cb_url: AnyHttpUrl = settings.CB_URL,
        delete_entity: bool = False,
        force_entity_deletion: bool = False,
        cb_client: ContextBrokerClient = None,
    ) -> None:
        """
        Remove a device from the device registry. No payload is required
        or received.

        Args:
            device_id: str, ID of Device
            delete_entity: False -> Only delete the device entry,
                                    the automatically created and linked
                                    context-entity will continue to
                                    exist in Fiware
                           True -> Also delete the automatically
                                   created and linked context-entity
                                   If multiple devices are linked to this
                                   entity, this operation is not executed and
                                   an exception is raised
            force_entity_deletion:
                bool, if delete_entity is true and multiple devices are linked
                to the linked entity, delete it and do not raise an error
            cb_client (ContextBrokerClient):
                Corresponding ContextBrokerClient object for entity manipulation
            cb_url (AnyHttpUrl):
                Url of the ContextBroker where the entity is found.
                This will autogenerate an CB-Client, mirroring the information
                of the IoTA-Client, e.g. FiwareHeader, and other headers
                (not recommended!)

        Returns:
            None

        Raises:
            BaseHttpClientException: if the device cannot be retrieved or
                deleted
            Exception: if the entity is still linked to other devices and
                ``force_entity_deletion`` is False
        """
        url = urljoin(
            self.base_url,
            f"iot/devices/{device_id}",
        )
        headers = self.headers

        # fetch the device first: its entity name / type are needed after
        # the device entry is gone
        device = self.get_device(device_id=device_id)

        try:
            res = self.delete(url=url, headers=headers)
            if res.ok:
                self.logger.info("Device '%s' successfully deleted!", device_id)
            else:
                res.raise_for_status()
        except requests.RequestException as err:
            msg = f"Could not delete device {device_id}!"
            raise BaseHttpClientException(message=msg, response=err.response) from err

        if not delete_entity:
            return

        # An entity can technically belong to multiple devices
        # Only delete the entity if no other device is linked to it
        devices = self.get_device_list(entity_names=[device.entity_name])

        # Zero because we count the remaining devices
        if len(devices) > 0 and not force_entity_deletion:
            raise Exception(
                f"The corresponding entity to the device "
                f"{device_id} was not deleted because it is "
                f"linked to multiple devices. "
            )

        cb_client_local = None
        try:
            from filip.clients.ngsi_v2 import ContextBrokerClient

            if cb_client:
                # work on a copy so the caller's client is not closed below
                cb_client_local = deepcopy(cb_client)
            else:
                warnings.warn(
                    "No `ContextBrokerClient` "
                    "object provided! Will try to generate "
                    "one. This usage is not recommended."
                )

                cb_client_local = ContextBrokerClient(
                    url=cb_url,
                    fiware_header=self.fiware_headers,
                    headers=headers,
                )

            cb_client_local.delete_entity(
                entity_id=device.entity_name, entity_type=device.entity_type
            )
        except requests.RequestException as err:
            # Do not throw an error
            # It is only important that the entity does not exist after
            # this method, not if this method actively deleted it
            self.logger.warning(
                "Entity '%s' could not be deleted: %s", device.entity_name, err
            )
        finally:
            # always release the local client, even on unexpected errors
            if cb_client_local is not None:
                cb_client_local.close()

    def patch_device(
        self,
        device: Device,
        patch_entity: bool = True,
        cb_client: ContextBrokerClient = None,
        cb_url: AnyHttpUrl = settings.CB_URL,
    ) -> None:
        """
        Updates a device state in Fiware, if the device does not exist it
        is created, else its values are updated.
        If the device settings were changed the device and
        entity are deleted and re-added.

        If patch_entity is true the corresponding entity in the ContextBroker is
        also correctly updated. Else only new attributes are added there.

        Args:
            device (Device): Device to be posted to /updated in Fiware
            patch_entity (bool): If true the corresponding entity is
                    completely synced
            cb_client (ContextBrokerClient):
                Corresponding ContextBrokerClient object for entity manipulation
            cb_url (AnyHttpUrl):
                Url of the ContextBroker where the entity is found.
                This will autogenerate an CB-Client, mirroring the information
                of the IoTA-Client, e.g. FiwareHeader, and other headers
                (not recommended!)

        Returns:
            None
        """
        try:
            live_device = self.get_device(device_id=device.device_id)
        except requests.RequestException:
            # device does not exist yet, post it
            self.post_device(device=device)
            return

        # if the device settings were changed we need to delete the device
        # and repost it
        settings_dict = {
            "device_id",
            "service",
            "service_path",
            "entity_name",
            "entity_type",
            "timestamp",
            "apikey",
            "endpoint",
            "protocol",
            "transport",
            "expressionLanguage",
        }

        live_settings = live_device.model_dump(include=settings_dict)
        new_settings = device.model_dump(include=settings_dict)

        if live_settings != new_settings:
            # forward cb_url as well so that an auto-generated CB client
            # targets the broker the caller asked for
            self.delete_device(
                device_id=device.device_id,
                cb_url=cb_url,
                delete_entity=True,
                force_entity_deletion=True,
                cb_client=cb_client,
            )
            self.post_device(device=device)
            return

        # We are at a state where the device exists, but only attributes were
        # changed.
        # we need to update the device, and the context entry separately,
        # as update device only takes over a part of the changes to the
        # ContextBroker.

        # update device
        self.update_device(device=device)

        # update context entry
        # 1. build context entity from information in device
        # 2. patch it
        from filip.models.ngsi_v2.context import ContextEntity, NamedContextAttribute

        def build_context_entity_from_device(device: Device) -> ContextEntity:
            """Build the context entity that mirrors the given device."""
            from filip.models.base import DataType

            entity = ContextEntity(id=device.entity_name, type=device.entity_type)

            for command in device.commands:
                entity.add_attributes(
                    [
                        # Command attribute will be registered by the device_update
                        NamedContextAttribute(
                            name=f"{command.name}_info", type=DataType.COMMAND_RESULT
                        ),
                        NamedContextAttribute(
                            name=f"{command.name}_status", type=DataType.COMMAND_STATUS
                        ),
                    ]
                )
            for attribute in device.attributes:
                entity.add_attributes(
                    [
                        NamedContextAttribute(
                            name=attribute.name,
                            type=DataType.STRUCTUREDVALUE,
                            metadata=attribute.metadata,
                        )
                    ]
                )
            for static_attribute in device.static_attributes:
                entity.add_attributes(
                    [
                        NamedContextAttribute(
                            name=static_attribute.name,
                            type=static_attribute.type,
                            value=static_attribute.value,
                            metadata=static_attribute.metadata,
                        )
                    ]
                )
            return entity

        if patch_entity:
            from filip.clients.ngsi_v2 import ContextBrokerClient

            if cb_client:
                # work on a copy so the caller's client is not closed below
                cb_client_local = deepcopy(cb_client)
            else:
                warnings.warn(
                    "No `ContextBrokerClient` object provided! "
                    "Will try to generate one. "
                    "This usage is not recommended."
                )

                cb_client_local = ContextBrokerClient(
                    url=cb_url, fiware_header=self.fiware_headers, headers=self.headers
                )

            try:
                cb_client_local.override_entity(
                    entity=build_context_entity_from_device(device)
                )
            finally:
                # release the local client even if the override fails
                cb_client_local.close()

    def does_device_exists(self, device_id: str) -> bool:
        """
        Test if a device with the given id exists in Fiware

        Args:
            device_id (str): ID of the device to look up

        Returns:
            bool: True if the device could be retrieved, False if the agent
            answered with 404

        Raises:
            BaseHttpClientException: for any failure other than a 404
        """
        try:
            self.get_device(device_id=device_id)
            return True
        except requests.RequestException as err:
            if err.response is None or not err.response.status_code == 404:
                self.logger.error(err)
                msg = f"Could not check device status because of the following reason: {str(err.args[0])}"
                raise BaseHttpClientException(
                    message=msg, response=err.response
                ) from err
            return False

    # LOG API
    def get_loglevel_of_agent(self):
        """
        Get current loglevel of agent

        Returns:
            Value of the ``level`` entry of the ``admin/log`` response

        Raises:
            BaseHttpClientException: if the request fails
        """
        url = urljoin(self.base_url, "admin/log")
        # the request is sent without the fiware headers; work on a copy so
        # the client's own headers stay untouched
        headers = self.headers.copy()
        headers.pop("fiware-service", None)
        headers.pop("fiware-servicepath", None)
        try:
            res = self.get(url=url, headers=headers)
            if res.ok:
                return res.json()["level"]
            res.raise_for_status()
        except requests.RequestException as err:
            self.log_error(err=err)
            msg = f"Could not check for loglevel status because of the following reason: {str(err.args[0])}"
            raise BaseHttpClientException(message=msg, response=err.response) from err

    def change_loglevel_of_agent(self, level: str):
        """
        Change current loglevel of agent

        Args:
            level: New log level (case-insensitive). One of INFO, ERROR,
                FATAL, DEBUG, WARNING.

        Returns:
            None

        Raises:
            KeyError: if the given level is not supported
            BaseHttpClientException: if the request fails
        """
        level = level.upper()
        if level not in ["INFO", "ERROR", "FATAL", "DEBUG", "WARNING"]:
            raise KeyError("Given log level is not supported")

        url = urljoin(self.base_url, "admin/log")
        # the request is sent without the fiware headers; work on a copy so
        # the client's own headers stay untouched
        headers = self.headers.copy()
        new_loglevel = {"level": level}
        headers.pop("fiware-service", None)
        headers.pop("fiware-servicepath", None)
        try:
            res = self.put(url=url, headers=headers, params=new_loglevel)
            if res.ok:
                self.logger.info(
                    "Loglevel of agent at %s " "changed to '%s'",
                    self.base_url,
                    level,
                )
            else:
                res.raise_for_status()
        except requests.RequestException as err:
            self.log_error(err=err)
            msg = f"Could not change loglevel because of the following reason: {str(err.args[0])}"
            raise BaseHttpClientException(message=msg, response=err.response) from err