Coverage for filip/clients/ngsi_v2/cb.py: 81%
724 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-17 14:42 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-17 14:42 +0000
1"""
2Context Broker Module for API Client
3"""
5from __future__ import annotations
7import copy
8from copy import deepcopy
9from enum import Enum
10from math import inf
11from pkg_resources import parse_version
12from pydantic import PositiveInt, PositiveFloat, AnyHttpUrl, ValidationError
13from pydantic.type_adapter import TypeAdapter
14from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union
15import re
16import requests
17from urllib.parse import urljoin
18import warnings
19from filip.clients.base_http_client import BaseHttpClient, NgsiURLVersion
20from filip.config import settings
21from filip.models.base import FiwareHeader, PaginationMethod, DataType
22from filip.utils.simple_ql import QueryString
23from filip.models.ngsi_v2.context import (
24 ActionType,
25 Command,
26 ContextEntity,
27 ContextEntityKeyValues,
28 ContextAttribute,
29 NamedCommand,
30 NamedContextAttribute,
31 Query,
32 Update,
33 PropertyFormat,
34 ContextEntityList,
35 ContextEntityKeyValuesList,
36 ContextEntityValidationList,
37 ContextEntityKeyValuesValidationList,
38)
39from filip.models.ngsi_v2.base import AttrsFormat
40from filip.models.ngsi_v2.subscriptions import Subscription, Message
41from filip.models.ngsi_v2.registrations import Registration
43if TYPE_CHECKING:
44 from filip.clients.ngsi_v2.iota import IoTAClient
47class ContextBrokerClient(BaseHttpClient):
48 """
49 Implementation of NGSI Context Broker functionalities, such as creating
50 entities and subscriptions; retrieving, updating and deleting data.
51 Further documentation:
52 https://fiware-orion.readthedocs.io/en/master/
54 Api specifications for v2 are located here:
55 https://telefonicaid.github.io/fiware-orion/api/v2/stable/
57 Note:
58 We use the reference implementation for development. Therefore, some
59 other brokers may show slightly different behavior!
60 """
def __init__(
    self,
    url: str = None,
    *,
    session: requests.Session = None,
    fiware_header: FiwareHeader = None,
    **kwargs,
):
    """
    Set up a client for an NGSI-v2 context broker.

    Args:
        url: Url of the context broker server. When omitted (or empty)
            ``settings.CB_URL`` is used instead.
        session (requests.Session): Session handed on to the base client.
        fiware_header (FiwareHeader): Fiware service and fiware service
            path, handed on to the base client.
        **kwargs (Optional): Any further keyword arguments are passed
            unchanged to ``BaseHttpClient.__init__``.
    """
    # fall back to the configured broker url if none was given
    service_url = url if url else settings.CB_URL
    # version prefix used by the methods of this client to build urls
    self._url_version = NgsiURLVersion.v2_url.value
    super().__init__(
        url=service_url,
        session=session,
        fiware_header=fiware_header,
        **kwargs,
    )
def __pagination(
    self,
    *,
    method: PaginationMethod = PaginationMethod.GET,
    url: str,
    headers: Dict,
    limit: Union[PositiveInt, PositiveFloat] = None,
    params: Dict = None,
    data: str = None,
) -> List[Dict]:
    """
    NGSIv2 implements a pagination mechanism in order to help clients to
    retrieve large sets of resources. This mechanism works for all listing
    operations in the API (e.g. GET /v2/entities, GET /v2/subscriptions,
    POST /v2/op/query, etc.). This function helps getting datasets that are
    larger than the limit for the different GET operations.

    https://fiware-orion.readthedocs.io/en/master/user/pagination/index.html

    The total number of items is read from the ``Fiware-Total-Count``
    response header, so the caller has to request the count (e.g. via
    ``options=count`` in ``params``); otherwise a ``KeyError`` is raised.

    Args:
        method: HTTP method used for every page request
        url: Information about the url, obtained from the original function
        headers: The headers from the original function
        limit: Maximum number of items to collect. ``None`` means
            "no limit" (all available items are retrieved).
        params: Query parameters of the original request. The mapping is
            copied; ``limit`` and ``offset`` are managed on the copy.
        data: Optional request body, sent unchanged with every page
            request

    Returns:
        List with the decoded JSON items of all retrieved pages

    Raises:
        requests.HTTPError: If one of the page requests is not successful
    """
    max_page_size = 1000  # maximum items per request
    if limit is None:
        limit = inf
    # Work on a private copy: ``params`` may be None and the caller's
    # dictionary must not be polluted with limit/offset entries.
    params = dict(params) if params else {}
    params["limit"] = min(max_page_size, limit)

    session = self.session if self.session else requests.Session()
    # NOTE(review): leaving the with-block closes the session, which is
    # ``self.session`` when one is configured - kept as in the original.
    with session:
        res = session.request(
            method=method, url=url, params=params, headers=headers, data=data
        )
        if res.ok:
            items = res.json()
            # do pagination
            count = int(res.headers["Fiware-Total-Count"])

            while len(items) < limit and len(items) < count:
                # Establishing the offset from where entities are retrieved
                params["offset"] = len(items)
                params["limit"] = min(max_page_size, limit - len(items))
                res = session.request(
                    method=method,
                    url=url,
                    params=params,
                    headers=headers,
                    data=data,
                )
                if not res.ok:
                    res.raise_for_status()
                page = res.json()
                if not page:
                    # The broker reports more items than it delivers
                    # (e.g. items were removed meanwhile); stop instead
                    # of requesting the same offset forever.
                    break
                items.extend(page)
            self.logger.debug("Received: %s", items)
            return items
        res.raise_for_status()
153 # MANAGEMENT API
def get_version(self) -> Dict:
    """
    Gets version of the context broker

    Sends a GET request to the ``version`` resource below the base url.

    Returns:
        Dictionary with response

    Raises:
        requests.RequestException: Logged and re-raised if the request
            fails
    """
    endpoint = urljoin(self.base_url, "version")
    try:
        response = self.get(url=endpoint, headers=self.headers)
        if not response.ok:
            response.raise_for_status()
        else:
            return response.json()
    except requests.RequestException as error:
        self.logger.error(error)
        raise
def get_resources(self) -> Dict:
    """
    Gets the content of the API entry point

    Sends a GET request to the versioned root url (base url joined with
    the API version prefix of this client).

    Returns:
        Dict: Decoded JSON body of the response

    Raises:
        requests.RequestException: Logged and re-raised if the request
            fails
    """
    endpoint = urljoin(self.base_url, self._url_version)
    try:
        response = self.get(url=endpoint, headers=self.headers)
        if not response.ok:
            response.raise_for_status()
        else:
            return response.json()
    except requests.RequestException as error:
        self.logger.error(error)
        raise
187 # STATISTICS API
def get_statistics(self) -> Dict:
    """
    Gets statistics of context broker

    Sends a GET request to the ``statistics`` resource below the base url.

    Returns:
        Dictionary with response

    Raises:
        requests.RequestException: Logged and re-raised if the request
            fails
    """
    endpoint = urljoin(self.base_url, "statistics")
    try:
        response = self.get(url=endpoint, headers=self.headers)
        if not response.ok:
            response.raise_for_status()
        else:
            return response.json()
    except requests.RequestException as error:
        self.logger.error(error)
        raise
204 # CONTEXT MANAGEMENT API ENDPOINTS
205 # Entity Operations
def post_entity(
    self,
    entity: Union[ContextEntity, ContextEntityKeyValues],
    update: bool = False,
    patch: bool = False,
    override_attr_metadata: bool = True,
    key_values: bool = False,
):
    """
    Registers an entity with the NGSI Context Broker.

    First a POST request with the entity is tried. If the broker answers
    with status code 422 (unprocessable, as the entity already exists)
    there are three options:

    - ``update=True``: the existing entity is overwritten via
      ``override_entity``
    - ``patch=True``: only the values of the existing entity are
      manipulated via ``patch_entity`` (or ``_patch_entity_key_values``
      for the keyValues representation)
    - neither: the error is logged and re-raised

    Args:
        entity (ContextEntity/ContextEntityKeyValues):
            Context Entity Object
        update (bool):
            If the response.status_code is 422, whether to override the
            existing entity
        patch (bool):
            If the response.status_code is 422, whether to manipulate the
            existing entity. Omitted if update `True`.
        override_attr_metadata:
            Only applies for patch equal to `True`.
            Whether to override or append the attribute's metadata.
            `True` for overwrite or `False` for update/append
        key_values(bool):
            By default False. If set to True, "options=keyValues" will
            be included in params of post request. The payload uses
            the keyValues simplified entity representation, i.e.
            ContextEntityKeyValues.

    Returns:
        Value of the ``Location`` response header if the entity was
        posted, otherwise the result of the update/patch fallback.

    Raises:
        AssertionError: If the type of ``entity`` does not fit to
            ``key_values``
        requests.RequestException: If the request fails and no fallback
            applies
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/entities")
    request_headers = self.headers.copy()
    # the representation of the payload must match the requested option
    if key_values:
        assert isinstance(entity, ContextEntityKeyValues)
        query = {"options": "keyValues"}
    else:
        assert isinstance(entity, ContextEntity)
        query = {}
    try:
        response = self.post(
            url=endpoint,
            headers=request_headers,
            json=entity.model_dump(exclude_none=True),
            params=query,
        )
        if response.ok:
            self.logger.info("Entity successfully posted!")
            return response.headers.get("Location")
        response.raise_for_status()
    except requests.RequestException as err:
        failed_response = err.response
        # explicit None check on purpose, the response may not exist at all
        already_exists = (
            failed_response is not None and failed_response.status_code == 422
        )
        if already_exists and update:
            return self.override_entity(entity=entity, key_values=key_values)
        if already_exists and patch:
            if key_values:
                return self._patch_entity_key_values(entity=entity)
            return self.patch_entity(
                entity=entity, override_attr_metadata=override_attr_metadata
            )
        self.log_error(err=err, msg=f"Could not post entity {entity.id}")
        raise
def get_entity_list(
    self,
    *,
    entity_ids: List[str] = None,
    entity_types: List[str] = None,
    id_pattern: str = None,
    type_pattern: str = None,
    q: Union[str, QueryString] = None,
    mq: Union[str, QueryString] = None,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    limit: PositiveInt = inf,
    attrs: List[str] = None,
    metadata: str = None,
    order_by: str = None,
    response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
    include_invalid: bool = False,
) -> Union[
    List[Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]],
    ContextEntityValidationList,
    ContextEntityKeyValuesValidationList,
]:
    r"""
    Retrieves a list of context entities that match different criteria by
    id, type, pattern matching (either id or type) and/or those which
    match a query or geographical query (see Simple Query Language and
    Geographical Queries). A given entity has to match all the criteria
    to be retrieved (i.e., the criteria is combined in a logical AND
    way). Note that pattern matching query parameters are incompatible
    (i.e. mutually exclusive) with their corresponding exact matching
    parameters, i.e. idPattern with id and typePattern with type.

    Args:
        entity_ids: List of entity ids (a single id is accepted as well).
            Retrieve entities whose ID matches one of the elements in
            the list. Incompatible with idPattern, e.g. Boe_Idarium
        entity_types: List of entity types (a single type is accepted as
            well). Retrieve entities whose type matches one of the
            elements in the list. Incompatible with typePattern.
            Example: Room.
        id_pattern: A correctly formatted regular expression. Retrieve
            entities whose ID matches the regular expression. Incompatible
            with id, e.g. ngsi-ld.* or sensor.*
        type_pattern: A correctly formatted regular expression. Retrieve
            entities whose type matches the regular expression.
            Incompatible with type, e.g. room.*
        q (SimpleQuery): A query expression, composed of a list of
            statements separated by ;, i.e.,
            q=statement1;statement2;statement3. See Simple Query
            Language specification. Example: temperature>40.
        mq (SimpleQuery): A query expression for attribute metadata,
            composed of a list of statements separated by ;, i.e.,
            mq=statement1;statement2;statement3. See Simple Query
            Language specification. Example: temperature.accuracy<0.9.
        georel: Spatial relationship between matching entities and a
            reference shape. See Geographical Queries. Example: 'near'.
        geometry: Geographical area to which the query is restricted.
            See Geographical Queries. Example: point.
        coords: List of latitude-longitude pairs of coordinates separated
            by ';'. See Geographical Queries. Example: 41.390205,
            2.154007;48.8566,2.3522.
        limit: Limits the number of entities to be retrieved Example: 20
        attrs: List of attribute names (or an already comma-separated
            string) whose data are to be included in the response. The
            attributes are retrieved in the order specified by this
            parameter. If this parameter is not included, the attributes
            are retrieved in arbitrary order. See "Filtering out
            attributes and metadata" section for more detail.
            Example: seatNumber.
        metadata: List of metadata names (or an already comma-separated
            string) to include in the response. See "Filtering out
            attributes and metadata" section for more detail.
            Example: accuracy.
        order_by: Criteria for ordering results. See "Ordering Results"
            section for details. Example: temperature,!speed.
        response_format (AttrsFormat, str): Response Format. Note: That if
            'keyValues' or 'values' are used the response model will
            change to List[ContextEntityKeyValues] and to List[Dict[str,
            Any]], respectively.
        include_invalid: Specify if the returned list should also contain
            a list of invalid entity IDs or not.

    Returns:
        Without ``include_invalid``: list of validated entities (raw
        items for the 'values' format). With ``include_invalid``: a
        validation list model holding the valid entities and the ids of
        the entities that failed validation (raw items for the 'values'
        format).

    Raises:
        ValueError: If mutually exclusive arguments are combined, a
            pattern is no valid regular expression or the response
            format is unknown
        requests.RequestException: If the request to the broker fails
    """
    url = urljoin(self.base_url, f"{self._url_version}/entities/")
    headers = self.headers.copy()
    params = {}

    if entity_ids and id_pattern:
        raise ValueError("entity_ids and id_pattern are mutually exclusive")
    if entity_types and type_pattern:
        raise ValueError("entity_types and type_pattern are mutually exclusive")
    if entity_ids:
        if not isinstance(entity_ids, list):
            entity_ids = [entity_ids]
        params.update({"id": ",".join(entity_ids)})
    if id_pattern:
        try:
            re.compile(id_pattern)
        except re.error as err:
            raise ValueError(f"Invalid Pattern: {err}") from err
        params.update({"idPattern": id_pattern})
    if entity_types:
        if not isinstance(entity_types, list):
            entity_types = [entity_types]
        params.update({"type": ",".join(entity_types)})
    if type_pattern:
        try:
            re.compile(type_pattern)
        except re.error as err:
            raise ValueError(f"Invalid Pattern: {err.msg}") from err
        params.update({"typePattern": type_pattern})
    # A plain string must be forwarded as it is: joining it would insert
    # a comma between every single character.
    if attrs:
        params["attrs"] = attrs if isinstance(attrs, str) else ",".join(attrs)
    if metadata:
        params["metadata"] = (
            metadata if isinstance(metadata, str) else ",".join(metadata)
        )
    if q:
        if isinstance(q, str):
            q = QueryString.parse_str(q)
        params.update({"q": str(q)})
    if mq:
        params.update({"mq": str(mq)})
    if geometry:
        params.update({"geometry": geometry})
    if georel:
        params.update({"georel": georel})
    if coords:
        params.update({"coords": coords})
    if order_by:
        params.update({"orderBy": order_by})
    if response_format not in list(AttrsFormat):
        raise ValueError(f"Value must be in {list(AttrsFormat)}")
    # "count" is required by the pagination helper, which reads the total
    # number of entities from the response header
    response_format = ",".join(["count", response_format])
    params.update({"options": response_format})

    try:
        items = self.__pagination(
            method=PaginationMethod.GET,
            limit=limit,
            url=url,
            params=params,
            headers=headers,
        )
    except requests.RequestException as err:
        msg = "Could not load entities"
        self.log_error(err=err, msg=msg)
        raise

    if not include_invalid:
        if AttrsFormat.NORMALIZED in response_format:
            return ContextEntityList.model_validate({"entities": items}).entities
        if AttrsFormat.KEY_VALUES in response_format:
            return ContextEntityKeyValuesList.model_validate(
                {"entities": items}
            ).entities
        return items  # in case of VALUES as response_format

    # Pick the entity model and the matching result model once, then
    # validate entity by entity so a single broken one does not fail all.
    if AttrsFormat.NORMALIZED in response_format:
        entity_model = ContextEntity
        result_model = ContextEntityValidationList
    elif AttrsFormat.KEY_VALUES in response_format:
        entity_model = ContextEntityKeyValues
        result_model = ContextEntityKeyValuesValidationList
    else:
        return items  # in case of VALUES as response_format

    adapter = TypeAdapter(entity_model)
    valid_entities = []
    invalid_entities = []
    for entity in items:
        try:
            valid_entities.append(adapter.validate_python(entity))
        except ValidationError:
            invalid_entities.append(entity.get("id"))
    return result_model.model_validate(
        {
            "entities": valid_entities,
            "invalid_entities": invalid_entities,
        }
    )
def get_entity(
    self,
    entity_id: str,
    entity_type: str = None,
    attrs: List[str] = None,
    metadata: List[str] = None,
    response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
) -> Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]:
    """
    Retrieves a single entity.

    This operation must return one entity element only, but there may be
    more than one entity with the same ID (e.g. entities with same ID but
    different types). In such case, an error message is returned, with
    the HTTP status code set to 409 Conflict.

    Args:
        entity_id (String): Id of the entity to be retrieved
        entity_type (String): Entity type, to avoid ambiguity in case
            there are several entities with the same entity id.
        attrs (List of Strings): List of attribute names whose data must be
            included in the response. The attributes are retrieved in the
            order specified by this parameter.
            See "Filtering out attributes and metadata" section for more
            detail. If this parameter is not included, the attributes are
            retrieved in arbitrary order, and all the attributes of the
            entity are included in the response.
            Example: temperature, humidity.
        metadata (List of Strings): A list of metadata names to include in
            the response. See "Filtering out attributes and metadata"
            section for more detail. Example: accuracy.
        response_format (AttrsFormat, str): Representation format of
            response

    Returns:
        ContextEntity for the normalized format, ContextEntityKeyValues
        for the keyValues format, otherwise the decoded JSON body

    Raises:
        ValueError: If ``response_format`` is not a known format
        requests.RequestException: If the request fails
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
    request_headers = self.headers.copy()
    query = {}
    if entity_type:
        query["type"] = entity_type
    if attrs:
        query["attrs"] = ",".join(attrs)
    if metadata:
        query["metadata"] = ",".join(metadata)
    if response_format not in list(AttrsFormat):
        raise ValueError(f"Value must be in {list(AttrsFormat)}")
    query["options"] = response_format

    try:
        response = self.get(url=endpoint, params=query, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info("Entity successfully retrieved!")
            body = response.json()
            self.logger.debug("Received: %s", body)
            # the model of the result depends on the requested format
            if response_format == AttrsFormat.NORMALIZED:
                return ContextEntity(**body)
            if response_format == AttrsFormat.KEY_VALUES:
                return ContextEntityKeyValues(**body)
            return body
    except requests.RequestException as err:
        self.log_error(err=err, msg=f"Could not load entity {entity_id}")
        raise
def get_entity_attributes(
    self,
    entity_id: str,
    entity_type: str = None,
    attrs: List[str] = None,
    metadata: List[str] = None,
    response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
) -> Dict[str, ContextAttribute]:
    """
    Retrieves the attributes of a single entity.

    This request is similar to retrieving the whole entity, however this
    one omits the id and type fields. Just like the general request of
    getting an entire entity, this operation must return only one entity
    element. If more than one entity with the same ID is found (e.g.
    entities with same ID but different type), an error message is
    returned, with the HTTP status code set to 409 Conflict.

    Args:
        entity_id (String): Id of the entity to be retrieved
        entity_type (String): Entity type, to avoid ambiguity in case
            there are several entities with the same entity id.
        attrs (List of Strings): List of attribute names whose data must be
            included in the response. The attributes are retrieved in the
            order specified by this parameter.
            See "Filtering out attributes and metadata" section for more
            detail. If this parameter is not included, the attributes are
            retrieved in arbitrary order, and all the attributes of the
            entity are included in the response. Example: temperature,
            humidity.
        metadata (List of Strings): A list of metadata names to include in
            the response. See "Filtering out attributes and metadata"
            section for more detail. Example: accuracy.
        response_format (AttrsFormat, str): Representation format of
            response

    Returns:
        Dict: attribute name mapped to ContextAttribute for the
        normalized format, otherwise the decoded JSON body

    Raises:
        ValueError: If ``response_format`` is not a known format
        requests.RequestException: If the request fails
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity_id}/attrs"
    )
    request_headers = self.headers.copy()
    query = {}
    if entity_type:
        query["type"] = entity_type
    if attrs:
        query["attrs"] = ",".join(attrs)
    if metadata:
        query["metadata"] = ",".join(metadata)
    if response_format not in list(AttrsFormat):
        raise ValueError(f"Value must be in {list(AttrsFormat)}")
    query["options"] = response_format
    try:
        response = self.get(url=endpoint, params=query, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
        elif response_format == AttrsFormat.NORMALIZED:
            # wrap every attribute of the normalized answer into its model
            return {
                name: ContextAttribute(**content)
                for name, content in response.json().items()
            }
        else:
            return response.json()
    except requests.RequestException as err:
        self.log_error(
            err=err, msg=f"Could not load attributes from entity {entity_id} !"
        )
        raise
def update_entity(
    self,
    entity: Union[ContextEntity, ContextEntityKeyValues, dict],
    append_strict: bool = False,
    key_values: bool = False,
):
    """
    The request payload is an object representing the attributes to
    append or update.

    The attributes are extracted from ``entity`` and handed over to
    ``update_or_append_entity_attributes`` together with its id and type.

    Note:
        Update means overwriting the existing entity. If you want to
        manipulate you should rather use patch_entity.

    Args:
        entity (ContextEntity/ContextEntityKeyValues/dict): Entity
            holding the attributes. A dictionary must at least contain
            the keys ``id`` and ``type``; it is not modified.
        append_strict: If `False` the entity attributes are updated (if they
            previously exist) or appended (if they don't previously exist)
            with the ones in the payload.
            If `True` all the attributes in the payload not
            previously existing in the entity are appended. In addition
            to that, in case some of the attributes in the payload
            already exist in the entity, an error is returned.
            More precisely this means a strict append procedure.
        key_values: By default False. If set to True, the payload uses
            the keyValues simplified entity representation, i.e.
            ContextEntityKeyValues.

    Returns:
        None

    Raises:
        KeyError: If a dictionary without ``id`` or ``type`` is passed
            together with ``key_values=True``
    """
    if key_values:
        if isinstance(entity, dict):
            # copy first, the caller's dictionary must stay untouched
            attrs = copy.deepcopy(entity)
            _id = attrs.pop("id")
            _type = attrs.pop("type")
            entity = ContextEntityKeyValues(id=_id, type=_type)
        else:
            attrs = entity.model_dump(exclude={"id", "type"})
    else:
        if isinstance(entity, dict):
            # a plain dict offers no get_attributes(); build the model
            # first so the documented dict input works in this mode too
            entity = ContextEntity(**entity)
        attrs = entity.get_attributes()
    self.update_or_append_entity_attributes(
        entity_id=entity.id,
        entity_type=entity.type,
        attrs=attrs,
        append_strict=append_strict,
        key_values=key_values,
    )
def update_entity_properties(
    self, entity: ContextEntity, append_strict: bool = False
):
    """
    The request payload is an object representing the attributes, of any type
    but Relationship, to append or update.

    Only what ``entity.get_properties()`` returns is forwarded to
    ``update_or_append_entity_attributes``.

    Note:
        Update means overwriting the existing entity. If you want to
        manipulate you should rather use patch_entity.

    Args:
        entity (ContextEntity): Entity providing id, type and properties
        append_strict: If `False` the entity attributes are updated (if they
            previously exist) or appended (if they don't previously exist)
            with the ones in the payload.
            If `True` all the attributes in the payload not
            previously existing in the entity are appended. In addition
            to that, in case some of the attributes in the payload
            already exist in the entity, an error is returned.
            More precisely this means a strict append procedure.

    Returns:
        None
    """
    request_arguments = {
        "entity_id": entity.id,
        "entity_type": entity.type,
        "attrs": entity.get_properties(),
        "append_strict": append_strict,
    }
    self.update_or_append_entity_attributes(**request_arguments)
def update_entity_relationships(
    self, entity: ContextEntity, append_strict: bool = False
):
    """
    The request payload is an object representing only the attributes, of type
    Relationship, to append or update.

    Only what ``entity.get_relationships()`` returns is forwarded to
    ``update_or_append_entity_attributes``.

    Note:
        Update means overwriting the existing entity. If you want to
        manipulate you should rather use patch_entity.

    Args:
        entity (ContextEntity): Entity providing id, type and
            relationships
        append_strict: If `False` the entity attributes are updated (if they
            previously exist) or appended (if they don't previously exist)
            with the ones in the payload.
            If `True` all the attributes in the payload not
            previously existing in the entity are appended. In addition
            to that, in case some of the attributes in the payload
            already exist in the entity, an error is returned.
            More precisely this means a strict append procedure.

    Returns:
        None
    """
    request_arguments = {
        "entity_id": entity.id,
        "entity_type": entity.type,
        "attrs": entity.get_relationships(),
        "append_strict": append_strict,
    }
    self.update_or_append_entity_attributes(**request_arguments)
def delete_entity(
    self,
    entity_id: str,
    entity_type: str = None,
    delete_devices: bool = False,
    iota_client: IoTAClient = None,
    iota_url: AnyHttpUrl = settings.IOTA_URL,
) -> None:
    """
    Remove a entity from the context broker. No payload is required
    or received.

    Args:
        entity_id:
            Id of the entity to be deleted
        entity_type:
            Entity type, to avoid ambiguity in case there are several
            entities with the same entity id.
        delete_devices:
            If True, also delete all devices that reference this
            entity (entity_id as entity_name)
        iota_client:
            Corresponding IoTA-Client used to access IoTA-Agent. A deep
            copy of it is used, the passed object is neither used
            directly nor closed.
        iota_url:
            URL of the corresponding IoT-Agent. This will autogenerate
            an IoTA-Client, mirroring the information of the
            ContextBrokerClient, e.g. FiwareHeader, and other headers

    Returns:
        None

    Raises:
        requests.RequestException: If the entity could not be deleted
    """
    url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
    headers = self.headers.copy()
    params = {"type": entity_type} if entity_type else None
    try:
        res = self.delete(url=url, params=params, headers=headers)
        if res.ok:
            self.logger.info("Entity '%s' successfully deleted!", entity_id)
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not delete entity {entity_id} !"
        self.log_error(err=err, msg=msg)
        raise

    if not delete_devices:
        return

    # imported here and not at module level (the module itself only
    # imports IoTAClient for type checking)
    from filip.clients.ngsi_v2 import IoTAClient

    if iota_client:
        iota_client_local = deepcopy(iota_client)
    else:
        warnings.warn(
            "No IoTA-Client object provided! "
            "Will try to generate one. "
            "This usage is not recommended."
        )

        iota_client_local = IoTAClient(
            url=iota_url,
            fiware_header=self.fiware_headers,
            headers=self.headers,
        )

    try:
        for device in iota_client_local.get_device_list(entity_names=[entity_id]):
            # without a given type every device referencing the id is removed
            if not entity_type or device.entity_type == entity_type:
                iota_client_local.delete_device(device_id=device.device_id)
    finally:
        # release the local client even if listing or deleting fails
        iota_client_local.close()
def delete_entities(self, entities: List[ContextEntity]) -> None:
    """
    Remove a list of entities from the context broker. This method is
    more efficient than to call delete_entity() for each entity

    Args:
        entities: List[ContextEntity]: List of entities to be deleted

    Raises:
        Exception, if one of the entities is not in the ContextBroker

    Returns:
        None
    """

    # update() with action "delete" removes entities without attributes
    # completely, but only strips the attributes of the others. Hence a
    # bare copy (id and type only) is prepared for every entity that
    # carries at least one attribute.
    stripped_entities: List[ContextEntity] = [
        ContextEntity(id=entity.id, type=entity.type)
        for entity in entities
        if any(
            key not in ContextEntity.model_fields for key in entity.model_dump()
        )
    ]

    # First pass: deletes attribute-less entities and the attributes of
    # the remaining ones.
    if len(entities) > 0:
        self.update(entities=entities, action_type="delete")
    # Second pass: deletes what is left of the entities that had
    # attributes (only id and type remain in the ContextBroker).
    if stripped_entities:
        self.update(entities=stripped_entities, action_type="delete")
def update_or_append_entity_attributes(
    self,
    entity_id: str,
    attrs: Union[
        List[NamedContextAttribute], Dict[str, ContextAttribute], Dict[str, Any]
    ],
    entity_type: str = None,
    append_strict: bool = False,
    forcedUpdate: bool = False,
    key_values: bool = False,
):
    """
    The request payload is an object representing the attributes to
    append or update. It is sent as a 'POST' request to the ``attrs``
    resource of the entity.

    Note:
        Be careful not to update attributes that are
        provided via context registration, e.g. commands. Only ``id``
        and ``type`` are excluded from the payload; the filtering of
        commands is currently not active in this method.

    Args:
        entity_id: Entity id to be updated
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.
        attrs: List of attributes to update or to append
        append_strict: If `False` the entity attributes are updated (if they
            previously exist) or appended (if they don't previously exist)
            with the ones in the payload.
            If `True` all the attributes in the payload not
            previously existing in the entity are appended. In addition
            to that, in case some of the attributes in the payload
            already exist in the entity, an error is returned.
            More precisely this means a strict append procedure.
        forcedUpdate: Update operation have to trigger any matching
            subscription, no matter if there is an actual attribute
            update or no instead of the default behavior, which is to
            updated only if attribute is effectively updated.
        key_values: By default False. If set to True, the payload uses
            the keyValues simplified entity representation, i.e.
            ContextEntityKeyValues.

    Returns:
        None

    Raises:
        AssertionError: If ``key_values`` is set but ``attrs`` is no dict
        requests.RequestException: If the request fails
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity_id}/attrs"
    )
    request_headers = self.headers.copy()
    if key_values:
        assert isinstance(attrs, dict), "for keyValues attrs has to be a dict"

    query = {}
    if entity_type:
        query["type"] = entity_type
    else:
        # placeholder only needed to build the local model, it is not sent
        entity_type = "dummy"

    switches = (
        (append_strict, "append"),
        (forcedUpdate, "forcedUpdate"),
        (key_values, "keyValues"),
    )
    enabled_options = [name for active, name in switches if active]
    if enabled_options:
        query["options"] = ",".join(enabled_options)

    # a temporary model is used to bring the attributes into payload form
    if key_values:
        entity = ContextEntityKeyValues(id=entity_id, type=entity_type, **attrs)
    else:
        entity = ContextEntity(id=entity_id, type=entity_type)
        entity.add_attributes(attrs)
    try:
        response = self.post(
            url=endpoint,
            headers=request_headers,
            json=entity.model_dump(exclude={"id", "type"}, exclude_none=True),
            params=query,
        )
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info("Entity '%s' successfully updated!", entity.id)
    except requests.RequestException as err:
        self.log_error(
            err=err,
            msg=f"Could not update or append attributes of entity {entity.id} !",
        )
        raise
def _patch_entity_key_values(
    self,
    entity: Union[ContextEntityKeyValues, dict],
):
    """
    The entity are updated with a ContextEntityKeyValues object or a
    dictionary contain the simplified entity data. This corresponds to a
    'PATCH' request.
    Only existing attribute can be updated!

    Args:
        entity: A ContextEntityKeyValues object or a dictionary contain
            the simplified entity data

    Returns:
        None

    Raises:
        requests.RequestException: If the request fails
    """
    if isinstance(entity, dict):
        entity = ContextEntityKeyValues(**entity)
    # build the url from the client's version prefix, like the other
    # entity operations, instead of a hard-coded "v2"
    url = urljoin(self.base_url, f"{self._url_version}/entities/{entity.id}/attrs")
    headers = self.headers.copy()
    params = {"type": entity.type, "options": AttrsFormat.KEY_VALUES.value}
    try:
        res = self.patch(
            url=url,
            headers=headers,
            # id and type are addressed via url and params; only fields
            # that were explicitly set are part of the payload
            json=entity.model_dump(exclude={"id", "type"}, exclude_unset=True),
            params=params,
        )
        if res.ok:
            self.logger.info("Entity '%s' successfully " "updated!", entity.id)
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not update attributes of entity" f" {entity.id} !"
        self.log_error(err=err, msg=msg)
        raise
def update_existing_entity_attributes(
    self,
    entity_id: str,
    attrs: Union[
        List[NamedContextAttribute], Dict[str, ContextAttribute], Dict[str, Any]
    ],
    entity_type: str = None,
    forcedUpdate: bool = False,
    override_metadata: bool = False,
    key_values: bool = False,
):
    """
    The entity attributes are updated with the ones in the payload.
    In addition to that, if one or more attributes in the payload doesn't
    exist in the entity, an error is returned. This corresponds to a
    'PATCH' request.

    Args:
        entity_id: Entity id to be updated
        attrs: Attributes to update. Must be a dict if `key_values` is
            True; otherwise the attributes are added to a temporary
            ContextEntity that is serialized as payload.
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.
        forcedUpdate: Update operation have to trigger any matching
            subscription, no matter if there is an actual attribute
            update or no instead of the default behavior, which is to
            updated only if attribute is effectively updated.
        override_metadata:
            Bool, replace the existing metadata with the one provided in
            the request
        key_values: By default False. If set to True, the payload uses
            the keyValues simplified entity representation, i.e.
            ContextEntityKeyValues.
    Returns:
        None
    Raises:
        AssertionError: if `key_values` is True and `attrs` is not a dict
        requests.RequestException: if the request fails or the response
            carries an error status (logged via ``self.log_error`` and
            re-raised)
    """
    url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
    headers = self.headers.copy()
    # Always start from a dict: options must be addable even when no
    # entity type was given (a `None` here broke `params.update` below).
    params = {}
    if entity_type:
        params["type"] = entity_type
    else:
        # placeholder type, only needed to build the temporary entity
        entity_type = "dummy"

    options = []
    if override_metadata:
        options.append("overrideMetadata")
    if forcedUpdate:
        options.append("forcedUpdate")
    if key_values:
        assert isinstance(attrs, dict), "for keyValues the attrs must be dict"
        payload = attrs
        options.append("keyValues")
    else:
        entity = ContextEntity(id=entity_id, type=entity_type)
        entity.add_attributes(attrs)
        payload = entity.model_dump(exclude={"id", "type"}, exclude_none=True)
    if options:
        params["options"] = ",".join(options)

    try:
        res = self.patch(
            url=url,
            headers=headers,
            json=payload,
            params=params,
        )
        if res.ok:
            self.logger.info("Entity '%s' successfully updated!", entity_id)
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not update attributes of entity {entity_id} !"
        self.log_error(err=err, msg=msg)
        raise
def override_entity(
    self, entity: Union[ContextEntity, ContextEntityKeyValues], **kwargs
):
    """
    Override an existing entity with the attributes of the given one.

    The call is delegated to ``replace_entity_attributes`` using the id,
    the type and the attributes of ``entity``.

    Note:
        If you only want to manipulate an entity you should rather use
        patch_entity.

    Args:
        entity (ContextEntity or ContextEntityKeyValues): entity whose
            attributes replace the stored ones
        **kwargs: forwarded unchanged to ``replace_entity_attributes``
    Returns:
        The return value of ``replace_entity_attributes``
    """
    replacement = {
        "entity_id": entity.id,
        "entity_type": entity.type,
        "attrs": entity.get_attributes(),
    }
    return self.replace_entity_attributes(**replacement, **kwargs)
def replace_entity_attributes(
    self,
    entity_id: str,
    attrs: Union[
        List[Union[NamedContextAttribute, Dict[str, ContextAttribute]]], Dict
    ],
    entity_type: str = None,
    forcedUpdate: bool = False,
    key_values: bool = False,
):
    """
    Replace all attributes of an entity by the ones in the request. The
    previously existing attributes are removed. This corresponds to a
    'PUT' request.

    Args:
        entity_id: Entity id to be updated
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.
        attrs: List of attributes to add to the entity or dict of
            attributes in case of key_values=True.
        forcedUpdate: Update operation have to trigger any matching
            subscription, no matter if there is an actual attribute
            update or no instead of the default behavior, which is to
            updated only if attribute is effectively updated.
        key_values(bool):
            By default False. If set to True, "options=keyValues" will
            be included in params of the request and `attrs` (which must
            be a dict) is sent as is.
    Returns:
        None
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity_id}/attrs"
    )
    request_headers = self.headers.copy()
    query = {}
    flags = []
    if entity_type:
        query["type"] = entity_type
    else:
        # placeholder type, only used to build the temporary entity below
        entity_type = "dummy"

    if forcedUpdate:
        flags.append("forcedUpdate")

    if key_values:
        flags.append("keyValues")
        assert isinstance(attrs, dict)
        body = attrs
    else:
        # serialize the attributes through a temporary normalized entity
        holder = ContextEntity(id=entity_id, type=entity_type)
        holder.add_attributes(attrs)
        body = holder.model_dump(exclude={"id", "type"}, exclude_none=True)
    if flags:
        query["options"] = ",".join(flags)

    try:
        response = self.put(
            url=endpoint,
            headers=request_headers,
            json=body,
            params=query,
        )
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info("Entity '%s' successfully updated!", entity_id)
    except requests.RequestException as err:
        self.log_error(
            err=err, msg=f"Could not replace attribute of entity {entity_id} !"
        )
        raise
1123 # Attribute operations
def get_attribute(
    self,
    entity_id: str,
    attr_name: str,
    entity_type: str = None,
    metadata: Union[str, List[str]] = None,
    response_format="",
) -> ContextAttribute:
    """
    Retrieves a specified attribute from an entity.

    Args:
        entity_id: Id of the entity. Example: Bcn_Welt
        attr_name: Name of the attribute to be retrieved.
        entity_type (Optional): Type of the entity to retrieve
        metadata (Optional): Metadata names to include in the response,
            either as a list of names or as an already comma-separated
            string. See "Filtering out attributes and metadata" section
            for more detail.
        response_format: Currently not used by this method.

    Returns:
        The content of the retrieved attribute as ContextAttribute

    Raises:
        requests.RequestException: if the request fails or the response
            carries an error status (logged via ``self.log_error`` and
            re-raised)
    """
    url = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
    )
    headers = self.headers.copy()
    params = {}
    if entity_type:
        params.update({"type": entity_type})
    if metadata:
        # A plain string must not be joined: that would split it into
        # single characters ("unit" -> "u,n,i,t").
        if not isinstance(metadata, str):
            metadata = ",".join(metadata)
        params.update({"metadata": metadata})
    try:
        res = self.get(url=url, params=params, headers=headers)
        if res.ok:
            self.logger.debug("Received: %s", res.json())
            return ContextAttribute(**res.json())
        res.raise_for_status()
    except requests.RequestException as err:
        msg = (
            f"Could not load attribute '{attr_name}' from entity" f"'{entity_id}' "
        )
        self.log_error(err=err, msg=msg)
        raise
def update_entity_attribute(
    self,
    entity_id: str,
    attr: Union[ContextAttribute, NamedContextAttribute],
    *,
    entity_type: str = None,
    attr_name: str = None,
    override_metadata: bool = True,
    forcedUpdate: bool = False,
):
    """
    Updates a specified attribute from an entity via a 'PUT' request.

    Args:
        attr:
            context attribute to update
        entity_id:
            Id of the entity. Example: Bcn_Welt
        entity_type:
            Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.
        forcedUpdate: Update operation have to trigger any matching
            subscription, no matter if there is an actual attribute
            update or no instead of the default behavior, which is to
            updated only if attribute is effectively updated.
        attr_name:
            Name of the attribute to be updated. Required if `attr` is a
            plain ContextAttribute, must not be given if `attr` is a
            NamedContextAttribute (its own name is used then).
        override_metadata:
            Bool, if set to `True` (default) the metadata will be
            overwritten. This is for backwards compatibility reasons.
            If `False` the metadata values will be either updated if
            already existing or append if not.
            See also:
            https://fiware-orion.readthedocs.io/en/master/user/metadata.html
    """
    request_headers = self.headers.copy()
    if isinstance(attr, NamedContextAttribute):
        assert attr_name is None, (
            "Invalid argument attr_name. Do not set attr_name if attr is "
            "of type NamedContextAttribute"
        )
        attr_name = attr.name
    else:
        assert attr_name is not None, (
            "Missing name for attribute. attr_name must be present if"
            "attr is of type ContextAttribute"
        )

    endpoint = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
    )
    query = {}
    if entity_type:
        query["type"] = entity_type
    # overrideMetadata is on by default to stay backwards compatible
    switches = (
        ("overrideMetadata", override_metadata),
        ("forcedUpdate", forcedUpdate),
    )
    enabled = [flag for flag, active in switches if active]
    if enabled:
        query["options"] = ",".join(enabled)
    try:
        response = self.put(
            url=endpoint,
            headers=request_headers,
            params=query,
            # the name is part of the URL and must not be in the body
            json=attr.model_dump(exclude={"name"}, exclude_none=True),
        )
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info(
                "Attribute '%s' of '%s' successfully updated!",
                attr_name,
                entity_id,
            )
    except requests.RequestException as err:
        failure = f"Could not update attribute '{attr_name}' of entity'{entity_id}' "
        self.log_error(err=err, msg=failure)
        raise
def delete_entity_attribute(
    self, entity_id: str, attr_name: str, entity_type: str = None
) -> None:
    """
    Removes a specified attribute from an entity.

    Args:
        entity_id: Id of the entity.
        attr_name: Name of the attribute to be removed.
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id. It is sent as the
            `type` query parameter.
    Raises:
        requests.RequestException: if the request fails or the response
            carries an error status (logged via ``self.log_error`` and
            re-raised)
    """
    url = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
    )
    headers = self.headers.copy()
    params = {}
    if entity_type:
        params.update({"type": entity_type})
    try:
        # The query parameters must be sent along, otherwise the entity
        # type given by the caller is silently ignored.
        res = self.delete(url=url, params=params, headers=headers)
        if res.ok:
            self.logger.info(
                "Attribute '%s' of '%s' successfully deleted!",
                attr_name,
                entity_id,
            )
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not delete attribute '{attr_name}' of entity '{entity_id}'"
        self.log_error(err=err, msg=msg)
        raise
1295 # Attribute value operations
def get_attribute_value(
    self, entity_id: str, attr_name: str, entity_type: str = None
) -> Any:
    """
    Return the value property of a single attribute of an entity.

    Args:
        entity_id: Id of the entity. Example: Bcn_Welt
        attr_name: Name of the attribute to be retrieved.
            Example: temperature.
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.

    Returns:
        The decoded JSON body of the response
    """
    endpoint = urljoin(
        self.base_url,
        f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}/value",
    )
    request_headers = self.headers.copy()
    query = {"type": entity_type} if entity_type else {}
    try:
        response = self.get(url=endpoint, params=query, headers=request_headers)
        if response.ok:
            content = response.json()
            self.logger.debug("Received: %s", content)
            return content
        response.raise_for_status()
    except requests.RequestException as err:
        failure = (
            f"Could not load value of attribute '{attr_name}' from "
            f"entity'{entity_id}' "
        )
        self.log_error(err=err, msg=failure)
        raise
def update_attribute_value(
    self,
    *,
    entity_id: str,
    attr_name: str,
    value: Any,
    entity_type: str = None,
    forcedUpdate: bool = False,
):
    """
    Updates the value of a specified attribute of an entity

    Args:
        value: update value. Anything that is not a dict or list is sent
            with the header ``Content-Type: text/plain``.
        entity_id: Id of the entity. Example: Bcn_Welt
        attr_name: Name of the attribute to be updated.
            Example: temperature.
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.
        forcedUpdate: Update operation have to trigger any matching
            subscription, no matter if there is an actual attribute
            update or no instead of the default behavior, which is to
            updated only if attribute is effectively updated.
    Returns:
        None
    """
    endpoint = urljoin(
        self.base_url,
        f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}/value",
    )
    request_headers = self.headers.copy()
    query = {}
    if entity_type:
        query["type"] = entity_type
    flags = ["forcedUpdate"] if forcedUpdate else []
    if flags:
        query["options"] = ",".join(flags)
    try:
        is_structured = isinstance(value, (dict, list))
        if not is_structured:
            # scalar values are announced as plain text
            request_headers.update({"Content-Type": "text/plain"})
            if isinstance(value, str):
                value = f"{value}"
        # both kinds of value are handed over through the json argument
        response = self.put(
            url=endpoint, headers=request_headers, json=value, params=query
        )
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info(
                "Attribute '%s' of '%s' successfully updated!",
                attr_name,
                entity_id,
            )
    except requests.RequestException as err:
        failure = (
            f"Could not update value of attribute '{attr_name}' from "
            f"entity '{entity_id}' "
        )
        self.log_error(err=err, msg=failure)
        raise
1398 # Types Operations
def get_entity_types(
    self, *, limit: int = None, offset: int = None, options: str = None
) -> List[Dict[str, Any]]:
    """
    Request the ``<api version>/types`` endpoint.

    Args:
        limit: Limit the number of types to be retrieved.
        offset: Skip a number of records.
        options: Value of the `options` query parameter.
            Allowed: count, values

    Returns:
        The decoded JSON body of the response
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/types")
    request_headers = self.headers.copy()
    # only truthy arguments become query parameters
    candidates = (("limit", limit), ("offset", offset), ("options", options))
    query = {key: val for key, val in candidates if val}
    try:
        response = self.get(url=endpoint, params=query, headers=request_headers)
        if response.ok:
            content = response.json()
            self.logger.debug("Received: %s", content)
            return content
        response.raise_for_status()
    except requests.RequestException as err:
        self.log_error(err=err, msg="Could not load entity types!")
        raise
def get_entity_type(self, entity_type: str) -> Dict[str, Any]:
    """
    Request the ``<api version>/types/<entity_type>`` endpoint.

    Args:
        entity_type: Entity Type. Example: Room

    Returns:
        The decoded JSON body of the response
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/types/{entity_type}")
    request_headers = self.headers.copy()
    try:
        response = self.get(url=endpoint, params={}, headers=request_headers)
        if response.ok:
            content = response.json()
            self.logger.debug("Received: %s", content)
            return content
        response.raise_for_status()
    except requests.RequestException as err:
        failure = f"Could not load entities of type'{entity_type}' "
        self.log_error(err=err, msg=failure)
        raise
1455 # SUBSCRIPTION API ENDPOINTS
def get_subscription_list(self, limit: PositiveInt = inf) -> List[Subscription]:
    """
    Returns a list of all the subscriptions present in the system.

    Args:
        limit: Limit the number of subscriptions to be retrieved
    Returns:
        list of subscriptions
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/subscriptions/")
    request_headers = self.headers.copy()
    # The 'count' option is always sent so that it can be checked whether
    # pagination is required
    query = {"options": "count"}
    try:
        raw_items = self.__pagination(
            limit=limit, url=endpoint, params=query, headers=request_headers
        )
        return TypeAdapter(List[Subscription]).validate_python(raw_items)
    except requests.RequestException as err:
        self.log_error(err=err, msg="Could not load subscriptions!")
        raise
def post_subscription(
    self,
    subscription: Subscription,
    update: bool = False,
    skip_initial_notification: bool = False,
) -> str:
    """
    Creates a new subscription. The subscription is represented by a
    Subscription object defined in filip.cb.models.

    If the subscription already exists, the adding is prevented and the id
    of the existing subscription is returned.

    A subscription is deemed as already existing if there exists a
    subscription with the exact same subject and notification fields. All
    optional fields are not considered.

    Args:
        subscription: Subscription
        update: True - If the subscription already exists, update it
                False- If the subscription already exists, throw warning
        skip_initial_notification: True - ask the broker to skip the
            initial notification. The option is only sent to brokers
            with version <= 3.1; for newer versions a DeprecationWarning
            is emitted and the option is left out.
            False - no option is sent
    Returns:
        str: Id of the (created) subscription
    Raises:
        requests.RequestException: if the request fails or the response
            carries an error status (logged via ``self.log_error`` and
            re-raised)
    """
    existing_subscriptions = self.get_subscription_list()

    # Only subject and notification decide whether two subscriptions are
    # considered the same
    sub_dict = subscription.model_dump(include={"subject", "notification"})
    for ex_sub in existing_subscriptions:
        if self._subscription_dicts_are_equal(
            sub_dict, ex_sub.model_dump(include={"subject", "notification"})
        ):
            self.logger.info("Subscription already exists")
            if update:
                self.logger.info("Updated subscription")
                subscription.id = ex_sub.id
                self.update_subscription(subscription)
            else:
                warnings.warn(
                    f"Subscription existed already with the id {ex_sub.id}"
                )
            return ex_sub.id

    params = {}
    if skip_initial_notification:
        version = self.get_version()["orion"]["version"]
        if parse_version(version) <= parse_version("3.1"):
            params.update({"options": "skipInitialNotification"})
        else:
            warnings.warn(
                f"Skip initial notifications is a deprecated "
                f"feature of older versions <=3.1 of the context "
                f"broker. The Context Broker that you requesting has "
                f"version: {version}. For newer versions we "
                f"automatically skip this option. Consider "
                f"refactoring and updating your services",
                DeprecationWarning,
            )

    # Use the configured API version prefix like the sibling methods do
    url = urljoin(self.base_url, f"{self._url_version}/subscriptions")
    headers = self.headers.copy()
    headers.update({"Content-Type": "application/json"})
    try:
        res = self.post(
            url=url,
            headers=headers,
            data=subscription.model_dump_json(exclude={"id"}, exclude_none=True),
            params=params,
        )
        if res.ok:
            self.logger.info("Subscription successfully created!")
            # the id is the last path segment of the Location header
            return res.headers["Location"].split("/")[-1]
        res.raise_for_status()
    except requests.RequestException as err:
        msg = "Could not send subscription!"
        self.log_error(err=err, msg=msg)
        raise
def get_subscription(self, subscription_id: str) -> Subscription:
    """
    Retrieves a single subscription by its id.

    Args:
        subscription_id: id of the subscription

    Returns:
        Subscription built from the decoded JSON body of the response
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
    )
    request_headers = self.headers.copy()
    try:
        response = self.get(url=endpoint, headers=request_headers)
        if response.ok:
            content = response.json()
            self.logger.debug("Received: %s", content)
            return Subscription(**content)
        response.raise_for_status()
    except requests.RequestException as err:
        self.log_error(
            err=err, msg=f"Could not load subscription {subscription_id}"
        )
        raise
def update_subscription(
    self, subscription: Subscription, skip_initial_notification: bool = False
):
    """
    Only the fields included in the request are updated in the subscription.

    Args:
        subscription: Subscription to update
        skip_initial_notification: True - ask the broker to skip the
            initial notification. The option is only sent to brokers
            with version <= 3.1; for newer versions a DeprecationWarning
            is emitted and the option is left out.
            False - no option is sent

    Returns:
        None
    Raises:
        requests.RequestException: if the request fails or the response
            carries an error status (logged via ``self.log_error`` and
            re-raised)
    """
    params = {}
    if skip_initial_notification:
        version = self.get_version()["orion"]["version"]
        if parse_version(version) <= parse_version("3.1"):
            params.update({"options": "skipInitialNotification"})
        else:
            warnings.warn(
                f"Skip initial notifications is a deprecated "
                f"feature of older versions <=3.1 of the context "
                f"broker. The Context Broker that you requesting has "
                f"version: {version}. For newer versions we "
                f"automatically skip this option. Consider "
                f"refactoring and updating your services",
                DeprecationWarning,
            )

    url = urljoin(
        self.base_url, f"{self._url_version}/subscriptions/{subscription.id}"
    )
    headers = self.headers.copy()
    headers.update({"Content-Type": "application/json"})
    try:
        res = self.patch(
            url=url,
            headers=headers,
            data=subscription.model_dump_json(exclude={"id"}, exclude_none=True),
            # without this the skipInitialNotification option computed
            # above would never reach the broker
            params=params,
        )
        if res.ok:
            self.logger.info("Subscription successfully updated!")
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not update subscription {subscription.id}"
        self.log_error(err=err, msg=msg)
        raise
def delete_subscription(self, subscription_id: str) -> None:
    """
    Deletes a subscription from a Context Broker

    Args:
        subscription_id: id of the subscription
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
    )
    request_headers = self.headers.copy()
    try:
        response = self.delete(url=endpoint, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info(
                f"Subscription '{subscription_id}' successfully deleted!"
            )
    except requests.RequestException as err:
        self.log_error(
            err=err, msg=f"Could not delete subscription {subscription_id}"
        )
        raise
1665 # Registration API
def get_registration_list(self, *, limit: PositiveInt = None) -> List[Registration]:
    """
    Lists all the context provider registrations present in the system.

    Args:
        limit: Limit the number of registrations to be retrieved
    Returns:
        list of registrations
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/registrations/")
    request_headers = self.headers.copy()
    # The 'count' option is always sent so that it can be checked whether
    # pagination is required
    query = {"options": "count"}
    try:
        raw_items = self.__pagination(
            limit=limit, url=endpoint, params=query, headers=request_headers
        )
        return TypeAdapter(List[Registration]).validate_python(raw_items)
    except requests.RequestException as err:
        self.log_error(err=err, msg="Could not load registrations!")
        raise
def post_registration(self, registration: Registration):
    """
    Creates a new context provider registration. This is typically used
    for binding context sources as providers of certain data. The
    registration is represented by cb.models.Registration

    Args:
        registration (Registration): registration to create; its id is
            not part of the request body

    Returns:
        The last path segment of the `Location` response header
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/registrations")
    request_headers = self.headers.copy()
    request_headers["Content-Type"] = "application/json"
    try:
        body = registration.model_dump_json(exclude={"id"}, exclude_none=True)
        response = self.post(url=endpoint, headers=request_headers, data=body)
        if response.ok:
            self.logger.info("Registration successfully created!")
            location = response.headers["Location"]
            return location.rsplit("/", 1)[-1]
        response.raise_for_status()
    except requests.RequestException as err:
        self.log_error(
            err=err, msg=f"Could not send registration {registration.id}!"
        )
        raise
def get_registration(self, registration_id: str) -> Registration:
    """
    Retrieves a registration from context broker by id

    Args:
        registration_id: id of the registration

    Returns:
        Registration built from the decoded JSON body of the response
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/registrations/{registration_id}"
    )
    request_headers = self.headers.copy()
    try:
        response = self.get(url=endpoint, headers=request_headers)
        if response.ok:
            content = response.json()
            self.logger.debug("Received: %s", content)
            return Registration(**content)
        response.raise_for_status()
    except requests.RequestException as err:
        self.log_error(
            err=err, msg=f"Could not load registration {registration_id} !"
        )
        raise
def add_valid_relationships(
    self, entities: List[ContextEntity]
) -> List[ContextEntity]:
    """
    Check every attribute of the given entities. If ``validate_relationship``
    accepts an attribute (its value points to an existing entity), the
    attribute is assumed to be a relationship and is re-assigned with the
    attribute type DataType.RELATIONSHIP.

    Args:
        entities: list of entities that need to be validated.

    Returns:
        list with all given entities, modified in place where needed
    """
    processed = []
    for item in entities[:]:
        dumped = item.model_dump(exclude={"id", "type"})
        for name, content in dumped.items():
            # only dict-like attribute dumps can be validated
            if not isinstance(content, dict):
                continue
            if not self.validate_relationship(content):
                continue
            relationship = ContextAttribute(
                type=DataType.RELATIONSHIP,
                value=content.get("value"),
            )
            item.update_attribute({name: relationship})
        processed.append(item)
    return processed
def remove_invalid_relationships(
    self, entities: List[ContextEntity], hard_remove: bool = True
) -> List[ContextEntity]:
    """
    Removes invalid relationships from the entities. A relationship is
    invalid if ``validate_relationship`` does not accept it, i.e. it has
    no destination entity.

    Args:
        entities: list of entities that need to be validated.
        hard_remove: If True, invalid relationships will be deleted.
            If False, invalid relationships are kept but re-typed to
            DataType.TEXT.

    Returns:
        list with all given entities, modified in place where needed
    """
    cleaned = []
    for item in entities[:]:
        for rel in item.get_relationships():
            if self.validate_relationship(rel):
                continue
            if hard_remove:
                item.delete_attributes(attrs=[rel])
                continue
            # keep the attribute, but turn it into a text attribute
            as_text = NamedContextAttribute(
                name=rel.name,
                type=DataType.TEXT,
                value=rel.value,
            )
            item.update_attribute(attrs=[as_text])
        cleaned.append(item)
    return cleaned
def validate_relationship(
    self, relationship: Union[NamedContextAttribute, ContextAttribute, Dict]
) -> bool:
    """
    Validates a relationship. A relationship is valid if it points to an existing
    entity. Otherwise, it is considered invalid

    Args:
        relationship: relationship to validate, either an attribute
            object or a dict with a "value" key holding the id of the
            destination entity
    Returns
        True if the relationship is valid, False otherwise. Any error
        response of the broker (not only 404) is reported as False.
    Raises:
        ValueError: if `relationship` is a dict without "value" key or
            of an unsupported type
        requests.RequestException: if the lookup failed without any
            HTTP response
    """
    if isinstance(relationship, (NamedContextAttribute, ContextAttribute)):
        destination_id = relationship.value
    elif isinstance(relationship, dict):
        # sentinel, because None may be a legitimate stored value
        _sentinel = object()
        destination_id = relationship.get("value", _sentinel)
        if destination_id is _sentinel:
            raise ValueError(
                "Invalid relationship dictionary format\n"
                "Expected format: {"
                f'"type": "{DataType.RELATIONSHIP.value}", '
                '"value": "entity_id"}'
            )
    else:
        raise ValueError("Invalid relationship type.")
    try:
        destination_entity = self.get_entity(entity_id=destination_id)
        return destination_entity.id == destination_id
    except requests.RequestException as err:
        if getattr(err, "response", None) is None:
            # No HTTP answer at all: nothing can be said about the
            # destination, so surface the original error instead of
            # failing on `None.status_code`.
            raise
        # 404 means the destination does not exist. Other error answers
        # were reported as a falsy `None` before; keep them falsy but
        # return a real bool.
        return False
def update_registration(self, registration: Registration):
    """
    Only the fields included in the request are updated in the registration.

    Args:
        registration: Registration to update; its id selects the target
            and is not part of the request body
    Returns:
        None
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/registrations/{registration.id}"
    )
    request_headers = self.headers.copy()
    request_headers["Content-Type"] = "application/json"
    try:
        body = registration.model_dump_json(exclude={"id"}, exclude_none=True)
        response = self.patch(url=endpoint, headers=request_headers, data=body)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info("Registration successfully updated!")
    except requests.RequestException as err:
        self.log_error(
            err=err, msg=f"Could not update registration {registration.id} !"
        )
        raise
def delete_registration(self, registration_id: str) -> None:
    """
    Deletes a registration from a Context Broker

    Args:
        registration_id: id of the registration
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/registrations/{registration_id}"
    )
    request_headers = self.headers.copy()
    try:
        response = self.delete(url=endpoint, headers=request_headers)
        succeeded = response.ok
        if succeeded:
            self.logger.info(
                "Registration '%s' successfully deleted!", registration_id
            )
        # called in every case, as before
        response.raise_for_status()
    except requests.RequestException as err:
        self.log_error(
            err=err, msg=f"Could not delete registration {registration_id} !"
        )
        raise
1904 # Batch operation API
def update(
    self,
    *,
    entities: List[Union[ContextEntity, ContextEntityKeyValues]],
    action_type: Union[ActionType, str],
    update_format: str = None,
    forcedUpdate: bool = False,
    override_metadata: bool = False,
) -> None:
    """
    This operation allows to create, update and/or delete several entities
    in a single batch operation.

    This operation is split in as many individual operations as entities
    in the entities vector, so the actionType is executed for each one of
    them. Depending on the actionType, a mapping with regular non-batch
    operations can be done:

    append: maps to POST /v2/entities (if the entity does not already exist)
    or POST /v2/entities/<id>/attrs (if the entity already exists).

    appendStrict: maps to POST /v2/entities (if the entity does not
    already exist) or POST /v2/entities/<id>/attrs?options=append (if the
    entity already exists).

    update: maps to PATCH /v2/entities/<id>/attrs.

    delete: maps to DELETE /v2/entities/<id>/attrs/<attrName> on every
    attribute included in the entity or to DELETE /v2/entities/<id> if
    no attribute were included in the entity.

    replace: maps to PUT /v2/entities/<id>/attrs.

    Args:
        entities: an array of entities, each entity specified using the
            JSON entity representation format
        action_type (Update): actionType, to specify the kind of update
            action to do: either append, appendStrict, update, delete,
            or replace.
        update_format (str): Optional 'keyValues'
        forcedUpdate: Update operation have to trigger any matching
            subscription, no matter if there is an actual attribute
            update or no instead of the default behavior, which is to
            updated only if attribute is effectively updated.
        override_metadata:
            Bool, replace the existing metadata with the one provided in
            the request
    Returns:
        None
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/op/update")
    request_headers = self.headers.copy()
    request_headers["Content-Type"] = "application/json"
    flags = []
    if override_metadata:
        flags.append("overrideMetadata")
    if forcedUpdate:
        flags.append("forcedUpdate")
    if update_format:
        assert (
            update_format == AttrsFormat.KEY_VALUES.value
        ), "Only 'keyValues' is allowed as update format"
        flags.append("keyValues")
    query = {"options": ",".join(flags)} if flags else {}
    # the payload model is built before the request is attempted
    batch = Update(actionType=action_type, entities=entities)
    try:
        response = self.post(
            url=endpoint,
            headers=request_headers,
            params=query,
            json=batch.model_dump(by_alias=True),
        )
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info("Update operation '%s' succeeded!", action_type)
    except requests.RequestException as err:
        self.log_error(err=err, msg=f"Update operation '{action_type}' failed!")
        raise
def query(
    self,
    *,
    query: Query,
    limit: PositiveInt = None,
    order_by: str = None,
    response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
) -> List[Any]:
    """
    Run a batch query (``POST <version>/op/query``) and return the result.

    Args:
        query (Query): Query payload. It is serialized to JSON with all
            ``None`` fields excluded and sent as the request body.
        limit (PositiveInt): Forwarded unchanged to the pagination helper.
        order_by (str): Criteria for ordering the results. When given it
            is sent to the broker as the ``orderBy`` URL parameter.
        response_format (AttrsFormat, str): Representation requested from
            the broker; must be one of the ``AttrsFormat`` values.

    Returns:
        The response payload is an Array containing one object per matching
        entity, or an empty array [] if no entities are found. For the
        normalized and keyValues formats the items are validated into
        ``ContextEntity`` / ``ContextEntityKeyValues`` objects; for any
        other format the raw items are returned as received.

    Raises:
        ValueError: If ``response_format`` is not a valid ``AttrsFormat``.
        requests.RequestException: If the request fails; the error is
            logged before it is re-raised.
    """
    url = urljoin(self.base_url, f"{self._url_version}/op/query")
    headers = self.headers.copy()
    headers.update({"Content-Type": "application/json"})
    params = {"options": "count"}

    # Previously `order_by` was accepted but silently dropped; forward it
    # so that the broker actually sorts the result set.
    if order_by:
        params["orderBy"] = order_by

    if response_format:
        if response_format not in list(AttrsFormat):
            raise ValueError(f"Value must be in {list(AttrsFormat)}")
        # "count" stays in the options so the total count is still requested
        params["options"] = ",".join([response_format, "count"])
    try:
        items = self.__pagination(
            method=PaginationMethod.POST,
            url=url,
            headers=headers,
            params=params,
            data=query.model_dump_json(exclude_none=True),
            limit=limit,
        )
        if response_format == AttrsFormat.NORMALIZED:
            adapter = TypeAdapter(List[ContextEntity])
            return adapter.validate_python(items)
        if response_format == AttrsFormat.KEY_VALUES:
            adapter = TypeAdapter(List[ContextEntityKeyValues])
            return adapter.validate_python(items)
        # Any other format has no dedicated model here: return raw items
        return items
    except requests.RequestException as err:
        msg = "Query operation failed!"
        self.log_error(err=err, msg=msg)
        raise
def notify(self, message: Message) -> None:
    """
    This operation is intended to consume a notification payload so that
    all the entity data included by such notification is persisted,
    overwriting if necessary. This operation is useful when one NGSIv2
    endpoint is subscribed to another NGSIv2 endpoint (federation
    scenarios). The request payload must be an NGSIv2 notification
    payload. The behaviour must be exactly the same as 'update'
    with 'action_type' equal to append.

    Args:
        message: Notification message; serialized to JSON (by alias) and
            sent as the request body

    Returns:
        None

    Raises:
        requests.RequestException: If the request fails; the error is
            logged together with the message payload before re-raising.
    """
    # Build the path from the client's configured API version, like the
    # other batch operations (`op/update`, `op/query`) of this client do.
    url = urljoin(self.base_url, f"{self._url_version}/op/notify")
    headers = self.headers.copy()
    headers.update({"Content-Type": "application/json"})
    params = {}
    try:
        res = self.post(
            url=url,
            headers=headers,
            params=params,
            data=message.model_dump_json(by_alias=True),
        )
        if res.ok:
            self.logger.info("Notification message sent!")
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = (
            f"Sending notification message failed! \n "
            f"{message.model_dump_json(indent=2)}"
        )
        self.log_error(err=err, msg=msg)
        raise
def post_command(
    self,
    *,
    entity_id: str,
    command: Union[Command, NamedCommand, Dict],
    entity_type: str = None,
    command_name: str = None,
) -> None:
    """
    Send a command to a context entity by updating the corresponding
    command attribute of that entity.

    Args:
        entity_id: Entity identifier
        command: The command to send. Without `command_name` it must be a
            `NamedCommand` (or a dict to build one from); with
            `command_name` it must be a `Command` (or a dict to build one
            from).
        entity_type: Entity type
        command_name: Name of the command in the entity

    Returns:
        None
    """
    if not command_name:
        # The command has to carry its own name
        assert isinstance(command, (NamedCommand, dict))
        payload = NamedCommand(**command) if isinstance(command, dict) else command
    else:
        # The name is given separately: wrap the dumped command under it
        assert isinstance(command, (Command, dict))
        unnamed = Command(**command) if isinstance(command, dict) else command
        payload = {command_name: unnamed.model_dump()}

    self.update_existing_entity_attributes(
        entity_id=entity_id, entity_type=entity_type, attrs=[payload]
    )
def does_entity_exist(self, entity_id: str, entity_type: str) -> bool:
    """
    Test if an entity with given id and type is present in the CB

    Args:
        entity_id: Entity id
        entity_type: Entity type

    Returns:
        bool; True if entity exists

    Raises:
        RequestException, if any error occurs (e.g: No Connection),
        except that the entity is not found
    """
    # Build the path from the client's configured API version instead of
    # a hard-coded "v2", consistent with the batch operations.
    url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
    headers = self.headers.copy()
    params = {"type": entity_type}

    try:
        res = self.get(url=url, params=params, headers=headers)
        if res.ok:
            return True
        res.raise_for_status()
    except requests.RequestException as err:
        # A 404 answer simply means "entity not found"; everything else
        # (including errors without any response) is a real failure.
        if err.response is None or err.response.status_code != 404:
            self.log_error(err=err, msg="Checking entity existence failed!")
            raise
    return False
def patch_entity(
    self,
    entity: ContextEntity,
    old_entity: Optional[ContextEntity] = None,
    override_attr_metadata: bool = True,
) -> None:
    """
    Takes a given entity and updates the state in the CB to match it.
    It is an extended equivalent to the HTTP method PATCH, which applies
    partial modifications to a resource.

    The method proceeds in these steps:

    1. Determine the reference state: either the given `old_entity` or,
       if none was given, the entity currently stored in the CB. If the
       entity does not exist in the CB yet, it is posted and the method
       returns.
    2. If id or type differ between `old_entity` and `entity`, the old
       entity is deleted; if the new one does not exist yet, it is posted
       and the method returns.
    3. Attributes present in the old but not in the new entity are
       deleted, attributes that differ are updated, and attributes only
       present in the new entity are added in one final update call.

    Args:
        entity: Entity to update
        old_entity: OPTIONAL, if given only the differences between the
            old_entity and entity are updated in the CB.
            Other changes made to the entity in CB, can be kept.
            If type or id was changed, the old_entity will be
            deleted.
        override_attr_metadata:
            Whether to override or append the attributes metadata.
            `True` for overwrite or `False` for update/append

    Returns:
        None

    Raises:
        requests.RequestException: If deleting an attribute fails with
            anything but a 404 response, or if updating an attribute
            fails. The error is logged before it is re-raised.
    """

    new_entity = entity

    if old_entity is None:
        # If no old entity was provided we use the current state to compare
        # the entity to
        if self.does_entity_exist(
            entity_id=new_entity.id, entity_type=new_entity.type
        ):
            old_entity = self.get_entity(
                entity_id=new_entity.id, entity_type=new_entity.type
            )
        else:
            # the entity is new, post and finish
            self.post_entity(new_entity, update=False)
            return

    else:
        # An old_entity was provided
        # check if the old_entity (still) exists, else call this method
        # again without it, i.e. discard the old_entity
        if not self.does_entity_exist(
            entity_id=old_entity.id, entity_type=old_entity.type
        ):
            self.patch_entity(
                new_entity, override_attr_metadata=override_attr_metadata
            )
            return

        # if type or id was changed, the old_entity needs to be deleted
        # and the new_entity created
        # In this case we will lose the current state of the entity
        if old_entity.id != new_entity.id or old_entity.type != new_entity.type:
            self.delete_entity(entity_id=old_entity.id, entity_type=old_entity.type)

            if not self.does_entity_exist(
                entity_id=new_entity.id, entity_type=new_entity.type
            ):
                self.post_entity(entity=new_entity, update=False)
                return

    # At this point we know that we need to patch only the attributes of
    # the entity
    # Check the differences between the attributes of old and new entity
    # Delete the removed attributes, create the new ones,
    # and update the existing if necessary
    old_attributes = old_entity.get_attributes()
    new_attributes = new_entity.get_attributes()

    # Manage attributes that existed before
    for old_attr in old_attributes:
        # commands do not exist in the ContextEntity and are only
        # registrations to the corresponding device. Operations as
        # delete will fail as it does not technically exist

        # Look up the attribute of the same name in the new entity
        # (no early exit: the last match wins)
        corresponding_new_attr = None
        for new_attr in new_attributes:
            if new_attr.name == old_attr.name:
                corresponding_new_attr = new_attr

        if corresponding_new_attr is None:
            # Attribute no longer exists, delete it
            try:
                self.delete_entity_attribute(
                    entity_id=new_entity.id,
                    entity_type=new_entity.type,
                    attr_name=old_attr.name,
                )
            except requests.RequestException as err:
                msg = (
                    f"Failed to delete attribute {old_attr.name} of "
                    f"entity {new_entity.id}."
                )
                if err.response is not None and err.response.status_code == 404:
                    # if the attribute is provided by a registration the
                    # deletion will fail; this is only logged, not raised
                    msg += (
                        f" The attribute is probably provided "
                        f"by a registration."
                    )
                    self.log_error(err=err, msg=msg)
                else:
                    self.log_error(err=err, msg=msg)
                    raise
        else:
            # Check if the attribute changed in any way, if yes update
            # else do nothing and keep current state
            if old_attr != corresponding_new_attr:
                try:
                    self.update_entity_attribute(
                        entity_id=new_entity.id,
                        entity_type=new_entity.type,
                        attr=corresponding_new_attr,
                        override_metadata=override_attr_metadata,
                    )
                except requests.RequestException as err:
                    msg = (
                        f"Failed to update attribute {old_attr.name} of "
                        f"entity {new_entity.id}."
                    )
                    if err.response is not None and err.response.status_code == 404:
                        # if the attribute is provided by a registration the
                        # update will fail
                        msg += (
                            f" The attribute is probably provided "
                            f"by a registration."
                        )
                    # Unlike the deletion above, a failed update is always
                    # re-raised, also for a 404 response
                    self.log_error(err=err, msg=msg)
                    raise

    # Create new attributes
    # They are collected in an otherwise empty entity with the same
    # id/type so that all of them can be sent in a single update call
    update_entity = ContextEntity(id=entity.id, type=entity.type)
    update_needed = False
    for new_attr in new_attributes:
        # commands do not exist in the ContextEntity and are only
        # registrations to the corresponding device. Operations as
        # delete will fail as it does not technically exist
        attr_existed = False
        for old_attr in old_attributes:
            if new_attr.name == old_attr.name:
                attr_existed = True

        if not attr_existed:
            update_needed = True
            update_entity.add_attributes([new_attr])

    # Only contact the CB if at least one attribute is actually new
    if update_needed:
        self.update_entity(update_entity)
2297 def _subscription_dicts_are_equal(self, first: dict, second: dict):
2298 """
2299 Check if two dictionaries and all sub-dictionaries are equal.
2300 Logs a warning if the keys are not equal, but ignores the
2301 comparison of such keys.
2303 Args:
2304 first dict: Dictionary of first subscription
2305 second dict: Dictionary of second subscription
2307 Returns:
2308 True if equal, else False
2309 """
2311 def _value_is_not_none(value):
2312 """
2313 Recursive function to check if a value equals none.
2314 If the value is a dict and any value of the dict is not none,
2315 the value is not none.
2316 If the value is a list and any item is not none, the value is not none.
2317 If it's neither dict nore list, bool is used.
2318 """
2319 if isinstance(value, dict):
2320 return any([_value_is_not_none(value=_v) for _v in value.values()])
2321 if isinstance(value, list):
2322 return any([_value_is_not_none(value=_v) for _v in value])
2323 else:
2324 return bool(value)
2326 if first.keys() != second.keys():
2327 warnings.warn(
2328 "Subscriptions contain a different set of fields. "
2329 "Only comparing to new fields of the new one."
2330 )
2331 for k, v in first.items():
2332 ex_value = second.get(k, None)
2333 if isinstance(v, dict) and isinstance(ex_value, dict):
2334 equal = self._subscription_dicts_are_equal(v, ex_value)
2335 if equal:
2336 continue
2337 else:
2338 return False
2339 if v != ex_value:
2340 self.logger.debug(f"Not equal fields for key {k}: ({v}, {ex_value})")
2341 if (
2342 not _value_is_not_none(v)
2343 and not _value_is_not_none(ex_value)
2344 or k == "timesSent"
2345 ):
2346 continue
2347 return False
2348 return True
2351#
2352#
2353# def check_duplicate_subscription(self, subscription_body, limit: int = 20):
2354# """
2355# Function compares the subject of the subscription body, on whether a subscription
2356# already exists for a device / entity.
2357# :param subscription_body: the body of the new subscripton
2358# :param limit: pagination parameter, to set the number of
2359# subscriptions bodies the get request should grab
2360# :return: exists, boolean -> True, if such a subscription allready
2361# exists
2362# """
2363# exists = False
2364# subscription_subject = json.loads(subscription_body)["subject"]
2365# # Exact keys depend on subscription body
2366# try:
2367# subscription_url = json.loads(subscription_body)[
2368# "notification"]["httpCustom"]["url"]
2369# except KeyError:
2370# subscription_url = json.loads(subscription_body)[
2371# "notification"]["http"]["url"]
2372#
2373# # If the number of subscriptions is larger then the limit,
2374# paginations methods have to be used
2375# url = self.url + '/v2/subscriptions?limit=' + str(limit) +
2376# '&options=count'
2377# response = self.session.get(url, headers=self.get_header())
2378#
2379# sub_count = float(response.headers["Fiware-Total-Count"])
2380# response = json.loads(response.text)
2381# if sub_count >= limit:
2382# response = self.get_pagination(url=url, headers=self.get_header(),
2383# limit=limit, count=sub_count)
2384# response = json.loads(response)
2385#
2386# for existing_subscription in response:
2387# # check whether the exact same subscriptions already exists
2388# if existing_subscription["subject"] == subscription_subject:
2389# exists = True
2390# break
2391# try:
2392# existing_url = existing_subscription["notification"][
2393# "http"]["url"]
2394# except KeyError:
2395# existing_url = existing_subscription["notification"][
2396# "httpCustom"]["url"]
2397# # check whether both subscriptions notify to the same path
2398# if existing_url != subscription_url:
2399# continue
2400# else:
2401# # iterate over all entities included in the subscription object
2402# for entity in subscription_subject["entities"]:
2403# if 'type' in entity.keys():
2404# subscription_type = entity['type']
2405# else:
2406# subscription_type = entity['typePattern']
2407# if 'id' in entity.keys():
2408# subscription_id = entity['id']
2409# else:
2410# subscription_id = entity["idPattern"]
2411# # iterate over all entities included in the exisiting
2412# subscriptions
2413# for existing_entity in existing_subscription["subject"][
2414# "entities"]:
2415# if "type" in entity.keys():
2416# type_existing = entity["type"]
2417# else:
2418# type_existing = entity["typePattern"]
2419# if "id" in entity.keys():
2420# id_existing = entity["id"]
2421# else:
2422# id_existing = entity["idPattern"]
2423# # as the ID field is non optional, it has to match
2424# # check whether the type match
2425# # if the type field is empty, they match all types
2426# if (type_existing == subscription_type) or\
2427# ('*' in subscription_type) or \
2428# ('*' in type_existing)\
2429# or (type_existing == "") or (
2430# subscription_type == ""):
2431# # check if on of the subscriptions is a pattern,
2432# or if they both refer to the same id
2433# # Get the attrs first, to avoid code duplication
2434# # last thing to compare is the attributes
2435# # Assumption -> position is the same as the
2436# entities _list
2437# # i == j
2438# i = subscription_subject["entities"].index(entity)
2439# j = existing_subscription["subject"][
2440# "entities"].index(existing_entity)
2441# try:
2442# subscription_attrs = subscription_subject[
2443# "condition"]["attrs"][i]
2444# except (KeyError, IndexError):
2445# subscription_attrs = []
2446# try:
2447# existing_attrs = existing_subscription[
2448# "subject"]["condition"]["attrs"][j]
2449# except (KeyError, IndexError):
2450# existing_attrs = []
2451#
2452# if (".*" in subscription_id) or ('.*' in
2453# id_existing) or (subscription_id == id_existing):
2454# # Attributes have to match, or the have to
2455# be an empty array
2456# if (subscription_attrs == existing_attrs) or
2457# (subscription_attrs == []) or (existing_attrs == []):
2458# exists = True
2459# # if they do not match completely or subscribe
2460# to all ids they have to match up to a certain position
2461# elif ("*" in subscription_id) or ('*' in
2462# id_existing):
2463# regex_existing = id_existing.find('*')
2464# regex_subscription =
2465# subscription_id.find('*')
2466# # slice the strings to compare
2467# if (id_existing[:regex_existing] in
2468# subscription_id) or (subscription_id[:regex_subscription] in id_existing) or \
2469# (id_existing[regex_existing:] in
2470# subscription_id) or (subscription_id[regex_subscription:] in id_existing):
2471# if (subscription_attrs ==
2472# existing_attrs) or (subscription_attrs == []) or (existing_attrs == []):
2473# exists = True
2474# else:
2475# continue
2476# else:
2477# continue
2478# else:
2479# continue
2480# else:
2481# continue
2482# else:
2483# continue
2484# return exists
2485#