Coverage for filip/clients/ngsi_v2/iota.py: 77%
231 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-20 16:54 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-20 16:54 +0000
1"""
2IoT-Agent Module for API Client
3"""
4from __future__ import annotations
6import json
7from copy import deepcopy
8from typing import List, Dict, Set, TYPE_CHECKING, Union, Optional
9import warnings
10from urllib.parse import urljoin
11import requests
12from pydantic import AnyHttpUrl
13from pydantic.type_adapter import TypeAdapter
14from filip.config import settings
15from filip.clients.base_http_client import BaseHttpClient
16from filip.models.base import FiwareHeader
17from filip.models.ngsi_v2.iot import Device, ServiceGroup
19from filip.utils.filter import filter_device_list, filter_group_list
21if TYPE_CHECKING:
22 from filip.clients.ngsi_v2.cb import ContextBrokerClient
25class IoTAClient(BaseHttpClient):
26 """
27 Client for FIWARE IoT-Agents. The implementation follows the API
28 specifications from here:
29 https://iotagent-node-lib.readthedocs.io/en/latest/
31 Args:
32 url: Url of IoT-Agent
33 session (requests.Session):
34 fiware_header (FiwareHeader): fiware service and fiware service path
35 **kwargs (Optional): Optional arguments that ``request`` takes.
36 """
def __init__(self,
             url: str = None,
             *,
             session: requests.Session = None,
             fiware_header: FiwareHeader = None,
             **kwargs):
    """
    Set up the client for one IoT-Agent endpoint.

    Args:
        url: Url of the IoT-Agent. When it is missing or empty,
            ``settings.IOTA_URL`` is used instead.
        session (requests.Session): Handed unchanged to the base client.
        fiware_header (FiwareHeader): Handed unchanged to the base client.
        **kwargs (Optional): Any further keyword arguments, handed
            unchanged to the base client.
    """
    # Fall back to the configured agent url if the caller gave none
    service_url = url if url else settings.IOTA_URL
    super().__init__(url=service_url,
                     session=session,
                     fiware_header=fiware_header,
                     **kwargs)
51 # ABOUT API
def get_version(self) -> Dict:
    """
    Query the ``iot/about`` endpoint of the IoT Agent.

    Returns:
        Dictionary decoded from the JSON body of a successful response

    Raises:
        requests.RequestException: Logged on ``self.logger`` and re-raised
            if the request fails.
    """
    about_url = urljoin(self.base_url, 'iot/about')
    try:
        response = self.get(url=about_url, headers=self.headers)
        if not response.ok:
            # Raises for 4xx/5xx answers; anything else yields None
            response.raise_for_status()
            return None
        return response.json()
    except requests.RequestException as err:
        self.logger.error(err)
        raise
69 # SERVICE GROUP API
def post_groups(self,
                service_groups: Union[ServiceGroup, List[ServiceGroup]],
                update: bool = False):
    """
    Creates a set of service groups for the given service and service_path.
    The service_group and subservice information will taken from the
    headers, overwriting any preexisting values.

    Args:
        service_groups (list of ServiceGroup): Service groups that will be
            posted to the agent's API. A single group is accepted as well.
        update (bool): If a service group already exists (HTTP 409), try to
            update it instead of raising.

    Returns:
        None

    Raises:
        AssertionError: If a group carries a service or subservice that
            differs from the client's fiware headers.
        requests.RequestException: If the request fails; the error is
            logged before it is re-raised.
    """
    if not isinstance(service_groups, list):
        service_groups = [service_groups]

    # Raised explicitly (instead of via `assert`) so the consistency check
    # is not stripped when Python runs with -O. The exception type is kept.
    for group in service_groups:
        if group.service and \
                group.service != self.headers['fiware-service']:
            raise AssertionError(
                "Service group service does not math fiware service")
        if group.subservice and \
                group.subservice != self.headers['fiware-servicepath']:
            raise AssertionError(
                "Service group subservice does not math fiware service path")

    url = urljoin(self.base_url, 'iot/services')
    headers = self.headers
    # service/subservice travel in the headers, not in the payload
    data = {'services': [group.model_dump(exclude={'service', 'subservice'},
                                          exclude_none=True)
                         for group in service_groups]}
    try:
        res = self.post(url=url, headers=headers, json=data)
        if res.ok:
            self.logger.info("Services successfully posted")
        elif res.status_code == 409:
            self.logger.warning(res.text)
            if len(service_groups) > 1:
                # Retry each group on its own so every group is
                # posted/updated individually
                self.logger.info("Trying to split bulk operation into "
                                 "single operations")
                for group in service_groups:
                    self.post_group(service_group=group, update=update)
            elif update is True:
                self.update_group(service_group=service_groups[0],
                                  fields=None)
            else:
                res.raise_for_status()
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        self.log_error(err=err, msg=None)
        raise
def post_group(self, service_group: ServiceGroup, update: bool = False):
    """
    Register one service group by delegating to the bulk operation
    ``post_groups`` with a one-element list.

    Args:
        service_group (ServiceGroup): Service that will be posted to the
            agent's API
        update (bool): Passed through unchanged to ``post_groups``

    Returns:
        Whatever ``post_groups`` returns
    """
    single_group_batch = [service_group]
    return self.post_groups(service_groups=single_group_batch,
                            update=update)
def get_group_list(self) -> List[ServiceGroup]:
    r"""
    Fetch the service groups from ``iot/services``. If the servicepath
    header has the wildcard expression, /\*, all the subservices for the
    service_group are returned. The specific subservice parameters are
    returned in any other case.

    Returns:
        The entries under the ``services`` key of the response, validated
        as a list of ServiceGroup

    Raises:
        requests.RequestException: If the request fails; the error is
            logged before it is re-raised.
    """
    services_url = urljoin(self.base_url, 'iot/services')
    request_headers = self.headers
    try:
        response = self.get(url=services_url, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
            return None
        adapter = TypeAdapter(List[ServiceGroup])
        return adapter.validate_python(response.json()['services'])
    except requests.RequestException as err:
        self.log_error(err=err, msg=None)
        raise
def get_group(self, *, resource: str, apikey: str) -> ServiceGroup:
    """
    Look up exactly one service group by its resource and apikey.

    All groups are fetched with ``get_group_list`` and then narrowed down
    with ``filter_group_list``.

    Args:
        resource: Resource the group must match
        apikey: Apikey the group must match
    Returns:
        The single matching ServiceGroup
    Raises:
        KeyError: If no group matches
        NotImplementedError: If more than one group matches
    """
    matches = filter_group_list(group_list=self.get_group_list(),
                                resources=resource,
                                apikeys=apikey)
    match_count = len(matches)
    if match_count == 0:
        raise KeyError(f"Service group with resource={resource} and apikey={apikey} was not found")
    if match_count != 1:
        raise NotImplementedError("There is a wierd error, try get_group_list() for debugging")
    return matches[0]
def update_groups(self, *,
                  service_groups: Union[ServiceGroup, List[ServiceGroup]],
                  add: bool = False,
                  fields: Union[Set[str], List[str]] = None) -> None:
    """
    Bulk operation for service group update. Every group is updated with
    its own ``update_group`` call.

    Args:
        fields: Fields of the service groups to update, passed unchanged to
            ``update_group``
        service_groups: A single service group or a list of them
        add: Passed unchanged to ``update_group`` (which posts a group that
            is not found when this is True). Defaults to False.

    Returns:
        None
    """
    if not isinstance(service_groups, list):
        service_groups = [service_groups]
    for group in service_groups:
        self.update_group(service_group=group, fields=fields, add=add)
def update_group(self, *, service_group: ServiceGroup,
                 fields: Union[Set[str], List[str]] = None,
                 add: bool = True):
    """
    Update one service group configuration via PUT on ``iot/services``.
    The group is identified by its resource and apikey, which are sent as
    query parameters; the group itself is sent as the JSON payload.

    Args:
        service_group (ServiceGroup): Service to update.
        fields: Fields of the service_group to include in the payload.
            If 'None' (or empty) no include filter is applied.
        add: If True and the agent answers 404, the group is posted via
            ``post_group`` instead.
    Returns:
        None
    Raises:
        requests.RequestException: If the request fails; the error is
            logged before it is re-raised.
    """
    # Normalise the field selection: empty -> None, list -> set
    if not fields:
        fields = None
    elif isinstance(fields, list):
        fields = set(fields)

    target_url = urljoin(self.base_url, 'iot/services')
    request_headers = self.headers
    query = service_group.model_dump(include={'resource', 'apikey'})
    try:
        payload = service_group.model_dump(
            include=fields,
            exclude={'service', 'subservice'},
            exclude_none=True)
        response = self.put(url=target_url,
                            headers=request_headers,
                            params=query,
                            json=payload)
        if response.ok:
            self.logger.info("ServiceGroup updated!")
        elif response.status_code == 404 and add is True:
            self.post_group(service_group=service_group)
        else:
            response.raise_for_status()
    except requests.RequestException as err:
        self.log_error(err=err, msg=None)
        raise
def delete_group(self, *, resource: str, apikey: str):
    """
    Delete a service group in the IoT-Agent via DELETE on
    ``iot/services``.

    Args:
        resource: Sent as the ``resource`` query parameter
        apikey: Sent as the ``apikey`` query parameter

    Returns:
        None

    Raises:
        requests.RequestException: If the request fails; the error is
            logged before it is re-raised.
    """
    target_url = urljoin(self.base_url, 'iot/services')
    request_headers = self.headers
    query = {'resource': resource, 'apikey': apikey}
    try:
        response = self.delete(url=target_url,
                               headers=request_headers,
                               params=query)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info("ServiceGroup with resource: '%s' and "
                             "apikey: '%s' successfully deleted!",
                             resource, apikey)
    except requests.RequestException as err:
        failure = (f"Could not delete ServiceGroup with resource "
                   f"'{resource}' and apikey '{apikey}'!")
        self.log_error(err=err, msg=failure)
        raise
271 # DEVICE API
def post_devices(self, *, devices: Union[Device, List[Device]],
                 update: bool = False) -> None:
    """
    Post one or more devices to ``iot/devices``. Each device is serialised
    to JSON (fields that are None are left out) and sent under the
    ``devices`` key.

    If the request fails and update = True, the devices are handed to
    ``update_devices`` (with add=False) instead of raising.
    Args:
        devices (list of Devices): A single device or a list of devices
        update (bool): Whether the devices should be updated when posting
            them fails
    Returns:
        None
    Raises:
        requests.RequestException: If the request fails and ``update`` is
            False; the error is logged before it is re-raised.
    """
    if not isinstance(devices, list):
        devices = [devices]
    target_url = urljoin(self.base_url, 'iot/devices')
    request_headers = self.headers

    # Round-trip through JSON so the payload only holds plain JSON types
    serialised = []
    for device in devices:
        serialised.append(
            json.loads(device.model_dump_json(exclude_none=True)))
    payload = {"devices": serialised}
    try:
        response = self.post(url=target_url,
                             headers=request_headers,
                             json=payload)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info("Devices successfully posted!")
    except requests.RequestException as err:
        if update:
            return self.update_devices(devices=devices, add=False)
        failure = "Could not post devices"
        self.log_error(err=err, msg=failure)
        raise
def post_device(self, *, device: Device, update: bool = False) -> None:
    """
    Post a single device configuration by delegating to ``post_devices``
    with a one-element list.

    Args:
        device: IoT device configuration to send
        update: Passed through unchanged to ``post_devices``

    Returns:
        Whatever ``post_devices`` returns
    """
    single_device_batch = [device]
    return self.post_devices(devices=single_device_batch, update=update)
def get_device_list(self, *,
                    limit: int = None,
                    offset: int = None,
                    device_ids: Union[str, List[str]] = None,
                    entity_names: Union[str, List[str]] = None,
                    entity_types: Union[str, List[str]] = None) -> List[Device]:
    """
    Returns a list of all the devices in the device registry with all
    its data. The IoTAgent now only supports "limit" and "offset" as
    request parameters; the remaining filters are applied to the response
    with ``filter_device_list``.

    Args:
        limit:
            if present, limits the number of devices returned in the
            list. Must be a number between 1 and 1000 (both inclusive).
        offset:
            if present, skip that number of devices from the original
            query.
        device_ids:
            List of device_ids. If given, only devices with matching ids
            will be returned
        entity_names:
            The entity_ids of the devices. If given, only the devices
            with the specified entity_id will be returned
        entity_types:
            The entity_type of the device. If given, only the devices
            with the specified entity_type will be returned

    Returns:
        List of matching devices

    Raises:
        ValueError: If ``limit`` is given but not within 1..1000
        requests.RequestException: If the request fails; the error is
            logged before it is re-raised.
    """
    # Check against None (not truthiness) so that limit=0 is rejected too,
    # and accept the documented bounds 1 and 1000 themselves.
    if limit is not None and not 1 <= limit <= 1000:
        msg = "'limit' must be an integer between 1 and 1000!"
        self.logger.error(msg)
        raise ValueError(msg)
    url = urljoin(self.base_url, 'iot/devices')
    headers = self.headers
    # Only send the pagination parameters. Building this from locals()
    # would also put self, url, headers and the client-side filters into
    # the query string.
    params = {key: value
              for key, value in (('limit', limit), ('offset', offset))
              if value is not None}
    try:
        res = self.get(url=url, headers=headers, params=params)
        if res.ok:
            ta = TypeAdapter(List[Device])
            devices = ta.validate_python(res.json()['devices'])
            # filter by device_ids, entity_names or entity_types
            devices = filter_device_list(devices,
                                         device_ids,
                                         entity_names,
                                         entity_types)
            return devices
        res.raise_for_status()
    except requests.RequestException as err:
        self.log_error(err=err, msg=None)
        raise
def get_device(self, *, device_id: str) -> Device:
    """
    Fetch one device from ``iot/devices/<device_id>``.

    Args:
        device_id: Id of the device to fetch
    Raises:
        requests.RequestException, if the request fails (e.g. the device
        does not exist); the error is logged before it is re-raised
    Returns:
        Device validated from the JSON body of the response

    """
    device_url = urljoin(self.base_url, f'iot/devices/{device_id}')
    request_headers = self.headers
    try:
        response = self.get(url=device_url, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
            return None
        return Device.model_validate(response.json())
    except requests.RequestException as err:
        failure = f"Device {device_id} was not found"
        self.log_error(err=err, msg=failure)
        raise
def update_device(self, *, device: Device, add: bool = True) -> None:
    """
    Update a device in the device registry via PUT on
    ``iot/devices/<device_id>``.

    Only the fields ``attributes``, ``lazy``, ``commands`` and
    ``static_attributes`` of the device are sent, so device settings
    (endpoint,..) are not changed by this call.

    Args:
        device: Device carrying the new attribute configuration
        add (bool): If the agent answers 404 (device not found), post the
            device instead
    Returns:
        None
    Raises:
        requests.RequestException: If the request fails; the error is
            logged before it is re-raised.
    """
    device_url = urljoin(self.base_url, f'iot/devices/{device.device_id}')
    request_headers = self.headers
    try:
        payload = device.model_dump(
            include={'attributes', 'lazy', 'commands', 'static_attributes'},
            exclude_none=True)
        response = self.put(url=device_url,
                            headers=request_headers,
                            json=payload)
        if response.ok:
            self.logger.info("Device '%s' successfully updated!",
                             device.device_id)
        elif response.status_code == 404 and add is True:
            self.post_device(device=device, update=False)
        else:
            response.raise_for_status()
    except requests.RequestException as err:
        failure = f"Could not update device '{device.device_id}'"
        self.log_error(err=err, msg=failure)
        raise
def update_devices(self, *, devices: Union[Device, List[Device]],
                   add: bool = False) -> None:
    """
    Bulk operation for device update. Every device is updated with its own
    ``update_device`` call.

    Args:
        devices: A single device or a list of devices
        add: Passed unchanged to ``update_device`` (which posts a device
            that is not found when this is True). Defaults to False.

    Returns:
        None
    """
    if not isinstance(devices, list):
        devices = [devices]
    for device in devices:
        self.update_device(device=device, add=add)
def delete_device(self, *, device_id: str,
                  cb_url: AnyHttpUrl = settings.CB_URL,
                  delete_entity: bool = False,
                  force_entity_deletion: bool = False,
                  cb_client: ContextBrokerClient = None,
                  ) -> None:
    """
    Remove a device from the device registry and, on request, the context
    entity linked to it.

    Args:
        device_id: str, ID of Device
        delete_entity: False -> Only delete the device entry,
                                the automatically created and linked
                                context-entity will continue to
                                exist in Fiware
                       True -> Also delete the automatically
                               created and linked context-entity
                               If multiple devices are linked to this
                               entity, this operation is not executed and
                               an exception is raised
        force_entity_deletion:
            bool, if delete_entity is true and multiple devices are linked
            to the linked entity, delete it and do not raise an error
        cb_client (ContextBrokerClient):
            Corresponding ContextBrokerClient object for entity manipulation.
            A deep copy of it is used and closed afterwards.
        cb_url (AnyHttpUrl):
            Url of the ContextBroker where the entity is found.
            This will autogenerate an CB-Client, mirroring the information
            of the IoTA-Client, e.g. FiwareHeader, and other headers
            (not recommended!)

    Returns:
        None

    Raises:
        requests.RequestException: If the device cannot be fetched or
            deleted; the deletion error is logged before it is re-raised.
        Exception: If ``delete_entity`` is set, other devices are still
            linked to the entity and ``force_entity_deletion`` is False.
    """
    url = urljoin(self.base_url, f'iot/devices/{device_id}', )
    headers = self.headers

    # Fetch the device before deleting it: its entity name and type are
    # still needed for the optional entity clean-up below
    device = self.get_device(device_id=device_id)

    try:
        res = self.delete(url=url, headers=headers)
        if res.ok:
            self.logger.info("Device '%s' successfully deleted!", device_id)
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not delete device {device_id}!"
        self.log_error(err=err, msg=msg)
        raise

    if not delete_entity:
        return

    # An entity can technically belong to multiple devices
    # Only delete the entity if no other device is still linked to it
    devices = self.get_device_list(entity_names=[device.entity_name])

    # Zero because we count the remaining devices
    if len(devices) > 0 and not force_entity_deletion:
        raise Exception(f"The corresponding entity to the device "
                        f"{device_id} was not deleted because it is "
                        f"linked to multiple devices. ")

    # Stays None until a client exists, so the clean-up in `finally`
    # never touches an unbound name
    cb_client_local = None
    try:
        from filip.clients.ngsi_v2 import ContextBrokerClient

        if cb_client:
            cb_client_local = deepcopy(cb_client)
        else:
            warnings.warn("No `ContextBrokerClient` "
                          "object providesd! Will try to generate "
                          "one. This usage is not recommended.")

            cb_client_local = ContextBrokerClient(
                url=cb_url,
                fiware_header=self.fiware_headers,
                headers=headers)

        cb_client_local.delete_entity(
            entity_id=device.entity_name,
            entity_type=device.entity_type)

    except requests.RequestException as err:
        # Do not throw an error
        # It is only important that the entity does not exist after
        # this method, not if this method actively deleted it
        self.logger.debug("Entity of device '%s' was not deleted by "
                          "this call: %s", device_id, err)
    finally:
        # Always release the temporary client, whatever happened above
        if cb_client_local is not None:
            cb_client_local.close()
def patch_device(self,
                 device: Device,
                 patch_entity: bool = True,
                 cb_client: ContextBrokerClient = None,
                 cb_url: AnyHttpUrl = settings.CB_URL) -> None:
    """
    Updates a device state in Fiware, if the device does not exists it
    is created, else its values are updated.
    If the device settings (endpoint,..) were changed the device and
    entity are deleted and re-added.

    If patch_entity is true the corresponding entity in the ContextBroker is
    also correctly updated. Else only new attributes are added there.

    Args:
        device (Device): Device to be posted to /updated in Fiware
        patch_entity (bool): If true the corresponding entity is
                completely synced
        cb_client (ContextBrokerClient):
            Corresponding ContextBrokerClient object for entity manipulation.
            A deep copy of it is used and closed afterwards.
        cb_url (AnyHttpUrl):
            Url of the ContextBroker where the entity is found.
            This will autogenerate an CB-Client, mirroring the information
            of the IoTA-Client, e.g. FiwareHeader, and other headers
            (not recommended!)

    Returns:
        None
    """
    try:
        live_device = self.get_device(device_id=device.device_id)
    except requests.RequestException:
        # device does not exist yet, post it
        self.post_device(device=device)
        return

    # if the device settings were changed we need to delete the device
    # and repost it
    settings_fields = {"device_id", "service", "service_path",
                       "entity_name", "entity_type",
                       "timestamp", "apikey", "endpoint",
                       "protocol", "transport",
                       "expressionLanguage"}

    live_settings = live_device.model_dump(include=settings_fields)
    new_settings = device.model_dump(include=settings_fields)

    if live_settings != new_settings:
        # Forward cb_url as well, so a caller-supplied broker url is also
        # used when delete_device has to create its own broker client
        self.delete_device(device_id=device.device_id,
                           cb_url=cb_url,
                           delete_entity=True,
                           force_entity_deletion=True,
                           cb_client=cb_client)
        self.post_device(device=device)
        return

    # We are at a state where the device exists, but only attributes were
    # changed.
    # we need to update the device, and the context entry separately,
    # as update device only takes over a part of the changes to the
    # ContextBroker.

    # update device
    self.update_device(device=device)

    # update context entry
    # 1. build context entity from information in device
    # 2. patch it
    from filip.models.ngsi_v2.context import \
        ContextEntity, NamedContextAttribute

    def build_context_entity_from_device(device: Device) -> ContextEntity:
        """Build the context entity that mirrors the given device."""
        from filip.models.base import DataType
        entity = ContextEntity(id=device.entity_name,
                               type=device.entity_type)

        for command in device.commands:
            entity.add_attributes([
                # Command attribute will be registered by the device_update
                NamedContextAttribute(
                    name=f"{command.name}_info",
                    type=DataType.COMMAND_RESULT
                ),
                NamedContextAttribute(
                    name=f"{command.name}_status",
                    type=DataType.COMMAND_STATUS
                )
            ])
        for attribute in device.attributes:
            entity.add_attributes([
                NamedContextAttribute(
                    name=attribute.name,
                    type=DataType.STRUCTUREDVALUE,
                    metadata=attribute.metadata
                )
            ])
        for static_attribute in device.static_attributes:
            entity.add_attributes([
                NamedContextAttribute(
                    name=static_attribute.name,
                    type=static_attribute.type,
                    value=static_attribute.value,
                    metadata=static_attribute.metadata
                )
            ])
        return entity

    if patch_entity:
        from filip.clients.ngsi_v2 import ContextBrokerClient
        if cb_client:
            cb_client_local = deepcopy(cb_client)
        else:
            warnings.warn("No `ContextBrokerClient` object provided! "
                          "Will try to generate one. "
                          "This usage is not recommended.")

            cb_client_local = ContextBrokerClient(
                url=cb_url,
                fiware_header=self.fiware_headers,
                headers=self.headers)

        try:
            cb_client_local.patch_entity(
                entity=build_context_entity_from_device(device))
        finally:
            # Release the temporary client even if patching fails
            cb_client_local.close()
def does_device_exists(self, device_id: str) -> bool:
    """
    Test if a device with the given id exists in Fiware
    Args:
        device_id (str)
    Returns:
        bool: True if ``get_device`` succeeds, False if it fails with an
        HTTP 404 answer
    Raises:
        requests.RequestException: For every failure other than a 404
            answer, including errors that carry no response at all.
    """
    try:
        self.get_device(device_id=device_id)
    except requests.RequestException as err:
        # `err.response` is None when no HTTP answer was received (e.g.
        # connection errors); re-raise those instead of failing with an
        # AttributeError that hides the real problem
        if err.response is None or err.response.status_code != 404:
            raise
        return False
    return True
676 # LOG API
def get_loglevel_of_agent(self):
    """
    Get current loglevel of agent from ``admin/log``.

    The request uses a copy of the client headers without the
    ``fiware-service`` and ``fiware-servicepath`` entries.

    Returns:
        Value stored under ``level`` in the JSON body of the response

    Raises:
        requests.RequestException: If the request fails; the error is
            logged before it is re-raised.
    """
    log_url = urljoin(self.base_url, 'admin/log')
    # Work on a copy so the client's own headers stay untouched
    admin_headers = self.headers.copy()
    for tenant_key in ('fiware-service', 'fiware-servicepath'):
        del admin_headers[tenant_key]
    try:
        response = self.get(url=log_url, headers=admin_headers)
        if not response.ok:
            response.raise_for_status()
            return None
        return response.json()['level']
    except requests.RequestException as err:
        self.log_error(err=err)
        raise
def change_loglevel_of_agent(self, level: str):
    """
    Change current loglevel of agent via PUT on ``admin/log``.

    The request uses a copy of the client headers without the
    ``fiware-service`` and ``fiware-servicepath`` entries.

    Args:
        level: New log level (case-insensitive). One of INFO, ERROR,
            FATAL, DEBUG or WARNING.

    Returns:
        None

    Raises:
        KeyError: If ``level`` is not one of the supported levels
        requests.RequestException: If the request fails; the error is
            logged before it is re-raised.
    """
    level = level.upper()
    if level not in ['INFO', 'ERROR', 'FATAL', 'DEBUG', 'WARNING']:
        raise KeyError("Given log level is not supported")

    url = urljoin(self.base_url, 'admin/log')
    # Work on a copy so the client's own headers stay untouched
    headers = self.headers.copy()
    del headers['fiware-service']
    del headers['fiware-servicepath']
    try:
        # The level has to be sent as the `level` query parameter; a bare
        # string would be appended to the url without a parameter name
        res = self.put(url=url, headers=headers, params={'level': level})
        if res.ok:
            self.logger.info("Loglevel of agent at %s "
                             "changed to '%s'", self.base_url, level)
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        self.log_error(err=err)
        raise