Coverage for filip/clients/ngsi_v2/iota.py: 82%
239 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-05 11:07 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-05 11:07 +0000
1"""
2IoT-Agent Module for API Client
3"""
5from __future__ import annotations
7import json
8from copy import deepcopy
9from typing import List, Dict, Set, TYPE_CHECKING, Union, Optional
10import warnings
11from urllib.parse import urljoin
12import requests
13from pydantic import AnyHttpUrl
14from pydantic.type_adapter import TypeAdapter
15from filip.config import settings
16from filip.clients.base_http_client import BaseHttpClient
17from filip.clients.exceptions import BaseHttpClientException
18from filip.models.base import FiwareHeader
19from filip.models.ngsi_v2.iot import Device, ServiceGroup
21from filip.utils.filter import filter_device_list, filter_group_list
23if TYPE_CHECKING:
24 from filip.clients.ngsi_v2.cb import ContextBrokerClient
27class IoTAClient(BaseHttpClient):
28 """
29 Client for FIWARE IoT-Agents. The implementation follows the API
30 specifications from here:
31 https://iotagent-node-lib.readthedocs.io/en/latest/
33 Args:
34 url: Url of IoT-Agent
35 session (requests.Session):
36 fiware_header (FiwareHeader): fiware service and fiware service path
37 **kwargs (Optional): Optional arguments that ``request`` takes.
38 """
def __init__(
    self,
    url: str = None,
    *,
    session: requests.Session = None,
    fiware_header: FiwareHeader = None,
    **kwargs,
):
    """
    Set up the client for an IoT-Agent.

    Args:
        url: Base url of the IoT-Agent. A falsy value (``None`` or an empty
            string) selects ``settings.IOTA_URL`` instead.
        session (requests.Session): Forwarded unchanged to the base client.
        fiware_header (FiwareHeader): Forwarded unchanged to the base client.
        **kwargs (Optional): Forwarded unchanged to the base client.
    """
    # fall back to the configured service url when none is given
    if not url:
        url = settings.IOTA_URL
    super().__init__(
        url=url,
        session=session,
        fiware_header=fiware_header,
        **kwargs,
    )
54 # ABOUT API
def get_version(self) -> Dict:
    """
    Query the ``iot/about`` endpoint of the IoT Agent.

    Returns:
        Dictionary with the decoded JSON body of the response

    Raises:
        BaseHttpClientException: If the request fails; the original
            ``requests`` error is attached as the cause.
    """
    about_url = urljoin(self.base_url, "iot/about")
    try:
        response = self.get(url=about_url, headers=self.headers)
        if not response.ok:
            response.raise_for_status()
        else:
            return response.json()
    except requests.RequestException as err:
        self.logger.error(err)
        reason = str(err.args[0])
        msg = f"Could not retrieve version because of following reason: {reason}"
        raise BaseHttpClientException(message=msg, response=err.response) from err
75 # SERVICE GROUP API
def post_groups(
    self,
    service_groups: Union[ServiceGroup, List[ServiceGroup]],
    update: bool = False,
):
    """
    Creates a set of service groups for the given service and service_path.
    The ``service`` and ``subservice`` fields are not sent in the body; a
    group that sets them must match the ``fiware-service`` and
    ``fiware-servicepath`` headers of this client.

    Args:
        service_groups (list of ServiceGroup): Service groups that will be
            posted to the agent's API. A single group is also accepted.
        update (bool): If a service group already exists (HTTP 409), try to
            update it instead of raising.

    Raises:
        AssertionError: If a group's service or subservice does not match
            the client headers.
        BaseHttpClientException: If the request fails.

    Returns:
        None
    """
    if not isinstance(service_groups, list):
        service_groups = [service_groups]

    # Explicit raises instead of `assert` statements: asserts are stripped
    # under `python -O`. AssertionError is kept so callers that already
    # catch it keep working.
    for group in service_groups:
        if group.service and group.service != self.headers["fiware-service"]:
            raise AssertionError(
                "Service group service does not match fiware service"
            )
        if (
            group.subservice
            and group.subservice != self.headers["fiware-servicepath"]
        ):
            raise AssertionError(
                "Service group subservice does not match fiware service path"
            )

    url = urljoin(self.base_url, "iot/services")
    headers = self.headers
    data = {
        "services": [
            group.model_dump(exclude={"service", "subservice"}, exclude_none=True)
            for group in service_groups
        ]
    }
    try:
        res = self.post(url=url, headers=headers, json=data)
        if res.ok:
            self.logger.info("Services successfully posted")
        elif res.status_code == 409:
            self.logger.warning(res.text)
            if len(service_groups) > 1:
                # A conflict on a bulk request is retried group by group
                self.logger.info(
                    "Trying to split bulk operation into single operations"
                )
                for group in service_groups:
                    self.post_group(service_group=group, update=update)
            elif update is True:
                self.update_group(service_group=service_groups[0], fields=None)
            else:
                res.raise_for_status()
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        self.logger.error(err)
        msg = "Could not post group because of following reason: " + str(
            err.args[0]
        )
        raise BaseHttpClientException(message=msg, response=err.response) from err
def post_group(self, service_group: ServiceGroup, update: bool = False):
    """
    Register one service group by delegating to the bulk operation
    ``post_groups`` with a one-element list.

    Args:
        service_group (ServiceGroup): Service that will be posted to the
            agent's API
        update (bool): Passed through to ``post_groups``

    Returns:
        Whatever ``post_groups`` returns (None)
    """
    single_item_batch = [service_group]
    return self.post_groups(service_groups=single_item_batch, update=update)
def get_group_list(self) -> List[ServiceGroup]:
    r"""
    Retrieves service groups from the database. If the servicepath
    header has the wildcard expression, /\*, all the subservices for the
    service are returned. The specific subservice parameters are
    returned in any other case.

    Returns:
        List of ServiceGroup objects validated from the ``services`` key of
        the response body

    Raises:
        BaseHttpClientException: If the request fails.
    """
    services_url = urljoin(self.base_url, "iot/services")
    request_headers = self.headers
    try:
        response = self.get(url=services_url, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
        else:
            adapter = TypeAdapter(List[ServiceGroup])
            return adapter.validate_python(response.json()["services"])
    except requests.RequestException as err:
        self.logger.error(err)
        reason = str(err.args[0])
        msg = "Could not retrieve group list because of following reason: " + reason
        raise BaseHttpClientException(message=msg, response=err.response) from err
def get_group(self, *, resource: str, apikey: str) -> ServiceGroup:
    """
    Retrieves a single service group from the database based on resource
    and apikey. The full group list is fetched and filtered client-side.

    Args:
        resource: Resource of the wanted service group
        apikey: Apikey of the wanted service group

    Raises:
        KeyError: If no service group matches.
        NotImplementedError: If more than one service group matches.

    Returns:
        The single matching ServiceGroup
    """
    matches = filter_group_list(
        group_list=self.get_group_list(), resources=resource, apikeys=apikey
    )
    if len(matches) == 0:
        raise KeyError(
            f"Service group with resource={resource} and apikey={apikey} was not found"
        )
    if len(matches) == 1:
        return matches[0]
    # more than one hit for the same resource/apikey pair
    raise NotImplementedError(
        "There is a wierd error, try get_group_list() for debugging"
    )
def update_groups(
    self,
    *,
    service_groups: Union[ServiceGroup, List[ServiceGroup]],
    add: bool = False,
    fields: Union[Set[str], List[str]] = None,
) -> None:
    """
    Bulk operation for service group update. Every group is updated with
    its own ``update_group`` call.

    Args:
        service_groups: Single service group or list of service groups to
            update
        add: Passed to ``update_group``; if True, a group that is not found
            is posted instead. Defaults to False.
        fields: Fields of the service groups to update, passed to
            ``update_group``

    Returns:
        None
    """
    # `add` used to be declared as `add: False`, i.e. annotated with the
    # value and left without a default, which made it a required keyword.
    if not isinstance(service_groups, list):
        service_groups = [service_groups]
    for group in service_groups:
        self.update_group(service_group=group, fields=fields, add=add)
def update_group(
    self,
    *,
    service_group: ServiceGroup,
    fields: Union[Set[str], List[str]] = None,
    add: bool = True,
):
    """
    Modifies the information for a service group configuration, identified
    by the resource and apikey query parameters. Takes a service group body
    as the payload. The body does not have to be complete: for incomplete
    bodies, just the existing attributes will be updated

    Args:
        service_group (ServiceGroup): Service to update.
        fields: Fields of the service_group to update. If 'None' (or empty)
            all allowed fields will be updated
        add: If True and the agent answers with 404, the group is posted
            instead

    Raises:
        BaseHttpClientException: If the request fails.

    Returns:
        None
    """
    # normalise the field selection: empty -> None, list -> set
    if not fields:
        fields = None
    elif isinstance(fields, list):
        fields = set(fields)

    target = urljoin(self.base_url, "iot/services")
    request_headers = self.headers
    query = service_group.model_dump(include={"resource", "apikey"})
    try:
        payload = service_group.model_dump(
            include=fields, exclude={"service", "subservice"}, exclude_none=True
        )
        response = self.put(
            url=target, headers=request_headers, params=query, json=payload
        )
        if response.ok:
            self.logger.info("ServiceGroup updated!")
        elif response.status_code == 404 and add is True:
            self.post_group(service_group=service_group)
        else:
            response.raise_for_status()
    except requests.RequestException as err:
        self.logger.error(err)
        reason = str(err.args[0])
        msg = "Could not update group because of following reason: " + reason
        raise BaseHttpClientException(message=msg, response=err.response) from err
def delete_group(self, *, resource: str, apikey: str):
    """
    Deletes a service group in the IoT-Agent

    Args:
        resource: Resource of the service group, sent as query parameter
        apikey: Apikey of the service group, sent as query parameter

    Raises:
        BaseHttpClientException: If the request fails.

    Returns:
        None
    """
    target = urljoin(self.base_url, "iot/services")
    request_headers = self.headers
    query = {"resource": resource, "apikey": apikey}
    try:
        response = self.delete(url=target, headers=request_headers, params=query)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info(
                "ServiceGroup with resource: '%s' and "
                "apikey: '%s' successfully deleted!",
                resource,
                apikey,
            )
    except requests.RequestException as err:
        self.logger.error(err)
        reason = str(err.args[0])
        msg = (
            f"Could not delete ServiceGroup with resource "
            f"'{resource}' and apikey '{apikey}' because of following reason: {reason}"
        )
        raise BaseHttpClientException(message=msg, response=err.response) from err
310 # DEVICE API
def post_devices(
    self, *, devices: Union[Device, List[Device]], update: bool = False
) -> None:
    """
    Post one or more devices to the device registry of the IoT-Agent.
    If the request fails and ``update`` is True, the devices are updated
    one by one instead.

    Args:
        devices (list of Devices): Single device or list of devices
        update (bool): Whether the devices should be updated when posting
            them fails (e.g. because they already exist)

    Raises:
        BaseHttpClientException: If the request fails and ``update`` is
            False.

    Returns:
        None
    """
    if not isinstance(devices, list):
        devices = [devices]
    target = urljoin(self.base_url, "iot/devices")
    request_headers = self.headers

    # round-trip through JSON so the payload only holds plain JSON types
    serialized = []
    for device in devices:
        serialized.append(json.loads(device.model_dump_json(exclude_none=True)))
    data = {"devices": serialized}

    try:
        response = self.post(url=target, headers=request_headers, json=data)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info("Devices successfully posted!")
    except requests.RequestException as err:
        if update:
            return self.update_devices(devices=devices, add=False)
        self.logger.error(err)
        reason = str(err.args[0])
        msg = "Could not post devices because of following reason: " + reason
        raise BaseHttpClientException(message=msg, response=err.response) from err
def post_device(self, *, device: Device, update: bool = False) -> None:
    """
    Post a single device configuration to the IoT-Agent by delegating to
    ``post_devices`` with a one-element list.

    Args:
        device: IoT device configuration to send
        update: update device if configuration already exists

    Returns:
        None
    """
    single_item_batch = [device]
    return self.post_devices(devices=single_item_batch, update=update)
def get_device_list(
    self,
    *,
    limit: int = None,
    offset: int = None,
    device_ids: Union[str, List[str]] = None,
    entity_names: Union[str, List[str]] = None,
    entity_types: Union[str, List[str]] = None,
) -> List[Device]:
    """
    Returns a list of all the devices in the device registry with all
    its data. Only "limit" and "offset" are sent to the IoT-Agent as
    request parameters; the remaining filters are applied client-side to
    the returned page.

    Args:
        limit:
            if present, limits the number of devices returned in the
            list. Must be a number between 1 and 1000 (both inclusive).
        offset:
            if present, skip that number of devices from the original
            query.
        device_ids:
            List of device_ids. If given, only devices with matching ids
            will be returned
        entity_names:
            The entity_ids of the devices. If given, only the devices
            with the specified entity_id will be returned
        entity_types:
            The entity_type of the device. If given, only the devices
            with the specified entity_type will be returned

    Raises:
        ValueError: If ``limit`` is outside 1..1000 or ``offset`` is not
            an integer.
        BaseHttpClientException: If the request fails.

    Returns:
        List of matching devices
    """
    params = {}
    # `is not None` so that an explicit limit=0 is rejected instead of
    # being silently dropped.
    if limit is not None:
        # inclusive bounds: 1 and 1000 themselves are valid
        if not 1 <= limit <= 1000:
            msg = "'limit' must be an integer between 1 and 1000!"
            self.logger.error(msg)
            raise ValueError(msg)
        params["limit"] = limit
    if offset:
        if not isinstance(offset, int):
            msg = "'offset' must be an integer!"
            self.logger.error(msg)
            raise ValueError(msg)
        params["offset"] = offset

    url = urljoin(self.base_url, "iot/devices")
    headers = self.headers
    try:
        res = self.get(url=url, headers=headers, params=params)
        if res.ok:
            ta = TypeAdapter(List[Device])
            devices = ta.validate_python(res.json()["devices"])
            # filter by device_ids, entity_names or entity_types
            return filter_device_list(
                devices, device_ids, entity_names, entity_types
            )
        res.raise_for_status()
    except requests.RequestException as err:
        self.logger.error(err)
        msg = (
            "Not able to retrieve the device list because of the following reason:"
            + str(err.args[0])
        )
        raise BaseHttpClientException(message=msg, response=err.response) from err
def get_device(self, *, device_id: str) -> Device:
    """
    Returns all the information about a particular device.

    Args:
        device_id: ID of the device to fetch

    Raises:
        BaseHttpClientException: If the request fails, e.g. because the
            device does not exist

    Returns:
        Device
    """
    device_url = urljoin(self.base_url, f"iot/devices/{device_id}")
    request_headers = self.headers
    try:
        response = self.get(url=device_url, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
        else:
            return Device.model_validate(response.json())
    except requests.RequestException as err:
        self.logger.error(err)
        reason = str(err.args[0])
        msg = f"Device '{device_id}' was not found because of the following reason: {reason}"
        raise BaseHttpClientException(message=msg, response=err.response) from err
def update_device(self, *, device: Device, add: bool = True) -> None:
    """
    Updates a device in the device registry.
    Only the ``attributes``, ``lazy``, ``commands`` and
    ``static_attributes`` of the device are sent; other device settings
    (endpoint, ...) are not part of the request.

    Args:
        device: Device whose attribute configuration is sent
        add (bool): If device not found (HTTP 404) add it

    Raises:
        BaseHttpClientException: If the request fails.

    Returns:
        None
    """
    device_url = urljoin(self.base_url, f"iot/devices/{device.device_id}")
    request_headers = self.headers
    try:
        payload = device.model_dump(
            include={"attributes", "lazy", "commands", "static_attributes"},
            exclude_none=True,
        )
        response = self.put(url=device_url, headers=request_headers, json=payload)
        if response.ok:
            self.logger.info("Device '%s' successfully updated!", device.device_id)
        elif response.status_code == 404 and add is True:
            self.post_device(device=device, update=False)
        else:
            response.raise_for_status()
    except requests.RequestException as err:
        self.logger.error(err)
        reason = str(err.args[0])
        msg = f"Could not update device '{device.device_id}' because of the following reason: {reason} "
        raise BaseHttpClientException(message=msg, response=err.response) from err
def update_devices(
    self, *, devices: Union[Device, List[Device]], add: bool = False
) -> None:
    """
    Bulk operation for device update. Every device is updated with its
    own ``update_device`` call.

    Args:
        devices: Single device or list of devices to update
        add: Passed to ``update_device``; if True, a device that is not
            found is posted instead. Defaults to False.

    Returns:
        None
    """
    # `add` used to be declared as `add: False`, i.e. annotated with the
    # value and left without a default, which made it a required keyword.
    if not isinstance(devices, list):
        devices = [devices]
    for device in devices:
        self.update_device(device=device, add=add)
def delete_device(
    self,
    *,
    device_id: str,
    cb_url: AnyHttpUrl = settings.CB_URL,
    delete_entity: bool = False,
    force_entity_deletion: bool = False,
    cb_client: ContextBrokerClient = None,
) -> None:
    """
    Remove a device from the device registry and, optionally, the context
    entity linked to it.

    Args:
        device_id: str, ID of Device
        delete_entity: False -> Only delete the device entry,
                                the automatically created and linked
                                context-entity will continue to
                                exist in Fiware
                       True -> Also delete the automatically
                               created and linked context-entity
                               If other devices are still linked to this
                               entity, this operation is not executed and
                               an exception is raised
        force_entity_deletion:
            bool, if delete_entity is true and other devices are still
            linked to the entity, delete it and do not raise an error
        cb_client (ContextBrokerClient):
            Corresponding ContextBrokerClient object for entity manipulation.
            A deep copy of it is used and closed afterwards.
        cb_url (AnyHttpUrl):
            Url of the ContextBroker where the entity is found.
            Only used when no ``cb_client`` is given: a CB-Client is then
            generated, mirroring the information of the IoTA-Client,
            e.g. FiwareHeader, and other headers (not recommended!)

    Raises:
        BaseHttpClientException: If the device cannot be fetched or
            deleted.
        Exception: If ``delete_entity`` is True, other devices still use
            the entity and ``force_entity_deletion`` is False.

    Returns:
        None
    """
    url = urljoin(self.base_url, f"iot/devices/{device_id}")
    headers = self.headers

    # Fetch the device before deleting it: its entity name and type are
    # needed afterwards to find and delete the linked entity.
    device = self.get_device(device_id=device_id)

    try:
        res = self.delete(url=url, headers=headers)
        if res.ok:
            self.logger.info("Device '%s' successfully deleted!", device_id)
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not delete device {device_id}!"
        raise BaseHttpClientException(message=msg, response=err.response) from err

    if not delete_entity:
        return

    # An entity can technically belong to multiple devices. The device
    # itself is already gone, so any hit here is another device that is
    # still linked to the entity.
    remaining_devices = self.get_device_list(entity_names=[device.entity_name])
    if remaining_devices and not force_entity_deletion:
        raise Exception(
            f"The corresponding entity to the device "
            f"{device_id} was not deleted because it is "
            f"linked to multiple devices. "
        )

    cb_client_local = None
    try:
        from filip.clients.ngsi_v2 import ContextBrokerClient

        if cb_client:
            cb_client_local = deepcopy(cb_client)
        else:
            warnings.warn(
                "No `ContextBrokerClient` "
                "object provided! Will try to generate "
                "one. This usage is not recommended."
            )
            cb_client_local = ContextBrokerClient(
                url=cb_url,
                fiware_header=self.fiware_headers,
                headers=headers,
            )

        cb_client_local.delete_entity(
            entity_id=device.entity_name, entity_type=device.entity_type
        )
    except requests.RequestException as err:
        # Deliberately not re-raised: it is only important that the entity
        # does not exist after this method, not that this method actively
        # deleted it.
        self.logger.debug(
            "Entity of device '%s' was not deleted: %s", device_id, err
        )
    finally:
        # Close the temporary client even if an unexpected error escapes
        if cb_client_local is not None:
            cb_client_local.close()
def patch_device(
    self,
    device: Device,
    patch_entity: bool = True,
    cb_client: ContextBrokerClient = None,
    cb_url: AnyHttpUrl = settings.CB_URL,
) -> None:
    """
    Updates a device state in Fiware, if the device does not exist it
    is created, else its values are updated.
    If the device settings were changed the device and
    entity are deleted and re-added.

    If patch_entity is true the corresponding entity in the ContextBroker
    is overridden with an entity built from the device. Else only
    ``update_device`` is called.

    Args:
        device (Device): Device to be posted to /updated in Fiware
        patch_entity (bool): If true the corresponding entity is
            completely synced
        cb_client (ContextBrokerClient):
            Corresponding ContextBrokerClient object for entity manipulation.
            A deep copy of it is used and closed afterwards.
        cb_url (AnyHttpUrl):
            Url of the ContextBroker where the entity is found.
            Only used when no ``cb_client`` is given: a CB-Client is then
            generated, mirroring the information of the IoTA-Client,
            e.g. FiwareHeader, and other headers (not recommended!)

    Returns:
        None
    """
    try:
        live_device = self.get_device(device_id=device.device_id)
    except requests.RequestException:
        # device does not exist yet, post it
        # NOTE(review): any request failure of get_device ends up here,
        # not only "not found".
        self.post_device(device=device)
        return

    # if the device settings were changed we need to delete the device
    # and repost it
    setting_fields = {
        "device_id",
        "service",
        "service_path",
        "entity_name",
        "entity_type",
        "timestamp",
        "apikey",
        "endpoint",
        "protocol",
        "transport",
        "expressionLanguage",
    }
    live_settings = live_device.model_dump(include=setting_fields)
    new_settings = device.model_dump(include=setting_fields)

    if live_settings != new_settings:
        self.delete_device(
            device_id=device.device_id,
            # forward the caller's broker url; it was previously dropped so
            # delete_device silently fell back to its own default
            cb_url=cb_url,
            delete_entity=True,
            force_entity_deletion=True,
            cb_client=cb_client,
        )
        self.post_device(device=device)
        return

    # We are at a state where the device exists, but only attributes were
    # changed.
    # we need to update the device, and the context entry separately,
    # as update device only takes over a part of the changes to the
    # ContextBroker.
    self.update_device(device=device)

    if not patch_entity:
        return

    # update context entry
    # 1. build context entity from information in device
    # 2. patch it
    from filip.clients.ngsi_v2 import ContextBrokerClient
    from filip.models.base import DataType
    from filip.models.ngsi_v2.context import ContextEntity, NamedContextAttribute

    def build_context_entity_from_device(device: Device) -> ContextEntity:
        """Build the context entity that mirrors the given device."""
        entity = ContextEntity(id=device.entity_name, type=device.entity_type)

        for command in device.commands:
            entity.add_attributes(
                [
                    # Command attribute will be registered by the device_update
                    NamedContextAttribute(
                        name=f"{command.name}_info", type=DataType.COMMAND_RESULT
                    ),
                    NamedContextAttribute(
                        name=f"{command.name}_status", type=DataType.COMMAND_STATUS
                    ),
                ]
            )
        for attribute in device.attributes:
            entity.add_attributes(
                [
                    NamedContextAttribute(
                        name=attribute.name,
                        type=DataType.STRUCTUREDVALUE,
                        metadata=attribute.metadata,
                    )
                ]
            )
        for static_attribute in device.static_attributes:
            entity.add_attributes(
                [
                    NamedContextAttribute(
                        name=static_attribute.name,
                        type=static_attribute.type,
                        value=static_attribute.value,
                        metadata=static_attribute.metadata,
                    )
                ]
            )
        return entity

    if cb_client:
        cb_client_local = deepcopy(cb_client)
    else:
        warnings.warn(
            "No `ContextBrokerClient` object provided! "
            "Will try to generate one. "
            "This usage is not recommended."
        )
        cb_client_local = ContextBrokerClient(
            url=cb_url, fiware_header=self.fiware_headers, headers=self.headers
        )

    try:
        cb_client_local.override_entity(
            entity=build_context_entity_from_device(device)
        )
    finally:
        # close the temporary client even if the override fails
        cb_client_local.close()
def does_device_exists(self, device_id: str) -> bool:
    """
    Test if a device with the given id exists in Fiware

    Args:
        device_id (str): ID of the device to look up

    Raises:
        BaseHttpClientException: If the lookup fails for any reason other
            than a response with status code 404.

    Returns:
        bool: True if the device could be fetched, False on a 404 response
    """
    try:
        self.get_device(device_id=device_id)
    except requests.RequestException as err:
        response = err.response
        # a 404 answer is the only failure that means "does not exist"
        if response is not None and response.status_code == 404:
            return False
        self.logger.error(err)
        msg = f"Could not check device status because of the following reason: {str(err.args[0])}"
        raise BaseHttpClientException(message=msg, response=response) from err
    else:
        return True
774 # LOG API
def get_loglevel_of_agent(self):
    """
    Get current loglevel of agent. The request is sent without the
    ``fiware-service`` and ``fiware-servicepath`` headers.

    Raises:
        BaseHttpClientException: If the request fails.

    Returns:
        Value of the ``level`` key of the response body
    """
    log_url = urljoin(self.base_url, "admin/log")
    # work on a copy so the client's own headers stay untouched
    request_headers = self.headers.copy()
    for tenant_key in ("fiware-service", "fiware-servicepath"):
        del request_headers[tenant_key]
    try:
        response = self.get(url=log_url, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
        else:
            return response.json()["level"]
    except requests.RequestException as err:
        self.log_error(err=err)
        reason = str(err.args[0])
        msg = f"Could not check for loglevel status because of the following reason: {reason}"
        raise BaseHttpClientException(message=msg, response=err.response) from err
def change_loglevel_of_agent(self, level: str):
    """
    Change current loglevel of agent. The request is sent without the
    ``fiware-service`` and ``fiware-servicepath`` headers and carries the
    new level as query parameter.

    Args:
        level: New log level (case-insensitive). One of INFO, ERROR,
            FATAL, DEBUG, WARNING.

    Raises:
        KeyError: If the given level is not supported.
        BaseHttpClientException: If the request fails.

    Returns:
        None
    """
    level = level.upper()
    if level not in ["INFO", "ERROR", "FATAL", "DEBUG", "WARNING"]:
        raise KeyError("Given log level is not supported")

    url = urljoin(self.base_url, "admin/log")
    # work on a copy so the client's own headers stay untouched
    headers = self.headers.copy()
    new_loglevel = {"level": level}
    del headers["fiware-service"]
    del headers["fiware-servicepath"]
    try:
        res = self.put(url=url, headers=headers, params=new_loglevel)
        if res.ok:
            # log the level itself; the params dict used to be formatted
            # here, which rendered as "{'level': 'DEBUG'}"
            self.logger.info(
                "Loglevel of agent at %s changed to '%s'",
                self.base_url,
                level,
            )
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        self.log_error(err=err)
        msg = f"Could not change loglevel because of the following reason: {str(err.args[0])}"
        raise BaseHttpClientException(message=msg, response=err.response) from err