Coverage for filip/clients/ngsi_v2/iota.py: 83%
247 statements
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-26 14:36 +0000
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-26 14:36 +0000
1"""
2IoT-Agent Module for API Client
3"""
5from __future__ import annotations
7import json
8from copy import deepcopy
9from typing import List, Dict, Set, TYPE_CHECKING, Union, Optional
10import warnings
11from urllib.parse import urljoin
12import requests
13from pydantic import AnyHttpUrl, ValidationError
14from pydantic.type_adapter import TypeAdapter
15from filip.config import settings
16from filip.clients.base_http_client import BaseHttpClient
17from filip.clients.exceptions import BaseHttpClientException
18from filip.models.base import FiwareHeader
19from filip.models.ngsi_v2.iot import (
20 Device,
21 ServiceGroup,
22 DeviceValidationList,
23 DeviceList,
24)
26from filip.utils.filter import filter_device_list, filter_group_list
28if TYPE_CHECKING:
29 from filip.clients.ngsi_v2.cb import ContextBrokerClient
32class IoTAClient(BaseHttpClient):
33 """
34 Client for FIWARE IoT-Agents. The implementation follows the API
35 specifications from here:
36 https://iotagent-node-lib.readthedocs.io/en/latest/
38 Args:
39 url: Url of IoT-Agent
40 session (requests.Session):
41 fiware_header (FiwareHeader): fiware service and fiware service path
42 **kwargs (Optional): Optional arguments that ``request`` takes.
43 """
45 def __init__(
46 self,
47 url: str = None,
48 *,
49 session: requests.Session = None,
50 fiware_header: FiwareHeader = None,
51 **kwargs,
52 ):
53 # set service url
54 url = url or settings.IOTA_URL
55 super().__init__(
56 url=url, session=session, fiware_header=fiware_header, **kwargs
57 )
59 # ABOUT API
60 def get_version(self) -> Dict:
61 """
62 Gets version of IoT Agent
64 Returns:
65 Dictionary with response
66 """
67 url = urljoin(self.base_url, "iot/about")
68 try:
69 res = self.get(url=url, headers=self.headers)
70 if res.ok:
71 return res.json()
72 res.raise_for_status()
73 except requests.RequestException as err:
74 self.logger.error(err)
75 msg = "Could not retrieve version because of following reason: " + str(
76 err.args[0]
77 )
78 raise BaseHttpClientException(message=msg, response=err.response) from err
80 # SERVICE GROUP API
81 def post_groups(
82 self,
83 service_groups: Union[ServiceGroup, List[ServiceGroup]],
84 update: bool = False,
85 ):
86 """
87 Creates a set of service groups for the given service and service_path.
88 The service_group and subservice information will taken from the
89 headers, overwriting any preexisting values.
91 Args:
92 service_groups (list of ServiceGroup): Service groups that will be
93 posted to the agent's API
94 update (bool): If service group already exists try to update its
96 Returns:
97 None
98 """
99 if not isinstance(service_groups, list):
100 service_groups = [service_groups]
101 for group in service_groups:
102 if group.service:
103 assert (
104 group.service == self.headers["fiware-service"]
105 ), "Service group service does not math fiware service"
106 if group.subservice:
107 assert (
108 group.subservice == self.headers["fiware-servicepath"]
109 ), "Service group subservice does not match fiware service path"
111 url = urljoin(self.base_url, "iot/services")
112 headers = self.headers
113 data = {
114 "services": [
115 group.model_dump(exclude={"service", "subservice"}, exclude_none=True)
116 for group in service_groups
117 ]
118 }
119 try:
120 res = self.post(url=url, headers=headers, json=data)
121 if res.ok:
122 self.logger.info("Services successfully posted")
123 elif res.status_code == 409:
124 self.logger.warning(res.text)
125 if len(service_groups) > 1:
126 self.logger.info(
127 "Trying to split bulk operation into " "single operations"
128 )
129 for group in service_groups:
130 self.post_group(service_group=group, update=update)
131 elif update is True:
132 self.update_group(service_group=service_groups[0], fields=None)
133 else:
134 res.raise_for_status()
135 else:
136 res.raise_for_status()
137 except requests.RequestException as err:
138 self.logger.error(err)
139 msg = "Could not post group because of following reason: " + str(
140 err.args[0]
141 )
142 raise BaseHttpClientException(message=msg, response=err.response) from err
144 def post_group(self, service_group: ServiceGroup, update: bool = False):
145 """
146 Single service registration but using the bulk operation in background
148 Args:
149 service_group (ServiceGroup): Service that will be posted to the
150 agent's API
151 update (bool):
153 Returns:
154 None
155 """
156 return self.post_groups(service_groups=[service_group], update=update)
158 def get_group_list(self) -> List[ServiceGroup]:
159 r"""
160 Retrieves service_group groups from the database. If the servicepath
161 header has the wildcard expression, /\*, all the subservices for the
162 service_group are returned. The specific subservice parameters are
163 returned in any other case.
165 Returns:
167 """
168 url = urljoin(self.base_url, "iot/services")
169 headers = self.headers
170 try:
171 res = self.get(url=url, headers=headers)
172 if res.ok:
173 ta = TypeAdapter(List[ServiceGroup])
174 return ta.validate_python(res.json()["services"])
175 res.raise_for_status()
176 except requests.RequestException as err:
177 self.logger.error(err)
178 msg = "Could not retrieve group list because of following reason: " + str(
179 err.args[0]
180 )
181 raise BaseHttpClientException(message=msg, response=err.response) from err
183 def get_group(self, *, resource: str, apikey: str) -> ServiceGroup:
184 """
185 Retrieves service_group groups from the database based on resource and
186 apikey
187 Args:
188 resource:
189 apikey:
190 Returns:
192 """
193 groups = self.get_group_list()
194 groups = filter_group_list(
195 group_list=groups, resources=resource, apikeys=apikey
196 )
197 if len(groups) == 1:
198 group = groups[0]
199 return group
200 elif len(groups) == 0:
201 raise KeyError(
202 f"Service group with resource={resource} and apikey={apikey} was not found"
203 )
204 else:
205 raise NotImplementedError(
206 "There is a wierd error, try get_group_list() for debugging"
207 )
209 def update_groups(
210 self,
211 *,
212 service_groups: Union[ServiceGroup, List[ServiceGroup]],
213 add: False,
214 fields: Union[Set[str], List[str]] = None,
215 ) -> None:
216 """
217 Bulk operation for service group update.
218 Args:
219 fields:
220 service_groups:
221 add:
223 Returns:
225 """
226 if not isinstance(service_groups, list):
227 service_groups = [service_groups]
228 for group in service_groups:
229 self.update_group(service_group=group, fields=fields, add=add)
231 def update_group(
232 self,
233 *,
234 service_group: ServiceGroup,
235 fields: Union[Set[str], List[str]] = None,
236 add: bool = True,
237 ):
238 """
239 Modifies the information for a service group configuration, identified
240 by the resource and apikey query parameters. Takes a service group body
241 as the payload. The body does not have to be complete: for incomplete
242 bodies, just the existing attributes will be updated
244 Args:
245 service_group (ServiceGroup): Service to update.
246 fields: Fields of the service_group to update. If 'None' all allowed
247 fields will be updated
248 add:
249 Returns:
250 None
251 """
252 if fields:
253 if isinstance(fields, list):
254 fields = set(fields)
255 else:
256 fields = None
257 url = urljoin(self.base_url, "iot/services")
258 headers = self.headers
259 params = service_group.model_dump(include={"resource", "apikey"})
260 try:
261 res = self.put(
262 url=url,
263 headers=headers,
264 params=params,
265 json=service_group.model_dump(
266 include=fields, exclude={"service", "subservice"}, exclude_none=True
267 ),
268 )
269 if res.ok:
270 self.logger.info("ServiceGroup updated!")
271 elif (res.status_code == 404) & (add is True):
272 self.post_group(service_group=service_group)
273 else:
274 res.raise_for_status()
275 except requests.RequestException as err:
276 self.logger.error(err)
277 msg = "Could not update group because of following reason: " + str(
278 err.args[0]
279 )
280 raise BaseHttpClientException(message=msg, response=err.response) from err
282 def delete_group(self, *, resource: str, apikey: str):
283 """
284 Deletes a service group in in the IoT-Agent
286 Args:
287 resource:
288 apikey:
290 Returns:
292 """
293 url = urljoin(self.base_url, "iot/services")
294 headers = self.headers
295 params = {"resource": resource, "apikey": apikey}
296 try:
297 res = self.delete(url=url, headers=headers, params=params)
298 if res.ok:
299 self.logger.info(
300 "ServiceGroup with resource: '%s' and "
301 "apikey: '%s' successfully deleted!",
302 resource,
303 apikey,
304 )
305 else:
306 res.raise_for_status()
307 except requests.RequestException as err:
308 self.logger.error(err)
309 msg = (
310 f"Could not delete ServiceGroup with resource "
311 f"'{resource}' and apikey '{apikey}' because of following reason: {str(err.args[0])}"
312 )
313 raise BaseHttpClientException(message=msg, response=err.response) from err
315 # DEVICE API
316 def post_devices(
317 self, *, devices: Union[Device, List[Device]], update: bool = False
318 ) -> None:
319 """
320 Post a device from the device registry. No payload is required
321 or received.
322 If a device already exists in can be updated with update = True
323 Args:
324 devices (list of Devices):
325 update (bool): Whether if the device is already existent it
326 should be updated
327 Returns:
328 None
329 """
330 if not isinstance(devices, list):
331 devices = [devices]
332 url = urljoin(self.base_url, "iot/devices")
333 headers = self.headers
335 data = {
336 "devices": [
337 json.loads(device.model_dump_json(exclude_none=True))
338 for device in devices
339 ]
340 }
341 try:
342 res = self.post(url=url, headers=headers, json=data)
343 if res.ok:
344 self.logger.info("Devices successfully posted!")
345 else:
346 res.raise_for_status()
347 except requests.RequestException as err:
348 if update:
349 return self.update_devices(devices=devices, add=False)
350 self.logger.error(err)
351 msg = "Could not post devices because of following reason: " + str(
352 err.args[0]
353 )
354 raise BaseHttpClientException(message=msg, response=err.response) from err
356 def post_device(self, *, device: Device, update: bool = False) -> None:
357 """
358 Post a device configuration to the IoT-Agent
360 Args:
361 device: IoT device configuration to send
362 update: update device if configuration already exists
364 Returns:
365 None
366 """
367 return self.post_devices(devices=[device], update=update)
369 def get_device_list(
370 self,
371 *,
372 limit: int = None,
373 offset: int = None,
374 device_ids: Union[str, List[str]] = None,
375 entity_names: Union[str, List[str]] = None,
376 entity_types: Union[str, List[str]] = None,
377 include_invalid: bool = False,
378 ) -> Union[List[Union[Device, DeviceList]], DeviceValidationList]:
379 """
380 Returns a list of all the devices in the device registry with all
381 its data. The IoTAgent now only supports "limit" and "offset" as
382 request parameters.
384 Args:
385 limit:
386 if present, limits the number of devices returned in the
387 list. Must be a number between 1 and 1000.
388 offset:
389 if present, skip that number of devices from the original
390 query.
391 device_ids:
392 List of device_ids. If given, only devices with matching ids
393 will be returned
394 entity_names:
395 The entity_ids of the devices. If given, only the devices
396 with the specified entity_id will be returned
397 entity_types:
398 The entity_type of the device. If given, only the devices
399 with the specified entity_type will be returned
400 include_invalid: Specify if the returned list should also contain a list of invalid device IDs or not.
401 Returns:
402 List of matching devices
403 """
404 params = {}
405 if limit:
406 if not 1 < limit < 1000:
407 self.logger.error("'limit' must be an integer between 1 and " "1000!")
408 raise ValueError
409 else:
410 params["limit"] = limit
411 if offset:
412 if not isinstance(offset, int):
413 self.logger.error("'offset' must be an integer!")
414 raise ValueError
415 else:
416 params["offset"] = offset
417 url = urljoin(self.base_url, "iot/devices")
418 headers = self.headers
419 try:
420 res = self.get(url=url, headers=headers, params=params)
421 if res.ok:
422 if include_invalid:
423 valid_devices = []
424 invalid_devices = []
425 ta = TypeAdapter(Device)
426 for device in res.json()["devices"]:
427 try:
428 valid_device = ta.validate_python(device)
429 valid_devices.append(valid_device)
430 except ValidationError:
431 invalid_devices.append(device.get("device_id"))
432 valid_devices = filter_device_list(
433 valid_devices, device_ids, entity_names, entity_types
434 )
435 invalid_devices = filter_device_list(
436 invalid_devices, device_ids, entity_names, entity_types
437 )
438 return DeviceValidationList.model_validate(
439 {
440 "devices": valid_devices,
441 "invalid_devices": invalid_devices,
442 }
443 )
444 else:
445 return filter_device_list(
446 devices=DeviceList.model_validate(
447 {"devices": res.json()["devices"]}
448 ).devices,
449 device_ids=device_ids,
450 entity_names=entity_names,
451 entity_types=entity_types,
452 )
453 res.raise_for_status()
454 except requests.RequestException as err:
455 self.logger.error(err)
456 msg = (
457 "Not able to retrieve the device list because of the following reason:"
458 + str(err.args[0])
459 )
460 raise BaseHttpClientException(message=msg, response=err.response) from err
462 def get_device(self, *, device_id: str) -> Device:
463 """
464 Returns all the information about a particular device.
466 Args:
467 device_id:
468 Raises:
469 requests.RequestException, if device does not exist
470 Returns:
471 Device
473 """
474 url = urljoin(self.base_url, f"iot/devices/{device_id}")
475 headers = self.headers
476 try:
477 res = self.get(url=url, headers=headers)
478 if res.ok:
479 return Device.model_validate(res.json())
480 res.raise_for_status()
481 except requests.RequestException as err:
482 self.logger.error(err)
484 msg = f"Device '{device_id}' was not found because of the following reason: {str(err.args[0])}"
485 raise BaseHttpClientException(message=msg, response=err.response) from err
487 def update_device(self, *, device: Device, add: bool = True) -> None:
488 """
489 Updates a device from the device registry.
490 Adds, removes attributes from the device entry and changes
491 attributes values.
492 It does not change device settings (endpoint,..) and only adds
493 attributes to the corresponding entity, their it does not
494 change any attribute value and does not delete removed attributes
496 Args:
497 device:
498 add (bool): If device not found add it
499 Returns:
500 None
501 """
502 url = urljoin(self.base_url, f"iot/devices/{device.device_id}")
503 headers = self.headers
504 try:
505 res = self.put(
506 url=url,
507 headers=headers,
508 json=device.model_dump(
509 include={"attributes", "lazy", "commands", "static_attributes"},
510 exclude_none=True,
511 ),
512 )
513 if res.ok:
514 self.logger.info("Device '%s' successfully updated!", device.device_id)
515 elif (res.status_code == 404) & (add is True):
516 self.post_device(device=device, update=False)
517 else:
518 res.raise_for_status()
519 except requests.RequestException as err:
520 self.logger.error(err)
521 msg = f"Could not update device '{device.device_id}' because of the following reason: {str(err.args[0])} "
522 raise BaseHttpClientException(message=msg, response=err.response) from err
524 def update_devices(
525 self, *, devices: Union[Device, List[Device]], add: False
526 ) -> None:
527 """
528 Bulk operation for device update.
529 Args:
530 devices:
531 add:
533 Returns:
535 """
536 if not isinstance(devices, list):
537 devices = [devices]
538 for device in devices:
539 self.update_device(device=device, add=add)
541 def delete_device(
542 self,
543 *,
544 device_id: str,
545 cb_url: AnyHttpUrl = settings.CB_URL,
546 delete_entity: bool = False,
547 force_entity_deletion: bool = False,
548 cb_client: ContextBrokerClient = None,
549 ) -> None:
550 """
551 Remove a device from the device registry. No payload is required
552 or received.
554 Args:
555 device_id: str, ID of Device
556 delete_entity: False -> Only delete the device entry,
557 the automatically created and linked
558 context-entity will continue to
559 exist in Fiware
560 True -> Also delete the automatically
561 created and linked context-entity
562 If multiple devices are linked to this
563 entity, this operation is not executed and
564 an exception is raised
565 force_entity_deletion:
566 bool, if delete_entity is true and multiple devices are linked
567 to the linked entity, delete it and do not raise an error
568 cb_client (ContextBrokerClient):
569 Corresponding ContextBrokerClient object for entity manipulation
570 cb_url (AnyHttpUrl):
571 Url of the ContextBroker where the entity is found.
572 This will autogenerate an CB-Client, mirroring the information
573 of the IoTA-Client, e.g. FiwareHeader, and other headers
574 (not recommended!)
576 Returns:
577 None
578 """
579 url = urljoin(
580 self.base_url,
581 f"iot/devices/{device_id}",
582 )
583 headers = self.headers
585 device = self.get_device(device_id=device_id)
587 try:
588 res = self.delete(url=url, headers=headers)
589 if res.ok:
590 self.logger.info("Device '%s' successfully deleted!", device_id)
591 else:
592 res.raise_for_status()
593 except requests.RequestException as err:
594 msg = f"Could not delete device {device_id}!"
595 raise BaseHttpClientException(message=msg, response=err.response) from err
597 if delete_entity:
598 # An entity can technically belong to multiple devices
599 # Only delete the entity if
600 devices = self.get_device_list(entity_names=[device.entity_name])
602 # Zero because we count the remaining devices
603 if len(devices) > 0 and not force_entity_deletion:
604 raise Exception(
605 f"The corresponding entity to the device "
606 f"{device_id} was not deleted because it is "
607 f"linked to multiple devices. "
608 )
609 else:
610 cb_client_local = None
611 try:
612 from filip.clients.ngsi_v2 import ContextBrokerClient
614 if cb_client:
615 cb_client_local = deepcopy(cb_client)
616 else:
617 warnings.warn(
618 "No `ContextBrokerClient` "
619 "object provided! Will try to generate "
620 "one. This usage is not recommended."
621 )
623 cb_client_local = ContextBrokerClient(
624 url=cb_url,
625 fiware_header=self.fiware_headers,
626 headers=headers,
627 )
629 cb_client_local.delete_entity(
630 entity_id=device.entity_name, entity_type=device.entity_type
631 )
633 except requests.RequestException as err:
634 # Do not throw an error
635 # It is only important that the entity does not exist after
636 # this methode, not if this methode actively deleted it
637 pass
639 if cb_client_local:
640 cb_client_local.close()
642 def patch_device(
643 self,
644 device: Device,
645 patch_entity: bool = True,
646 cb_client: ContextBrokerClient = None,
647 cb_url: AnyHttpUrl = settings.CB_URL,
648 ) -> None:
649 """
650 Updates a device state in Fiware, if the device does not exist it
651 is created, else its values are updated.
652 If the device settings were changed the device and
653 entity are deleted and re-added.
655 If patch_entity is true the corresponding entity in the ContextBroker is
656 also correctly updated. Else only new attributes are added there.
658 Args:
659 device (Device): Device to be posted to /updated in Fiware
660 patch_entity (bool): If true the corresponding entity is
661 completely synced
662 cb_client (ContextBrokerClient):
663 Corresponding ContextBrokerClient object for entity manipulation
664 cb_url (AnyHttpUrl):
665 Url of the ContextBroker where the entity is found.
666 This will autogenerate an CB-Client, mirroring the information
667 of the IoTA-Client, e.g. FiwareHeader, and other headers
668 (not recommended!)
670 Returns:
671 None
672 """
673 try:
674 live_device = self.get_device(device_id=device.device_id)
675 except requests.RequestException:
676 # device does not exist yet, post it
677 self.post_device(device=device)
678 return
680 # if the device settings were changed we need to delete the device
681 # and repost it
682 settings_dict = {
683 "device_id",
684 "service",
685 "service_path",
686 "entity_name",
687 "entity_type",
688 "timestamp",
689 "apikey",
690 "endpoint",
691 "protocol",
692 "transport",
693 "expressionLanguage",
694 }
696 live_settings = live_device.model_dump(include=settings_dict)
697 new_settings = device.model_dump(include=settings_dict)
699 if not live_settings == new_settings:
700 self.delete_device(
701 device_id=device.device_id,
702 delete_entity=True,
703 force_entity_deletion=True,
704 cb_client=cb_client,
705 )
706 self.post_device(device=device)
707 return
709 # We are at a state where the device exists, but only attributes were
710 # changed.
711 # we need to update the device, and the context entry separately,
712 # as update device only takes over a part of the changes to the
713 # ContextBroker.
715 # update device
716 self.update_device(device=device)
718 # update context entry
719 # 1. build context entity from information in device
720 # 2. patch it
721 from filip.models.ngsi_v2.context import ContextEntity, NamedContextAttribute
723 def build_context_entity_from_device(device: Device) -> ContextEntity:
724 from filip.models.base import DataType
726 entity = ContextEntity(id=device.entity_name, type=device.entity_type)
728 for command in device.commands:
729 entity.add_attributes(
730 [
731 # Command attribute will be registered by the device_update
732 NamedContextAttribute(
733 name=f"{command.name}_info", type=DataType.COMMAND_RESULT
734 ),
735 NamedContextAttribute(
736 name=f"{command.name}_status", type=DataType.COMMAND_STATUS
737 ),
738 ]
739 )
740 for attribute in device.attributes:
741 entity.add_attributes(
742 [
743 NamedContextAttribute(
744 name=attribute.name,
745 type=DataType.STRUCTUREDVALUE,
746 metadata=attribute.metadata,
747 )
748 ]
749 )
750 for static_attribute in device.static_attributes:
751 entity.add_attributes(
752 [
753 NamedContextAttribute(
754 name=static_attribute.name,
755 type=static_attribute.type,
756 value=static_attribute.value,
757 metadata=static_attribute.metadata,
758 )
759 ]
760 )
761 return entity
763 if patch_entity:
764 from filip.clients.ngsi_v2 import ContextBrokerClient
766 if cb_client:
767 cb_client_local = deepcopy(cb_client)
768 else:
769 warnings.warn(
770 "No `ContextBrokerClient` object provided! "
771 "Will try to generate one. "
772 "This usage is not recommended."
773 )
775 cb_client_local = ContextBrokerClient(
776 url=cb_url, fiware_header=self.fiware_headers, headers=self.headers
777 )
779 cb_client_local.override_entity(
780 entity=build_context_entity_from_device(device)
781 )
782 cb_client_local.close()
784 def does_device_exists(self, device_id: str) -> bool:
785 """
786 Test if a device with the given id exists in Fiware
787 Args:
788 device_id (str)
789 Returns:
790 bool
791 """
792 try:
793 self.get_device(device_id=device_id)
794 return True
795 except requests.RequestException as err:
796 if err.response is None or not err.response.status_code == 404:
797 self.logger.error(err)
798 msg = f"Could not check device status because of the following reason: {str(err.args[0])}"
799 raise BaseHttpClientException(
800 message=msg, response=err.response
801 ) from err
802 return False
804 # LOG API
805 def get_loglevel_of_agent(self):
806 """
807 Get current loglevel of agent
808 Returns:
810 """
811 url = urljoin(self.base_url, "admin/log")
812 headers = self.headers.copy()
813 del headers["fiware-service"]
814 del headers["fiware-servicepath"]
815 try:
816 res = self.get(url=url, headers=headers)
817 if res.ok:
818 return res.json()["level"]
819 res.raise_for_status()
820 except requests.RequestException as err:
821 self.log_error(err=err)
822 msg = f"Could not check for loglevel status because of the following reason: {str(err.args[0])}"
823 raise BaseHttpClientException(message=msg, response=err.response) from err
825 def change_loglevel_of_agent(self, level: str):
826 """
827 Change current loglevel of agent
829 Args:
830 level:
832 Returns:
834 """
835 level = level.upper()
836 if level not in ["INFO", "ERROR", "FATAL", "DEBUG", "WARNING"]:
837 raise KeyError("Given log level is not supported")
839 url = urljoin(self.base_url, "admin/log")
840 headers = self.headers.copy()
841 new_loglevel = {"level": level}
842 del headers["fiware-service"]
843 del headers["fiware-servicepath"]
844 try:
845 res = self.put(url=url, headers=headers, params=new_loglevel)
846 if res.ok:
847 self.logger.info(
848 "Loglevel of agent at %s " "changed to '%s'",
849 self.base_url,
850 new_loglevel,
851 )
852 else:
853 res.raise_for_status()
854 except requests.RequestException as err:
855 self.log_error(err=err)
856 msg = f"Could not change loglevel because of the following reason: {str(err.args[0])}"
857 raise BaseHttpClientException(message=msg, response=err.response) from err