Coverage for filip/clients/ngsi_v2/iota.py: 77%
231 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-03-10 13:43 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-03-10 13:43 +0000
1"""
2IoT-Agent Module for API Client
3"""
5from __future__ import annotations
7import json
8from copy import deepcopy
9from typing import List, Dict, Set, TYPE_CHECKING, Union, Optional
10import warnings
11from urllib.parse import urljoin
12import requests
13from pydantic import AnyHttpUrl
14from pydantic.type_adapter import TypeAdapter
15from filip.config import settings
16from filip.clients.base_http_client import BaseHttpClient
17from filip.models.base import FiwareHeader
18from filip.models.ngsi_v2.iot import Device, ServiceGroup
20from filip.utils.filter import filter_device_list, filter_group_list
22if TYPE_CHECKING:
23 from filip.clients.ngsi_v2.cb import ContextBrokerClient
26class IoTAClient(BaseHttpClient):
27 """
28 Client for FIWARE IoT-Agents. The implementation follows the API
29 specifications from here:
30 https://iotagent-node-lib.readthedocs.io/en/latest/
32 Args:
33 url: Url of IoT-Agent
34 session (requests.Session):
35 fiware_header (FiwareHeader): fiware service and fiware service path
36 **kwargs (Optional): Optional arguments that ``request`` takes.
37 """
39 def __init__(
40 self,
41 url: str = None,
42 *,
43 session: requests.Session = None,
44 fiware_header: FiwareHeader = None,
45 **kwargs,
46 ):
47 # set service url
48 url = url or settings.IOTA_URL
49 super().__init__(
50 url=url, session=session, fiware_header=fiware_header, **kwargs
51 )
53 # ABOUT API
54 def get_version(self) -> Dict:
55 """
56 Gets version of IoT Agent
58 Returns:
59 Dictionary with response
60 """
61 url = urljoin(self.base_url, "iot/about")
62 try:
63 res = self.get(url=url, headers=self.headers)
64 if res.ok:
65 return res.json()
66 res.raise_for_status()
67 except requests.RequestException as err:
68 self.logger.error(err)
69 raise
71 # SERVICE GROUP API
72 def post_groups(
73 self,
74 service_groups: Union[ServiceGroup, List[ServiceGroup]],
75 update: bool = False,
76 ):
77 """
78 Creates a set of service groups for the given service and service_path.
79 The service_group and subservice information will taken from the
80 headers, overwriting any preexisting values.
82 Args:
83 service_groups (list of ServiceGroup): Service groups that will be
84 posted to the agent's API
85 update (bool): If service group already exists try to update its
87 Returns:
88 None
89 """
90 if not isinstance(service_groups, list):
91 service_groups = [service_groups]
92 for group in service_groups:
93 if group.service:
94 assert (
95 group.service == self.headers["fiware-service"]
96 ), "Service group service does not math fiware service"
97 if group.subservice:
98 assert (
99 group.subservice == self.headers["fiware-servicepath"]
100 ), "Service group subservice does not math fiware service path"
102 url = urljoin(self.base_url, "iot/services")
103 headers = self.headers
104 data = {
105 "services": [
106 group.model_dump(exclude={"service", "subservice"}, exclude_none=True)
107 for group in service_groups
108 ]
109 }
110 try:
111 res = self.post(url=url, headers=headers, json=data)
112 if res.ok:
113 self.logger.info("Services successfully posted")
114 elif res.status_code == 409:
115 self.logger.warning(res.text)
116 if len(service_groups) > 1:
117 self.logger.info(
118 "Trying to split bulk operation into " "single operations"
119 )
120 for group in service_groups:
121 self.post_group(service_group=group, update=update)
122 elif update is True:
123 self.update_group(service_group=service_groups[0], fields=None)
124 else:
125 res.raise_for_status()
126 else:
127 res.raise_for_status()
128 except requests.RequestException as err:
129 self.log_error(err=err, msg=None)
130 raise
132 def post_group(self, service_group: ServiceGroup, update: bool = False):
133 """
134 Single service registration but using the bulk operation in background
136 Args:
137 service_group (ServiceGroup): Service that will be posted to the
138 agent's API
139 update (bool):
141 Returns:
142 None
143 """
144 return self.post_groups(service_groups=[service_group], update=update)
146 def get_group_list(self) -> List[ServiceGroup]:
147 r"""
148 Retrieves service_group groups from the database. If the servicepath
149 header has the wildcard expression, /\*, all the subservices for the
150 service_group are returned. The specific subservice parameters are
151 returned in any other case.
153 Returns:
155 """
156 url = urljoin(self.base_url, "iot/services")
157 headers = self.headers
158 try:
159 res = self.get(url=url, headers=headers)
160 if res.ok:
161 ta = TypeAdapter(List[ServiceGroup])
162 return ta.validate_python(res.json()["services"])
163 res.raise_for_status()
164 except requests.RequestException as err:
165 self.log_error(err=err, msg=None)
166 raise
168 def get_group(self, *, resource: str, apikey: str) -> ServiceGroup:
169 """
170 Retrieves service_group groups from the database based on resource and
171 apikey
172 Args:
173 resource:
174 apikey:
175 Returns:
177 """
178 groups = self.get_group_list()
179 groups = filter_group_list(
180 group_list=groups, resources=resource, apikeys=apikey
181 )
182 if len(groups) == 1:
183 group = groups[0]
184 return group
185 elif len(groups) == 0:
186 raise KeyError(
187 f"Service group with resource={resource} and apikey={apikey} was not found"
188 )
189 else:
190 raise NotImplementedError(
191 "There is a wierd error, try get_group_list() for debugging"
192 )
194 def update_groups(
195 self,
196 *,
197 service_groups: Union[ServiceGroup, List[ServiceGroup]],
198 add: False,
199 fields: Union[Set[str], List[str]] = None,
200 ) -> None:
201 """
202 Bulk operation for service group update.
203 Args:
204 fields:
205 service_groups:
206 add:
208 Returns:
210 """
211 if not isinstance(service_groups, list):
212 service_groups = [service_groups]
213 for group in service_groups:
214 self.update_group(service_group=group, fields=fields, add=add)
216 def update_group(
217 self,
218 *,
219 service_group: ServiceGroup,
220 fields: Union[Set[str], List[str]] = None,
221 add: bool = True,
222 ):
223 """
224 Modifies the information for a service group configuration, identified
225 by the resource and apikey query parameters. Takes a service group body
226 as the payload. The body does not have to be complete: for incomplete
227 bodies, just the existing attributes will be updated
229 Args:
230 service_group (ServiceGroup): Service to update.
231 fields: Fields of the service_group to update. If 'None' all allowed
232 fields will be updated
233 add:
234 Returns:
235 None
236 """
237 if fields:
238 if isinstance(fields, list):
239 fields = set(fields)
240 else:
241 fields = None
242 url = urljoin(self.base_url, "iot/services")
243 headers = self.headers
244 params = service_group.model_dump(include={"resource", "apikey"})
245 try:
246 res = self.put(
247 url=url,
248 headers=headers,
249 params=params,
250 json=service_group.model_dump(
251 include=fields, exclude={"service", "subservice"}, exclude_none=True
252 ),
253 )
254 if res.ok:
255 self.logger.info("ServiceGroup updated!")
256 elif (res.status_code == 404) & (add is True):
257 self.post_group(service_group=service_group)
258 else:
259 res.raise_for_status()
260 except requests.RequestException as err:
261 self.log_error(err=err, msg=None)
262 raise
264 def delete_group(self, *, resource: str, apikey: str):
265 """
266 Deletes a service group in in the IoT-Agent
268 Args:
269 resource:
270 apikey:
272 Returns:
274 """
275 url = urljoin(self.base_url, "iot/services")
276 headers = self.headers
277 params = {"resource": resource, "apikey": apikey}
278 try:
279 res = self.delete(url=url, headers=headers, params=params)
280 if res.ok:
281 self.logger.info(
282 "ServiceGroup with resource: '%s' and "
283 "apikey: '%s' successfully deleted!",
284 resource,
285 apikey,
286 )
287 else:
288 res.raise_for_status()
289 except requests.RequestException as err:
290 msg = (
291 f"Could not delete ServiceGroup with resource "
292 f"'{resource}' and apikey '{apikey}'!"
293 )
294 self.log_error(err=err, msg=msg)
295 raise
297 # DEVICE API
298 def post_devices(
299 self, *, devices: Union[Device, List[Device]], update: bool = False
300 ) -> None:
301 """
302 Post a device from the device registry. No payload is required
303 or received.
304 If a device already exists in can be updated with update = True
305 Args:
306 devices (list of Devices):
307 update (bool): Whether if the device is already existent it
308 should be updated
309 Returns:
310 None
311 """
312 if not isinstance(devices, list):
313 devices = [devices]
314 url = urljoin(self.base_url, "iot/devices")
315 headers = self.headers
317 data = {
318 "devices": [
319 json.loads(device.model_dump_json(exclude_none=True))
320 for device in devices
321 ]
322 }
323 try:
324 res = self.post(url=url, headers=headers, json=data)
325 if res.ok:
326 self.logger.info("Devices successfully posted!")
327 else:
328 res.raise_for_status()
329 except requests.RequestException as err:
330 if update:
331 return self.update_devices(devices=devices, add=False)
332 msg = "Could not post devices"
333 self.log_error(err=err, msg=msg)
334 raise
336 def post_device(self, *, device: Device, update: bool = False) -> None:
337 """
338 Post a device configuration to the IoT-Agent
340 Args:
341 device: IoT device configuration to send
342 update: update device if configuration already exists
344 Returns:
345 None
346 """
347 return self.post_devices(devices=[device], update=update)
349 def get_device_list(
350 self,
351 *,
352 limit: int = None,
353 offset: int = None,
354 device_ids: Union[str, List[str]] = None,
355 entity_names: Union[str, List[str]] = None,
356 entity_types: Union[str, List[str]] = None,
357 ) -> List[Device]:
358 """
359 Returns a list of all the devices in the device registry with all
360 its data. The IoTAgent now only supports "limit" and "offset" as
361 request parameters.
363 Args:
364 limit:
365 if present, limits the number of devices returned in the
366 list. Must be a number between 1 and 1000.
367 offset:
368 if present, skip that number of devices from the original
369 query.
370 device_ids:
371 List of device_ids. If given, only devices with matching ids
372 will be returned
373 entity_names:
374 The entity_ids of the devices. If given, only the devices
375 with the specified entity_id will be returned
376 entity_types:
377 The entity_type of the device. If given, only the devices
378 with the specified entity_type will be returned
380 Returns:
381 List of matching devices
382 """
383 if limit:
384 if not 1 < limit < 1000:
385 self.logger.error("'limit' must be an integer between 1 and " "1000!")
386 raise ValueError
387 url = urljoin(self.base_url, "iot/devices")
388 headers = self.headers
389 params = {key: value for key, value in locals().items() if value is not None}
390 try:
391 res = self.get(url=url, headers=headers, params=params)
392 if res.ok:
393 ta = TypeAdapter(List[Device])
394 devices = ta.validate_python(res.json()["devices"])
395 # filter by device_ids, entity_names or entity_types
396 devices = filter_device_list(
397 devices, device_ids, entity_names, entity_types
398 )
399 return devices
400 res.raise_for_status()
401 except requests.RequestException as err:
402 self.log_error(err=err, msg=None)
403 raise
405 def get_device(self, *, device_id: str) -> Device:
406 """
407 Returns all the information about a particular device.
409 Args:
410 device_id:
411 Raises:
412 requests.RequestException, if device does not exist
413 Returns:
414 Device
416 """
417 url = urljoin(self.base_url, f"iot/devices/{device_id}")
418 headers = self.headers
419 try:
420 res = self.get(url=url, headers=headers)
421 if res.ok:
422 return Device.model_validate(res.json())
423 res.raise_for_status()
424 except requests.RequestException as err:
425 msg = f"Device {device_id} was not found"
426 self.log_error(err=err, msg=msg)
427 raise
429 def update_device(self, *, device: Device, add: bool = True) -> None:
430 """
431 Updates a device from the device registry.
432 Adds, removes attributes from the device entry and changes
433 attributes values.
434 It does not change device settings (endpoint,..) and only adds
435 attributes to the corresponding entity, their it does not
436 change any attribute value and does not delete removed attributes
438 Args:
439 device:
440 add (bool): If device not found add it
441 Returns:
442 None
443 """
444 url = urljoin(self.base_url, f"iot/devices/{device.device_id}")
445 headers = self.headers
446 try:
447 res = self.put(
448 url=url,
449 headers=headers,
450 json=device.model_dump(
451 include={"attributes", "lazy", "commands", "static_attributes"},
452 exclude_none=True,
453 ),
454 )
455 if res.ok:
456 self.logger.info("Device '%s' successfully updated!", device.device_id)
457 elif (res.status_code == 404) & (add is True):
458 self.post_device(device=device, update=False)
459 else:
460 res.raise_for_status()
461 except requests.RequestException as err:
462 msg = f"Could not update device '{device.device_id}'"
463 self.log_error(err=err, msg=msg)
464 raise
466 def update_devices(
467 self, *, devices: Union[Device, List[Device]], add: False
468 ) -> None:
469 """
470 Bulk operation for device update.
471 Args:
472 devices:
473 add:
475 Returns:
477 """
478 if not isinstance(devices, list):
479 devices = [devices]
480 for device in devices:
481 self.update_device(device=device, add=add)
483 def delete_device(
484 self,
485 *,
486 device_id: str,
487 cb_url: AnyHttpUrl = settings.CB_URL,
488 delete_entity: bool = False,
489 force_entity_deletion: bool = False,
490 cb_client: ContextBrokerClient = None,
491 ) -> None:
492 """
493 Remove a device from the device registry. No payload is required
494 or received.
496 Args:
497 device_id: str, ID of Device
498 delete_entity: False -> Only delete the device entry,
499 the automatically created and linked
500 context-entity will continue to
501 exist in Fiware
502 True -> Also delete the automatically
503 created and linked context-entity
504 If multiple devices are linked to this
505 entity, this operation is not executed and
506 an exception is raised
507 force_entity_deletion:
508 bool, if delete_entity is true and multiple devices are linked
509 to the linked entity, delete it and do not raise an error
510 cb_client (ContextBrokerClient):
511 Corresponding ContextBrokerClient object for entity manipulation
512 cb_url (AnyHttpUrl):
513 Url of the ContextBroker where the entity is found.
514 This will autogenerate an CB-Client, mirroring the information
515 of the IoTA-Client, e.g. FiwareHeader, and other headers
516 (not recommended!)
518 Returns:
519 None
520 """
521 url = urljoin(
522 self.base_url,
523 f"iot/devices/{device_id}",
524 )
525 headers = self.headers
527 device = self.get_device(device_id=device_id)
529 try:
530 res = self.delete(url=url, headers=headers)
531 if res.ok:
532 self.logger.info("Device '%s' successfully deleted!", device_id)
533 else:
534 res.raise_for_status()
535 except requests.RequestException as err:
536 msg = f"Could not delete device {device_id}!"
537 self.log_error(err=err, msg=msg)
538 raise
540 if delete_entity:
541 # An entity can technically belong to multiple devices
542 # Only delete the entity if
543 devices = self.get_device_list(entity_names=[device.entity_name])
545 # Zero because we count the remaining devices
546 if len(devices) > 0 and not force_entity_deletion:
547 raise Exception(
548 f"The corresponding entity to the device "
549 f"{device_id} was not deleted because it is "
550 f"linked to multiple devices. "
551 )
552 else:
553 try:
554 from filip.clients.ngsi_v2 import ContextBrokerClient
556 if cb_client:
557 cb_client_local = deepcopy(cb_client)
558 else:
559 warnings.warn(
560 "No `ContextBrokerClient` "
561 "object providesd! Will try to generate "
562 "one. This usage is not recommended."
563 )
565 cb_client_local = ContextBrokerClient(
566 url=cb_url,
567 fiware_header=self.fiware_headers,
568 headers=headers,
569 )
571 cb_client_local.delete_entity(
572 entity_id=device.entity_name, entity_type=device.entity_type
573 )
575 except requests.RequestException as err:
576 # Do not throw an error
577 # It is only important that the entity does not exist after
578 # this methode, not if this methode actively deleted it
579 pass
581 cb_client_local.close()
583 def patch_device(
584 self,
585 device: Device,
586 patch_entity: bool = True,
587 cb_client: ContextBrokerClient = None,
588 cb_url: AnyHttpUrl = settings.CB_URL,
589 ) -> None:
590 """
591 Updates a device state in Fiware, if the device does not exists it
592 is created, else its values are updated.
593 If the device settings (endpoint,..) were changed the device and
594 entity are deleted and re-added.
596 If patch_entity is true the corresponding entity in the ContextBroker is
597 also correctly updated. Else only new attributes are added there.
599 Args:
600 device (Device): Device to be posted to /updated in Fiware
601 patch_entity (bool): If true the corresponding entity is
602 completely synced
603 cb_client (ContextBrokerClient):
604 Corresponding ContextBrokerClient object for entity manipulation
605 cb_url (AnyHttpUrl):
606 Url of the ContextBroker where the entity is found.
607 This will autogenerate an CB-Client, mirroring the information
608 of the IoTA-Client, e.g. FiwareHeader, and other headers
609 (not recommended!)
611 Returns:
612 None
613 """
614 try:
615 live_device = self.get_device(device_id=device.device_id)
616 except requests.RequestException:
617 # device does not exist yet, post it
618 self.post_device(device=device)
619 return
621 # if the device settings were changed we need to delete the device
622 # and repost it
623 settings_dict = {
624 "device_id",
625 "service",
626 "service_path",
627 "entity_name",
628 "entity_type",
629 "timestamp",
630 "apikey",
631 "endpoint",
632 "protocol",
633 "transport",
634 "expressionLanguage",
635 }
637 live_settings = live_device.model_dump(include=settings_dict)
638 new_settings = device.model_dump(include=settings_dict)
640 if not live_settings == new_settings:
641 self.delete_device(
642 device_id=device.device_id,
643 delete_entity=True,
644 force_entity_deletion=True,
645 cb_client=cb_client,
646 )
647 self.post_device(device=device)
648 return
650 # We are at a state where the device exists, but only attributes were
651 # changed.
652 # we need to update the device, and the context entry separately,
653 # as update device only takes over a part of the changes to the
654 # ContextBroker.
656 # update device
657 self.update_device(device=device)
659 # update context entry
660 # 1. build context entity from information in device
661 # 2. patch it
662 from filip.models.ngsi_v2.context import ContextEntity, NamedContextAttribute
664 def build_context_entity_from_device(device: Device) -> ContextEntity:
665 from filip.models.base import DataType
667 entity = ContextEntity(id=device.entity_name, type=device.entity_type)
669 for command in device.commands:
670 entity.add_attributes(
671 [
672 # Command attribute will be registered by the device_update
673 NamedContextAttribute(
674 name=f"{command.name}_info", type=DataType.COMMAND_RESULT
675 ),
676 NamedContextAttribute(
677 name=f"{command.name}_status", type=DataType.COMMAND_STATUS
678 ),
679 ]
680 )
681 for attribute in device.attributes:
682 entity.add_attributes(
683 [
684 NamedContextAttribute(
685 name=attribute.name,
686 type=DataType.STRUCTUREDVALUE,
687 metadata=attribute.metadata,
688 )
689 ]
690 )
691 for static_attribute in device.static_attributes:
692 entity.add_attributes(
693 [
694 NamedContextAttribute(
695 name=static_attribute.name,
696 type=static_attribute.type,
697 value=static_attribute.value,
698 metadata=static_attribute.metadata,
699 )
700 ]
701 )
702 return entity
704 if patch_entity:
705 from filip.clients.ngsi_v2 import ContextBrokerClient
707 if cb_client:
708 cb_client_local = deepcopy(cb_client)
709 else:
710 warnings.warn(
711 "No `ContextBrokerClient` object provided! "
712 "Will try to generate one. "
713 "This usage is not recommended."
714 )
716 cb_client_local = ContextBrokerClient(
717 url=cb_url, fiware_header=self.fiware_headers, headers=self.headers
718 )
720 cb_client_local.patch_entity(
721 entity=build_context_entity_from_device(device)
722 )
723 cb_client_local.close()
725 def does_device_exists(self, device_id: str) -> bool:
726 """
727 Test if a device with the given id exists in Fiware
728 Args:
729 device_id (str)
730 Returns:
731 bool
732 """
733 try:
734 self.get_device(device_id=device_id)
735 return True
736 except requests.RequestException as err:
737 if err.response is None or not err.response.status_code == 404:
738 self.log_error(
739 err=err,
740 msg=f"Error while checking existence for device {device_id}",
741 )
742 raise
743 return False
745 # LOG API
746 def get_loglevel_of_agent(self):
747 """
748 Get current loglevel of agent
749 Returns:
751 """
752 url = urljoin(self.base_url, "admin/log")
753 headers = self.headers.copy()
754 del headers["fiware-service"]
755 del headers["fiware-servicepath"]
756 try:
757 res = self.get(url=url, headers=headers)
758 if res.ok:
759 return res.json()["level"]
760 res.raise_for_status()
761 except requests.RequestException as err:
762 self.log_error(err=err)
763 raise
765 def change_loglevel_of_agent(self, level: str):
766 """
767 Change current loglevel of agent
769 Args:
770 level:
772 Returns:
774 """
775 level = level.upper()
776 if level not in ["INFO", "ERROR", "FATAL", "DEBUG", "WARNING"]:
777 raise KeyError("Given log level is not supported")
779 url = urljoin(self.base_url, "admin/log")
780 headers = self.headers.copy()
781 del headers["fiware-service"]
782 del headers["fiware-servicepath"]
783 try:
784 res = self.put(url=url, headers=headers, params=level)
785 if res.ok:
786 self.logger.info(
787 "Loglevel of agent at %s " "changed to '%s'", self.base_url, level
788 )
789 else:
790 res.raise_for_status()
791 except requests.RequestException as err:
792 self.log_error(err=err)
793 raise