Coverage for filip / clients / ngsi_v2 / cb.py: 81%
703 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 08:01 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 08:01 +0000
1"""
2Context Broker Module for API Client
3"""
5from __future__ import annotations
7import copy
8import json
9from copy import deepcopy
10from enum import Enum
11from math import inf
12from packaging.version import parse as parse_version
13from packaging import version
14from pydantic import PositiveInt, PositiveFloat, AnyHttpUrl, ValidationError
15from pydantic.type_adapter import TypeAdapter
16from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union
17import re
18import requests
19from urllib.parse import urljoin
20import warnings
21from requests import RequestException
22from filip.clients.base_http_client import BaseHttpClient, NgsiURLVersion
23from filip.config import settings
24from filip.models.base import FiwareHeader, PaginationMethod, DataType
25from filip.utils.simple_ql import QueryString
26from filip.models.ngsi_v2.context import (
27 ActionType,
28 Command,
29 ContextEntity,
30 ContextEntityKeyValues,
31 ContextAttribute,
32 NamedCommand,
33 NamedContextAttribute,
34 Query,
35 Update,
36 PropertyFormat,
37 ContextEntityList,
38 ContextEntityKeyValuesList,
39 ContextEntityValidationList,
40 ContextEntityKeyValuesValidationList,
41)
42from filip.models.ngsi_v2.base import AttrsFormat
43from filip.models.ngsi_v2.subscriptions import Subscription, Message
44from filip.models.ngsi_v2.registrations import Registration
45from filip.clients.exceptions import BaseHttpClientException
47if TYPE_CHECKING:
48 from filip.clients.ngsi_v2.iota import IoTAClient
51class ContextBrokerClient(BaseHttpClient):
52 """
53 Implementation of NGSI Context Broker functionalities, such as creating
54 entities and subscriptions; retrieving, updating and deleting data.
55 Further documentation:
56 https://fiware-orion.readthedocs.io/en/master/
58 Api specifications for v2 are located here:
59 https://telefonicaid.github.io/fiware-orion/api/v2/stable/
61 Note:
62 We use the reference implementation for development. Therefore, some
63 other brokers may show slightly different behavior!
64 """
def __init__(
    self,
    url: str = None,
    *,
    session: requests.Session = None,
    fiware_header: FiwareHeader = None,
    **kwargs,
):
    """
    Set up a client for the NGSIv2 API of a context broker.

    Args:
        url: Url of the context broker server. A falsy value is replaced
            by ``settings.CB_URL``.
        session (requests.Session): Handed over to the base client.
        fiware_header (FiwareHeader): fiware service and fiware service path
        **kwargs (Optional): Further keyword arguments, forwarded unchanged
            to the base client.
    """
    broker_url = url if url else settings.CB_URL
    self._url_version = NgsiURLVersion.v2_url.value
    super().__init__(
        url=broker_url,
        session=session,
        fiware_header=fiware_header,
        **kwargs,
    )
    # Fetches the broker version and logs a warning if it is older than
    # the configured minimum.
    self._check_correct_cb_version()
def __pagination(
    self,
    *,
    method: PaginationMethod = PaginationMethod.GET,
    url: str,
    headers: Dict,
    limit: Union[PositiveInt, PositiveFloat] = None,
    params: Dict = None,
    data: str = None,
) -> List[Dict]:
    """
    NGSIv2 implements a pagination mechanism in order to help clients to
    retrieve large sets of resources. This mechanism works for all listing
    operations in the API (e.g. GET /v2/entities, GET /v2/subscriptions,
    POST /v2/op/query, etc.). This function collects datasets that are
    larger than the per-request maximum by issuing follow-up requests with
    an increasing ``offset``.

    https://fiware-orion.readthedocs.io/en/master/user/pagination/index.html

    Args:
        method: HTTP method used for every page request
        url: Url of the listing operation
        headers: Headers sent with every page request
        limit: Maximum number of items to collect. ``None`` means no upper
            bound.
        params: Query parameters of the listing operation. The dictionary
            is copied, the caller's object is never modified.
        data: Request body sent with every page request

    Returns:
        List with the items of all retrieved pages. ``None`` if the first
        response is not ok and ``raise_for_status`` does not raise.

    Raises:
        requests.HTTPError: If one of the page requests fails
    """
    page_size = 1000  # maximum items per request
    if limit is None:
        limit = inf

    # Work on a private copy: ``params`` may be None, and the paging keys
    # written below must not leak into the caller's dictionary.
    params = dict(params) if params else {}
    params["limit"] = min(page_size, limit)

    # Only a session created here is closed here. A session owned by the
    # client stays open so that later requests can keep using it.
    owns_session = not self.session
    session = requests.Session() if owns_session else self.session
    try:
        res = session.request(
            method=method, url=url, params=params, headers=headers, data=data
        )
        if res.ok:
            items = res.json()
            # The total is only reported when the request asked for it.
            # Without the header, keep paging until an empty page arrives.
            total = res.headers.get("Fiware-Total-Count")
            count = inf if total is None else int(total)

            while len(items) < limit and len(items) < count:
                # Establishing the offset from where entities are retrieved
                params["offset"] = len(items)
                params["limit"] = min(page_size, limit - len(items))
                res = session.request(
                    method=method,
                    url=url,
                    params=params,
                    headers=headers,
                    data=data,
                )
                res.raise_for_status()
                page = res.json()
                if not page:
                    # Fewer items available than announced, e.g. because
                    # some were removed meanwhile. Stop instead of
                    # requesting the same offset forever.
                    break
                items.extend(page)
            self.logger.debug("Received: %s", items)
            return items
        res.raise_for_status()
    finally:
        if owns_session:
            session.close()
158 # MANAGEMENT API
def get_version(self) -> Dict:
    """
    Query the ``version`` endpoint of the context broker.

    Returns:
        Dictionary with the decoded JSON response

    Raises:
        BaseHttpClientException: If the request fails
    """
    endpoint = urljoin(self.base_url, "version")
    try:
        response = self.get(url=endpoint, headers=self.headers)
        if not response.ok:
            response.raise_for_status()
            return None
        return response.json()
    except requests.RequestException as err:
        self.logger.error(err)
        msg = f"Fetch version fails, reason: {err.args}"
        raise BaseHttpClientException(message=msg, response=err.response) from err
def _check_correct_cb_version(self) -> None:
    """
    Compare the Orion version reported by the broker with
    ``settings.MINIMUM_ORION_VERSION`` and log a warning if the broker is
    older. Nothing is raised for an outdated broker.
    """
    orion_version = self.get_version()["orion"]["version"]
    found = version.parse(orion_version)
    required = version.parse(settings.MINIMUM_ORION_VERSION)
    if not found < required:
        return
    self.logger.warning(
        f"You are using orion version {orion_version}. There was a breaking change in Orion Version "
        f"{settings.MINIMUM_ORION_VERSION}, therefore functionality is not assured when using "
        f"version {orion_version}."
    )
def get_resources(self) -> Dict:
    """
    Query the entry point of the API, i.e. the base url joined with the
    url version of this client.

    Returns:
        Dictionary with the decoded JSON response

    Raises:
        RequestException: If the request fails
    """
    endpoint = urljoin(self.base_url, self._url_version)
    try:
        response = self.get(url=endpoint, headers=self.headers)
        if not response.ok:
            response.raise_for_status()
            return None
        return response.json()
    except requests.RequestException as err:
        self.logger.error(err)
        raise RequestException(response=err.response) from err
206 # STATISTICS API
def get_statistics(self) -> Dict:
    """
    Gets statistics of context broker

    Returns:
        Dictionary with response

    Raises:
        BaseHttpClientException: If the request fails. The message is the
            body of the error response, or the text of the underlying
            error when no response was received at all.
    """
    url = urljoin(self.base_url, "statistics")
    try:
        res = self.get(url=url, headers=self.headers)
        if res.ok:
            return res.json()
        res.raise_for_status()
    except requests.RequestException as err:
        self.logger.error(err)
        # Failures without any response (e.g. the connection could not be
        # established) carry no response object. Compare against None
        # explicitly, since an error response evaluates to False.
        if err.response is not None:
            message = err.response.text
        else:
            message = str(err)
        raise BaseHttpClientException(
            message=message, response=err.response
        ) from err
225 # CONTEXT MANAGEMENT API ENDPOINTS
226 # Entity Operations
def post_entity(
    self,
    entity: Union[ContextEntity, ContextEntityKeyValues],
    update: bool = False,
    patch: bool = False,
    override_metadata: bool = True,
    key_values: bool = False,
):
    """
    Create an entity in the context broker.

    The entity is sent with a POST request first. If the broker answers
    with status code 422 there are two ways to continue: overwrite the
    existing entity (``update=True``) or only manipulate its attributes
    (``patch=True``). Without either flag the error is raised.

    Args:
        entity (ContextEntity/ContextEntityKeyValues):
            Context Entity Object
        update (bool):
            If the response status code is 422, hand the entity over to
            ``override_entity``
        patch (bool):
            If the response status code is 422, hand the entity over to
            ``patch_entity``. Ignored if ``update`` is `True`.
        override_metadata:
            Only applies for patch equal to `True`. Passed on to
            ``patch_entity``: `True` for overwrite or `False` for
            update/append of the attribute's metadata
        key_values (bool):
            By default False. If set to True, "options=keyValues" is
            included in the params of the post request and the entity
            has to be a ContextEntityKeyValues.

    Returns:
        Value of the ``Location`` header of the response if the entity was
        created, otherwise the result of the fallback operation.

    Raises:
        BaseHttpClientException: If the request fails and no fallback
            applies
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/entities")
    request_headers = self.headers.copy()
    query = {}
    if key_values:
        assert isinstance(entity, ContextEntityKeyValues)
        query["options"] = "keyValues"
    else:
        assert isinstance(entity, ContextEntity)

    try:
        response = self.post(
            url=endpoint,
            headers=request_headers,
            json=entity.model_dump(exclude_none=True),
            params=query,
        )
        if not response.ok:
            response.raise_for_status()
            return None
        self.logger.info("Entity successfully posted!")
        return response.headers.get("Location")
    except requests.RequestException as err:
        # An error response evaluates to False, so the check has to be
        # made against None.
        unprocessable = (
            err.response is not None and err.response.status_code == 422
        )
        if unprocessable and update:
            return self.override_entity(entity=entity, key_values=key_values)
        if unprocessable and patch:
            return self.patch_entity(
                entity=entity,
                override_metadata=override_metadata,
                key_values=key_values,
            )
        msg = f"Could not post entity {entity.id}"
        raise BaseHttpClientException(message=msg, response=err.response) from err
def get_entity_list(
    self,
    *,
    entity_ids: List[str] = None,
    entity_types: List[str] = None,
    id_pattern: str = None,
    type_pattern: str = None,
    q: Union[str, QueryString] = None,
    mq: Union[str, QueryString] = None,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    limit: PositiveInt = inf,
    attrs: List[str] = None,
    metadata: str = None,
    order_by: str = None,
    response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
    include_invalid: bool = False,
) -> Union[
    List[Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]],
    ContextEntityValidationList,
    ContextEntityKeyValuesValidationList,
]:
    r"""
    Retrieves a list of context entities that match different criteria by
    id, type, pattern matching (either id or type) and/or those which
    match a query or geographical query (see Simple Query Language and
    Geographical Queries). A given entity has to match all the criteria
    to be retrieved (i.e., the criteria is combined in a logical AND
    way). Note that pattern matching query parameters are incompatible
    (i.e. mutually exclusive) with their corresponding exact matching
    parameters, i.e. idPattern with id and typePattern with type.

    Args:
        entity_ids: Ids to look for, either a list or an already
            comma-separated string. Retrieve entities whose ID matches
            one of the elements. Incompatible with id_pattern.
        entity_types: Types to look for, either a list or an already
            comma-separated string. Retrieve entities whose type matches
            one of the elements. Incompatible with type_pattern.
            Example: Room.
        id_pattern: A correctly formatted regular expression. Retrieve
            entities whose ID matches the regular expression. Incompatible
            with entity_ids, e.g. ngsi-ld.* or sensor.*
        type_pattern: A correctly formatted regular expression. Retrieve
            entities whose type matches the regular expression.
            Incompatible with entity_types, e.g. room.*
        q (SimpleQuery): A query expression, composed of a list of
            statements separated by ;, i.e.,
            q=statement1;statement2;statement3. See Simple Query
            Language specification. Example: temperature>40.
            A string is parsed with ``QueryString.parse_str`` first.
        mq (SimpleQuery): A query expression for attribute metadata,
            composed of a list of statements separated by ;, i.e.,
            mq=statement1;statement2;statement3. See Simple Query
            Language specification. Example: temperature.accuracy<0.9.
        georel: Spatial relationship between matching entities and a
            reference shape. See Geographical Queries. Example: 'near'.
        geometry: Geographical area to which the query is restricted.
            See Geographical Queries. Example: point.
        coords: List of latitude-longitude pairs of coordinates separated
            by ';'. See Geographical Queries. Example: 41.390205,
            2.154007;48.8566,2.3522.
        limit: Limits the number of entities to be retrieved Example: 20
        attrs: Attribute names whose data are to be included in the
            response, either a list or an already comma-separated
            string. See "Filtering out attributes and metadata" section in
            https://fiware-orion.readthedocs.io/en/master/orion-api.html#filtering-out-attributes-and-metadata
            for more detail. Example: seatNumber.
        metadata: Metadata names to include in the response, either a
            list or an already comma-separated string. See the section
            linked above for more detail. Example: accuracy.
        order_by: Criteria for ordering results. See "Ordering Results"
            section for details. Example: temperature,!speed.
        response_format (AttrsFormat, str): Response Format. Note: That if
            'keyValues' or 'values' are used the response model will
            change to List[ContextEntityKeyValues] and to List[Dict[str,
            Any]], respectively.
        include_invalid: If True, entities that fail validation do not
            raise but are reported by their id next to the valid ones.

    Returns:
        A list of entities in the requested format. With
        ``include_invalid`` set and a 'normalized' or 'keyValues' format,
        a validation list object holding ``entities`` and
        ``invalid_entities`` instead.

    Raises:
        ValueError: For mutually exclusive arguments, an invalid regular
            expression or an unknown response format
        BaseHttpClientException: If the request fails
    """

    def as_csv(value) -> str:
        # A plain string is already in wire format. Joining it would put
        # a comma between each of its characters.
        return value if isinstance(value, str) else ",".join(value)

    url = urljoin(self.base_url, f"{self._url_version}/entities/")
    headers = self.headers.copy()
    params = {}

    if entity_ids and id_pattern:
        raise ValueError("entity_ids and id_pattern are mutually exclusive")
    if entity_types and type_pattern:
        raise ValueError("entity_types and type_pattern are mutually exclusive")
    if entity_ids:
        params.update({"id": as_csv(entity_ids)})
    if id_pattern:
        try:
            re.compile(id_pattern)
        except re.error as err:
            raise ValueError(f"Invalid Pattern: {err}") from err
        params.update({"idPattern": id_pattern})
    if entity_types:
        params.update({"type": as_csv(entity_types)})
    if type_pattern:
        try:
            re.compile(type_pattern)
        except re.error as err:
            raise ValueError(f"Invalid Pattern: {err.msg}") from err
        params.update({"typePattern": type_pattern})
    if attrs:
        params.update({"attrs": as_csv(attrs)})
    if metadata:
        params.update({"metadata": as_csv(metadata)})
    if q:
        if isinstance(q, str):
            q = QueryString.parse_str(q)
        params.update({"q": str(q)})
    if mq:
        params.update({"mq": str(mq)})
    if geometry:
        params.update({"geometry": geometry})
    if georel:
        params.update({"georel": georel})
    if coords:
        params.update({"coords": coords})
    if order_by:
        params.update({"orderBy": order_by})
    if response_format not in list(AttrsFormat):
        raise ValueError(f"Value must be in {list(AttrsFormat)}")
    # "count" makes the broker report the total number of matches, which
    # the pagination relies on.
    response_format = ",".join(["count", response_format])
    params.update({"options": response_format})
    try:
        items = self.__pagination(
            method=PaginationMethod.GET,
            limit=limit,
            url=url,
            params=params,
            headers=headers,
        )
        if include_invalid:
            if AttrsFormat.NORMALIZED in response_format:
                entity_model = ContextEntity
                result_model = ContextEntityValidationList
            elif AttrsFormat.KEY_VALUES in response_format:
                entity_model = ContextEntityKeyValues
                result_model = ContextEntityKeyValuesValidationList
            else:
                return items

            adapter = TypeAdapter(entity_model)
            valid_entities = []
            invalid_entities = []
            for entity in items:
                try:
                    valid_entities.append(adapter.validate_python(entity))
                except ValidationError:
                    invalid_entities.append(entity.get("id"))

            return result_model.model_validate(
                {
                    "entities": valid_entities,
                    "invalid_entities": invalid_entities,
                }
            )

        if AttrsFormat.NORMALIZED in response_format:
            return ContextEntityList.model_validate({"entities": items}).entities
        if AttrsFormat.KEY_VALUES in response_format:
            return ContextEntityKeyValuesList.model_validate(
                {"entities": items}
            ).entities
        return items  # in case of VALUES as response_format

    except requests.RequestException as err:
        msg = "Could not load entities"
        raise BaseHttpClientException(message=msg, response=err.response) from err
def get_entity(
    self,
    entity_id: str,
    entity_type: str = None,
    attrs: List[str] = None,
    metadata: List[str] = None,
    response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
) -> Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]:
    """
    Retrieve a single entity by its id.

    This operation must return one entity element only, but there may be
    more than one entity with the same ID (e.g. entities with same ID but
    different types). In such case, an error message is returned, with
    the HTTP status code set to 409 Conflict.

    Args:
        entity_id (String): Id of the entity to be retrieved
        entity_type (String): Entity type, to avoid ambiguity in case
            there are several entities with the same entity id.
        attrs (List of Strings): List of attribute names whose data must be
            included in the response. See "Filtering out attributes and
            metadata" section in
            https://fiware-orion.readthedocs.io/en/master/orion-api.html#filtering-out-attributes-and-metadata
            for more detail. Example: temperature, humidity.
        metadata (List of Strings): A list of metadata names to include in
            the response. Example: accuracy.
        response_format (AttrsFormat, str): Representation format of
            response

    Returns:
        ContextEntity for the 'normalized' format, ContextEntityKeyValues
        for the 'keyValues' format, otherwise the decoded JSON response.

    Raises:
        ValueError: If the response format is unknown
        BaseHttpClientException: If the request fails
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
    request_headers = self.headers.copy()
    query = {}
    if entity_type:
        query["type"] = entity_type
    if attrs:
        query["attrs"] = ",".join(attrs)
    if metadata:
        query["metadata"] = ",".join(metadata)
    if response_format not in list(AttrsFormat):
        raise ValueError(f"Value must be in {list(AttrsFormat)}")
    query["options"] = response_format

    try:
        response = self.get(url=endpoint, params=query, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
            return None
        self.logger.info("Entity successfully retrieved!")
        payload = response.json()
        self.logger.debug("Received: %s", payload)
        if response_format == AttrsFormat.NORMALIZED:
            return ContextEntity(**payload)
        if response_format == AttrsFormat.KEY_VALUES:
            return ContextEntityKeyValues(**payload)
        return payload
    except requests.RequestException as err:
        msg = f"Could not load entity {entity_id}"
        raise BaseHttpClientException(message=msg, response=err.response) from err
def get_entity_attributes(
    self,
    entity_id: str,
    entity_type: str = None,
    attrs: List[str] = None,
    metadata: List[str] = None,
    response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
) -> Dict[str, ContextAttribute]:
    """
    Retrieve the attributes of a single entity.

    This request is similar to retrieving the whole entity, however this
    one omits the id and type fields. If more than one entity with the
    same ID is found (e.g. entities with same ID but different type), an
    error message is returned, with the HTTP status code set to
    409 Conflict.

    Args:
        entity_id (String): Id of the entity to be retrieved
        entity_type (String): Entity type, to avoid ambiguity in case
            there are several entities with the same entity id.
        attrs (List of Strings): List of attribute names whose data must be
            included in the response. See "Filtering out attributes and
            metadata" section in
            https://fiware-orion.readthedocs.io/en/master/orion-api.html#filtering-out-attributes-and-metadata
            for more detail. Example: temperature, humidity.
        metadata (List of Strings): A list of metadata names to include in
            the response. Example: accuracy.
        response_format (AttrsFormat, str): Representation format of
            response

    Returns:
        Dict that maps attribute names to ContextAttribute objects for the
        'normalized' format, otherwise the decoded JSON response.

    Raises:
        ValueError: If the response format is unknown
        BaseHttpClientException: If the request fails
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity_id}/attrs"
    )
    request_headers = self.headers.copy()
    query = {}
    if entity_type:
        query["type"] = entity_type
    if attrs:
        query["attrs"] = ",".join(attrs)
    if metadata:
        query["metadata"] = ",".join(metadata)
    if response_format not in list(AttrsFormat):
        raise ValueError(f"Value must be in {list(AttrsFormat)}")
    query["options"] = response_format

    try:
        response = self.get(url=endpoint, params=query, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
            return None
        if response_format != AttrsFormat.NORMALIZED:
            return response.json()
        parsed = {}
        for name, raw in response.json().items():
            parsed[name] = ContextAttribute(**raw)
        return parsed
    except requests.RequestException as err:
        msg = f"Could not load attributes from entity {entity_id} !"
        raise BaseHttpClientException(message=msg, response=err.response) from err
def update_entity(
    self,
    entity: Union[ContextEntity, ContextEntityKeyValues, dict],
    append_strict: bool = False,
    key_values: bool = False,
):
    """
    The request payload is an object representing the attributes to
    append or update.

    Note:
        Update means overwriting the existing entity. If you want to
        manipulate you should rather use patch_entity.

    Args:
        entity (ContextEntity/ContextEntityKeyValues/dict): Entity whose
            attributes are sent. A dict needs the keys "id" and "type";
            it is not modified.
        append_strict: If `False` the entity attributes are updated (if they
            previously exist) or appended (if they don't previously exist)
            with the ones in the payload.
            If `True` all the attributes in the payload not
            previously existing in the entity are appended. In addition
            to that, in case some of the attributes in the payload
            already exist in the entity, an error is returned.
            More precisely this means a strict append procedure.
        key_values: By default False. If set to True, the payload uses
            the keyValues simplified entity representation, i.e.
            ContextEntityKeyValues.

    Returns:
        None
    """
    if key_values:
        if isinstance(entity, dict):
            # work on a copy so that the caller keeps "id" and "type"
            attrs = copy.deepcopy(entity)
            _id = attrs.pop("id")
            _type = attrs.pop("type")
            entity = ContextEntityKeyValues(id=_id, type=_type)
        else:
            attrs = entity.model_dump(exclude={"id", "type"})
    else:
        if isinstance(entity, dict):
            # A plain dict has no ``get_attributes``. Build the model
            # first so that dicts work in the normalized format as well.
            entity = ContextEntity(**entity)
        attrs = entity.get_attributes()
    self.update_or_append_entity_attributes(
        entity_id=entity.id,
        entity_type=entity.type,
        attrs=attrs,
        append_strict=append_strict,
        key_values=key_values,
    )
def update_entity_properties(
    self, entity: ContextEntity, append_strict: bool = False
):
    """
    Append or update the attributes returned by ``entity.get_properties()``,
    i.e. attributes of any type but Relationship.

    Note:
        Update means overwriting the existing entity. If you want to
        manipulate you should rather use patch_entity.

    Args:
        entity (ContextEntity): Entity that provides id, type and the
            properties to send
        append_strict: If `False` the entity attributes are updated (if they
            previously exist) or appended (if they don't previously exist)
            with the ones in the payload.
            If `True` all the attributes in the payload not
            previously existing in the entity are appended. In addition
            to that, in case some of the attributes in the payload
            already exist in the entity, an error is returned.
            More precisely this means a strict append procedure.

    Returns:
        None
    """
    request = {
        "entity_id": entity.id,
        "entity_type": entity.type,
        "attrs": entity.get_properties(),
        "append_strict": append_strict,
    }
    self.update_or_append_entity_attributes(**request)
def update_entity_relationships(
    self, entity: ContextEntity, append_strict: bool = False
):
    """
    Append or update the attributes returned by
    ``entity.get_relationships()``, i.e. only attributes of type
    Relationship.

    Note:
        Update means overwriting the existing entity. If you want to
        manipulate you should rather use patch_entity.

    Args:
        entity (ContextEntity): Entity that provides id, type and the
            relationships to send
        append_strict: If `False` the entity attributes are updated (if they
            previously exist) or appended (if they don't previously exist)
            with the ones in the payload.
            If `True` all the attributes in the payload not
            previously existing in the entity are appended. In addition
            to that, in case some of the attributes in the payload
            already exist in the entity, an error is returned.
            More precisely this means a strict append procedure.

    Returns:
        None
    """
    request = {
        "entity_id": entity.id,
        "entity_type": entity.type,
        "attrs": entity.get_relationships(),
        "append_strict": append_strict,
    }
    self.update_or_append_entity_attributes(**request)
def delete_entity(
    self,
    entity_id: str,
    entity_type: str = None,
    delete_devices: bool = False,
    iota_client: IoTAClient = None,
    iota_url: AnyHttpUrl = settings.IOTA_URL,
) -> None:
    """
    Remove a entity from the context broker. No payload is required
    or received.

    Args:
        entity_id:
            Id of the entity to be deleted
        entity_type:
            Entity type, to avoid ambiguity in case there are several
            entities with the same entity id.
        delete_devices:
            If True, also delete all devices that reference this
            entity (entity_id as entity_name). With an entity_type given,
            only devices of that entity type are deleted.
        iota_client:
            Corresponding IoTA-Client used to access IoTA-Agent. The
            client is deep-copied, the passed object itself is not used
            for the requests and not closed.
        iota_url:
            URL of the corresponding IoT-Agent. Only used without
            iota_client: an IoTA-Client is generated from it, mirroring
            the fiware header and headers of this client.

    Returns:
        None

    Raises:
        BaseHttpClientException: If the entity cannot be deleted
    """
    url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
    headers = self.headers.copy()
    params = {"type": entity_type} if entity_type else None
    try:
        res = self.delete(url=url, params=params, headers=headers)
        if res.ok:
            self.logger.info("Entity '%s' successfully deleted!", entity_id)
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not delete entity {entity_id} !"
        raise BaseHttpClientException(message=msg, response=err.response) from err

    if not delete_devices:
        return

    # imported here and not at module level, where the name is only
    # imported for type checking
    from filip.clients.ngsi_v2 import IoTAClient

    if iota_client:
        iota_client_local = deepcopy(iota_client)
    else:
        warnings.warn(
            "No IoTA-Client object provided! "
            "Will try to generate one. "
            "This usage is not recommended."
        )

        iota_client_local = IoTAClient(
            url=iota_url,
            fiware_header=self.fiware_headers,
            headers=self.headers,
        )

    # Close the local client even if listing or deleting devices fails
    try:
        for device in iota_client_local.get_device_list(entity_names=[entity_id]):
            if entity_type and device.entity_type != entity_type:
                continue
            iota_client_local.delete_device(device_id=device.device_id)
    finally:
        iota_client_local.close()
def delete_entities(self, entities: List[ContextEntity]) -> None:
    """
    Remove a list of entities from the context broker with batch update
    requests of action type "delete", instead of one request per entity.

    Args:
        entities: List[ContextEntity]: List of entities to be deleted

    Raises:
        Whatever ``update`` raises is passed on unchanged.

    Returns:
        None
    """
    batch_size = 1000  # max number of entities that will be deleted at once

    # update() delete, deletes all entities without attributes completely,
    # and removes the attributes for the other. Entities that carry
    # attributes are therefore remembered with id and type only, so that
    # a second pass can remove what is left of them.
    stripped: List[ContextEntity] = [
        ContextEntity(id=item.id, type=item.type)
        for item in entities
        if any(key not in ContextEntity.model_fields for key in item.model_dump())
    ]

    # First pass: all entities as given
    for start in range(0, len(entities), batch_size):
        self.update(
            entities=entities[start : start + batch_size], action_type="delete"
        )
    # Second pass: the remainders (only id and type left)
    for start in range(0, len(stripped), batch_size):
        self.update(
            entities=stripped[start : start + batch_size], action_type="delete"
        )
def update_or_append_entity_attributes(
    self,
    entity_id: str,
    attrs: Union[
        List[NamedContextAttribute], Dict[str, ContextAttribute], Dict[str, Any]
    ],
    entity_type: str = None,
    append_strict: bool = False,
    forcedUpdate: bool = False,
    key_values: bool = False,
):
    """
    Send the given attributes with a 'POST' request to the ``attrs``
    endpoint of an entity, in order to append or update them.

    Note:
        The attributes are wrapped into an entity model to build the
        payload. Only the fields ``id`` and ``type`` are excluded from the
        data that is sent.

    Args:
        entity_id: Entity id to be updated
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.
        attrs: List of attributes to update or to append
        append_strict: If `False` the entity attributes are updated (if they
            previously exist) or appended (if they don't previously exist)
            with the ones in the payload.
            If `True` all the attributes in the payload not
            previously existing in the entity are appended. In addition
            to that, in case some of the attributes in the payload
            already exist in the entity, an error is returned.
            More precisely this means a strict append procedure.
        forcedUpdate: Update operation have to trigger any matching
            subscription, no matter if there is an actual attribute
            update or no instead of the default behavior, which is to
            updated only if attribute is effectively updated.
        key_values: By default False. If set to True, the payload uses
            the keyValues simplified entity representation, i.e.
            ContextEntityKeyValues. ``attrs`` has to be a dict then.

    Returns:
        None

    Raises:
        BaseHttpClientException: If the request fails
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity_id}/attrs"
    )
    request_headers = self.headers.copy()
    query = {}
    if entity_type:
        query["type"] = entity_type
    else:
        # placeholder to build the model below; the type is never sent
        entity_type = "dummy"

    if key_values:
        assert isinstance(attrs, dict), "for keyValues attrs has to be a dict"
    switches = (
        ("append", append_strict),
        ("forcedUpdate", forcedUpdate),
        ("keyValues", key_values),
    )
    options = [name for name, enabled in switches if enabled]
    if options:
        query["options"] = ",".join(options)

    if key_values:
        entity = ContextEntityKeyValues(id=entity_id, type=entity_type, **attrs)
    else:
        entity = ContextEntity(id=entity_id, type=entity_type)
        entity.add_attributes(attrs)

    try:
        response = self.post(
            url=endpoint,
            headers=request_headers,
            json=entity.model_dump(exclude={"id", "type"}, exclude_none=True),
            params=query,
        )
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info("Entity '%s' successfully updated!", entity.id)
    except requests.RequestException as err:
        msg = f"Could not update or append attributes of entity {entity.id} !"
        self.log_error(err=err, msg=msg)
        raise BaseHttpClientException(message=msg, response=err.response) from err
def update_existing_entity_attributes(
    self,
    entity_id: str,
    attrs: Union[
        List[NamedContextAttribute], Dict[str, ContextAttribute], Dict[str, Any]
    ],
    entity_type: str = None,
    forcedUpdate: bool = False,
    override_metadata: bool = False,
    key_values: bool = False,
):
    """
    Update attributes that already exist on an entity ('PATCH' request).

    The entity attributes are updated with the ones in the payload.
    If one or more attributes in the payload do not exist in the entity,
    an error is returned by the broker.

    Args:
        entity_id: Entity id to be updated
        attrs: Attributes to update. Must be a dict if `key_values` is set.
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.
        forcedUpdate: Update operation have to trigger any matching
            subscription, no matter if there is an actual attribute
            update or not, instead of the default behavior, which is to
            update only if an attribute is effectively updated.
        override_metadata: Replace the existing metadata with the one
            provided in the request.
        key_values: By default False. If set to True, the payload uses
            the keyValues simplified entity representation and `attrs`
            is sent as given.

    Returns:
        None

    Raises:
        AssertionError: If `key_values` is set and `attrs` is not a dict.
        BaseHttpClientException: If the request fails.
    """
    url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
    headers = self.headers.copy()
    # Always keep the query parameters in a dict: options may have to be
    # added even when no entity type was given (previously this was None
    # in that case and adding options raised an AttributeError).
    params = {}
    if entity_type:
        params["type"] = entity_type
    else:
        # Placeholder only needed to build the local ContextEntity below;
        # "type" is excluded from the payload.
        entity_type = "dummy"

    options = []
    if override_metadata:
        options.append("overrideMetadata")
    if forcedUpdate:
        options.append("forcedUpdate")
    if key_values:
        assert isinstance(attrs, dict), "for keyValues the attrs must be dict"
        payload = attrs
        options.append("keyValues")
    else:
        entity = ContextEntity(id=entity_id, type=entity_type)
        entity.add_attributes(attrs)
        payload = entity.model_dump(exclude={"id", "type"}, exclude_none=True)
    if options:
        params["options"] = ",".join(options)

    try:
        res = self.patch(
            url=url,
            headers=headers,
            json=payload,
            params=params,
        )
        if res.ok:
            self.logger.info("Entity '%s' successfully updated!", entity_id)
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = "Could not update attributes of entity"
        raise BaseHttpClientException(message=msg, response=err.response) from err
def override_entity(
    self, entity: Union[ContextEntity, ContextEntityKeyValues], **kwargs
):
    """
    Replace the attributes of an existing entity with those of `entity`.

    Thin convenience wrapper: id, type and attributes are read from the
    given entity object and handed to `replace_entity_attributes`.

    Note:
        If you want to manipulate you should rather use patch_entity.

    Args:
        entity (ContextEntity or ContextEntityKeyValues): entity whose
            attributes are sent as replacement
        **kwargs: forwarded unchanged to `replace_entity_attributes`

    Returns:
        Whatever `replace_entity_attributes` returns
    """
    forwarded = dict(
        entity_id=entity.id,
        entity_type=entity.type,
        attrs=entity.get_attributes(),
    )
    return self.replace_entity_attributes(**forwarded, **kwargs)
def replace_entity_attributes(
    self,
    entity_id: str,
    attrs: Union[
        List[Union[NamedContextAttribute, Dict[str, ContextAttribute]]], Dict
    ],
    entity_type: str = None,
    forcedUpdate: bool = False,
    key_values: bool = False,
):
    """
    Replace all attributes of an entity ('PUT' request).

    The attributes previously existing in the entity are removed and
    replaced by the ones in the request.

    Args:
        entity_id: Entity id to be updated
        attrs: List of attributes to add to the entity or dict of
            attributes in case of key_values=True.
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.
        forcedUpdate: Update operation have to trigger any matching
            subscription, no matter if there is an actual attribute
            update or not.
        key_values (bool): By default False. If set to True,
            "options=keyValues" is included in the request params and
            `attrs` is sent as given (it must be a dict).

    Returns:
        None

    Raises:
        AssertionError: If `key_values` is set and `attrs` is not a dict.
        BaseHttpClientException: If the request fails.
    """
    headers = self.headers.copy()
    url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")

    params = {}
    if not entity_type:
        # placeholder used only for building the local ContextEntity
        entity_type = "dummy"
    else:
        params["type"] = entity_type

    option_flags = []
    if forcedUpdate:
        option_flags.append("forcedUpdate")

    if not key_values:
        # normalized representation: let the model serialize the attributes
        holder = ContextEntity(id=entity_id, type=entity_type)
        holder.add_attributes(attrs)
        attrs = holder.model_dump(exclude={"id", "type"}, exclude_none=True)
    else:
        option_flags.append("keyValues")
        assert isinstance(attrs, dict)

    if option_flags:
        params["options"] = ",".join(option_flags)

    try:
        response = self.put(url=url, headers=headers, json=attrs, params=params)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info("Entity '%s' successfully updated!", entity_id)
    except requests.RequestException as err:
        msg = f"Could not replace attribute of entity {entity_id} !"
        raise BaseHttpClientException(message=msg, response=err.response) from err
1105 # Attribute operations
def get_attribute(
    self,
    entity_id: str,
    attr_name: str,
    entity_type: str = None,
    metadata: Union[str, List[str]] = None,
    response_format="",
) -> ContextAttribute:
    """
    Retrieves a specified attribute from an entity.

    Args:
        entity_id: Id of the entity. Example: Bcn_Welt
        attr_name: Name of the attribute to be retrieved.
        entity_type (Optional): Type of the entity to retrieve
        metadata (Optional): Metadata names to include in the response,
            either as a list of names or as an already comma-separated
            string. See "Filtering out attributes and metadata" section in
            https://fiware-orion.readthedocs.io/en/master/orion-api.html#filtering-out-attributes-and-metadata
            for more detail.
        response_format: Not used by this method; kept for backwards
            compatibility of the signature.

    Returns:
        The content of the retrieved attribute as ContextAttribute

    Raises:
        BaseHttpClientException: If the request fails.
    """
    url = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
    )
    headers = self.headers.copy()
    params = {}
    if entity_type:
        params["type"] = entity_type
    if metadata:
        # A plain string must be sent as-is: joining it would put a comma
        # between every single character.
        if not isinstance(metadata, str):
            metadata = ",".join(metadata)
        params["metadata"] = metadata
    try:
        res = self.get(url=url, params=params, headers=headers)
        if res.ok:
            self.logger.debug("Received: %s", res.json())
            return ContextAttribute(**res.json())
        res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not load attribute '{attr_name}' from entity'{entity_id}' "
        raise BaseHttpClientException(message=msg, response=err.response) from err
def update_entity_attribute(
    self,
    entity_id: str,
    attr: Union[ContextAttribute, NamedContextAttribute],
    *,
    entity_type: str = None,
    attr_name: str = None,
    override_metadata: bool = True,
    forcedUpdate: bool = False,
):
    """
    Updates a specified attribute from an entity ('PUT' request).

    Args:
        entity_id:
            Id of the entity. Example: Bcn_Welt
        attr:
            context attribute to update
        entity_type:
            Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.
        attr_name:
            Name of the attribute to be updated. Required if `attr` is
            not a NamedContextAttribute and must not be set otherwise.
        override_metadata:
            Bool, if set to `True` (default) the metadata will be
            overwritten. This is for backwards compatibility reasons.
            If `False` the metadata values will be either updated if
            already existing or append if not.
            See also:
            https://fiware-orion.readthedocs.io/en/master/user/metadata.html
        forcedUpdate: Update operation have to trigger any matching
            subscription, no matter if there is an actual attribute
            update or not.

    Raises:
        AssertionError: If `attr_name` and the type of `attr` do not match
            as described above.
        BaseHttpClientException: If the request fails.
    """
    headers = self.headers.copy()
    if isinstance(attr, NamedContextAttribute):
        assert attr_name is None, (
            "Invalid argument attr_name. Do not set "
            "attr_name if attr is of type "
            "NamedContextAttribute"
        )
        # the named attribute carries its own name
        attr_name = attr.name
    else:
        assert attr_name is not None, (
            "Missing name for attribute. "
            "attr_name must be present if"
            "attr is of type ContextAttribute"
        )

    url = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
    )
    params = {}
    if entity_type:
        params["type"] = entity_type
    # overrideMetadata is on by default to keep backwards compatibility
    requested_options = [
        option
        for option, enabled in (
            ("overrideMetadata", override_metadata),
            ("forcedUpdate", forcedUpdate),
        )
        if enabled
    ]
    if requested_options:
        params["options"] = ",".join(requested_options)

    try:
        body = attr.model_dump(exclude={"name"}, exclude_none=True)
        response = self.put(url=url, headers=headers, params=params, json=body)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info(
                "Attribute '%s' of '%s' successfully updated!",
                attr_name,
                entity_id,
            )
    except requests.RequestException as err:
        msg = f"Could not update attribute '{attr_name}' of entity'{entity_id}' "
        raise BaseHttpClientException(message=msg, response=err.response) from err
def delete_entity_attribute(
    self, entity_id: str, attr_name: str, entity_type: str = None
) -> None:
    """
    Removes a specified attribute from an entity.

    Args:
        entity_id: Id of the entity.
        attr_name: Name of the attribute to be removed.
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.

    Raises:
        BaseHttpClientException: If the request fails.
    """
    url = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
    )
    headers = self.headers.copy()
    params = {}
    if entity_type:
        params["type"] = entity_type
    try:
        # The type filter has to be sent along, otherwise `entity_type`
        # has no effect on the request.
        res = self.delete(url=url, headers=headers, params=params)
        if res.ok:
            self.logger.info(
                "Attribute '%s' of '%s' successfully deleted!",
                attr_name,
                entity_id,
            )
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not delete attribute '{attr_name}' of entity '{entity_id}'"
        raise BaseHttpClientException(message=msg, response=err.response) from err
1274 # Attribute value operations
def get_attribute_value(
    self, entity_id: str, attr_name: str, entity_type: str = None
) -> Any:
    """
    Return the value property of a single attribute.

    Args:
        entity_id: Id of the entity. Example: Bcn_Welt
        attr_name: Name of the attribute to be retrieved.
            Example: temperature.
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.

    Returns:
        The decoded JSON body of the response

    Raises:
        BaseHttpClientException: If the request fails.
    """
    value_path = f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}/value"
    url = urljoin(self.base_url, value_path)
    headers = self.headers.copy()
    params = {"type": entity_type} if entity_type else {}
    try:
        response = self.get(url=url, params=params, headers=headers)
        if not response.ok:
            response.raise_for_status()
            return None
        self.logger.debug("Received: %s", response.json())
        return response.json()
    except requests.RequestException as err:
        msg = (
            f"Could not load value of attribute '{attr_name}' from "
            f"entity'{entity_id}' "
        )
        raise BaseHttpClientException(message=msg, response=err.response) from err
def update_attribute_value(
    self,
    *,
    entity_id: str,
    attr_name: str,
    value: Any,
    entity_type: str = None,
    forcedUpdate: bool = False,
):
    """
    Updates the value of a specified attribute of an entity ('PUT' request).

    Args:
        entity_id: Id of the entity. Example: Bcn_Welt
        attr_name: Name of the attribute to be updated.
            Example: temperature.
        value: update value
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.
        forcedUpdate: Update operation have to trigger any matching
            subscription, no matter if there is an actual attribute
            update or not.

    Returns:
        None

    Raises:
        BaseHttpClientException: If the request fails.
    """
    url = urljoin(
        self.base_url,
        f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}/value",
    )
    headers = self.headers.copy()
    params = {}
    if entity_type:
        params["type"] = entity_type
    if forcedUpdate:
        params["options"] = ",".join(["forcedUpdate"])
    try:
        # dicts and lists keep the default headers; every other value is
        # announced as text/plain
        if not isinstance(value, (dict, list)):
            headers["Content-Type"] = "text/plain"
            if isinstance(value, str):
                value = f"{value}"
        response = self.put(url=url, headers=headers, json=value, params=params)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info(
                "Attribute '%s' of '%s' successfully updated!",
                attr_name,
                entity_id,
            )
    except requests.RequestException as err:
        msg = (
            f"Could not update value of attribute '{attr_name}' from "
            f"entity '{entity_id}' "
        )
        raise BaseHttpClientException(message=msg, response=err.response) from err
1375 # Types Operations
def get_entity_types(
    self, *, limit: int = None, offset: int = None, options: str = None
) -> List[Dict[str, Any]]:
    """
    Query the `types` endpoint of the broker.

    Args:
        limit: Limit the number of types to be retrieved.
        offset: Skip a number of records.
        options: Options string. Allowed: count, values

    Returns:
        The decoded JSON body of the response

    Raises:
        BaseHttpClientException: If the request fails.
    """
    url = urljoin(self.base_url, f"{self._url_version}/types")
    headers = self.headers.copy()
    # only truthy arguments are sent as query parameters
    candidates = (("limit", limit), ("offset", offset), ("options", options))
    params = {key: given for key, given in candidates if given}
    try:
        response = self.get(url=url, params=params, headers=headers)
        if not response.ok:
            response.raise_for_status()
            return None
        self.logger.debug("Received: %s", response.json())
        return response.json()
    except requests.RequestException as err:
        msg = "Could not load entity types!"
        raise BaseHttpClientException(message=msg, response=err.response) from err
def get_entity_type(self, entity_type: str) -> Dict[str, Any]:
    """
    Query the `types/<entity_type>` endpoint of the broker.

    Args:
        entity_type: Entity Type. Example: Room

    Returns:
        The decoded JSON body of the response

    Raises:
        BaseHttpClientException: If the request fails.
    """
    headers = self.headers.copy()
    url = urljoin(self.base_url, f"{self._url_version}/types/{entity_type}")
    try:
        response = self.get(url=url, params={}, headers=headers)
        if not response.ok:
            response.raise_for_status()
            return None
        self.logger.debug("Received: %s", response.json())
        return response.json()
    except requests.RequestException as err:
        msg = f"Could not load entities of type'{entity_type}' "
        raise BaseHttpClientException(message=msg, response=err.response) from err
1430 # SUBSCRIPTION API ENDPOINTS
def get_subscription_list(self, limit: PositiveInt = inf) -> List[Subscription]:
    """
    Returns a list of all the subscriptions present in the system.

    Args:
        limit: Limit the number of subscriptions to be retrieved

    Returns:
        list of subscriptions

    Raises:
        BaseHttpClientException: If a request fails.
    """
    url = urljoin(self.base_url, f"{self._url_version}/subscriptions/")
    headers = self.headers.copy()
    # The 'count' option is always set so that the pagination helper can
    # check whether further requests are required.
    params = {"options": "count"}
    try:
        raw_items = self.__pagination(
            limit=limit, url=url, params=params, headers=headers
        )
        return TypeAdapter(List[Subscription]).validate_python(raw_items)
    except requests.RequestException as err:
        msg = "Could not load subscriptions!"
        raise BaseHttpClientException(message=msg, response=err.response) from err
def post_subscription(
    self,
    subscription: Subscription,
    update: bool = False,
    skip_initial_notification: bool = False,
) -> str:
    """
    Creates a new subscription. The subscription is represented by a
    Subscription object defined in filip.cb.models.

    If the subscription already exists, the adding is prevented and the id
    of the existing subscription is returned.

    A subscription is deemed as already existing if there exists a
    subscription with the exact same subject and notification fields. All
    optional fields are not considered.

    Args:
        subscription: Subscription
        update: True - If the subscription already exists, update it
            False - If the subscription already exists, throw warning
        skip_initial_notification: True - ask the broker to skip the
            initial notification. The option is only sent to brokers
            with version <= 3.1 and a DeprecationWarning is issued.
            False - do not send the option.

    Returns:
        str: Id of the (created) subscription

    Raises:
        BaseHttpClientException: If the request fails.
    """
    # Fields compared to decide whether an equal subscription already
    # exists; status fields and passwords are left out of the comparison.
    compare_include = {"subject", "notification"}
    compare_exclude = {
        "notification": {
            "lastSuccess": True,
            "lastFailure": True,
            "lastSuccessCode": True,
            "lastFailureReason": True,
            "mqtt": {"passwd"},
            "mqttCustom": {"passwd"},
        }
    }
    existing_subscriptions = self.get_subscription_list()
    sub_dict = subscription.model_dump(
        include=compare_include, exclude=compare_exclude
    )

    for ex_sub in existing_subscriptions:
        ex_dict = ex_sub.model_dump(include=compare_include, exclude=compare_exclude)
        if self._subscription_dicts_are_equal(sub_dict, ex_dict):
            self.logger.info("Subscription already exists")
            if update:
                self.logger.info("Updated subscription")
                subscription.id = ex_sub.id
                self.update_subscription(subscription)
            else:
                warnings.warn(
                    f"Subscription existed already with the id {ex_sub.id}"
                )
            return ex_sub.id

    params = {}
    if skip_initial_notification:
        cb_version = self.get_version()["orion"]["version"]
        if parse_version(cb_version) <= parse_version("3.1"):
            params.update({"options": "skipInitialNotification"})
        warnings.warn(
            f"Skip initial notifications is a deprecated "
            f"feature of older versions <=3.1 of the context "
            f"broker. The Context Broker that you requesting has "
            f"version: {cb_version}. For newer versions we "
            f"automatically skip this option. Consider "
            f"refactoring and updating your services",
            DeprecationWarning,
        )

    # Build the path from the configured API version like all other
    # endpoints instead of hard-coding "v2".
    url = urljoin(self.base_url, f"{self._url_version}/subscriptions")
    headers = self.headers.copy()
    headers.update({"Content-Type": "application/json"})
    try:
        res = self.post(
            url=url,
            headers=headers,
            data=subscription.model_dump_json(
                exclude={
                    "id": True,
                    "notification": {
                        "lastSuccess",
                        "lastFailure",
                        "lastSuccessCode",
                        "lastFailureReason",
                    },
                },
                exclude_none=True,
            ),
            params=params,
        )
        if res.ok:
            self.logger.info("Subscription successfully created!")
            # the id is the last path segment of the Location header
            return res.headers["Location"].split("/")[-1]
        res.raise_for_status()
    except requests.RequestException as err:
        msg = "Could not send subscription!"
        raise BaseHttpClientException(message=msg, response=err.response) from err
def get_subscription(self, subscription_id: str) -> Subscription:
    """
    Retrieves a single subscription by its id.

    Args:
        subscription_id: id of the subscription

    Returns:
        Subscription built from the JSON body of the response

    Raises:
        BaseHttpClientException: If the request fails.
    """
    headers = self.headers.copy()
    url = urljoin(
        self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
    )
    try:
        response = self.get(url=url, headers=headers)
        if not response.ok:
            response.raise_for_status()
            return None
        self.logger.debug("Received: %s", response.json())
        return Subscription(**response.json())
    except requests.RequestException as err:
        msg = f"Could not load subscription {subscription_id}"
        raise BaseHttpClientException(message=msg, response=err.response) from err
def update_subscription(
    self, subscription: Subscription, skip_initial_notification: bool = False
):
    """
    Only the fields included in the request are updated in the subscription.

    Args:
        subscription: Subscription to update
        skip_initial_notification: True - ask the broker to skip the
            initial notification. The option is only sent to brokers
            with version <= 3.1 and a DeprecationWarning is issued.
            False - do not send the option.

    Returns:
        None

    Raises:
        BaseHttpClientException: If the request fails.
    """
    params = {}
    if skip_initial_notification:
        cb_version = self.get_version()["orion"]["version"]
        if parse_version(cb_version) <= parse_version("3.1"):
            params.update({"options": "skipInitialNotification"})
        warnings.warn(
            f"Skip initial notifications is a deprecated "
            f"feature of older versions <3.1 of the context "
            f"broker. The Context Broker that you requesting has "
            f"version: {cb_version}. For newer versions we "
            f"automatically skip this option. Consider "
            f"refactoring and updating your services",
            DeprecationWarning,
        )

    url = urljoin(
        self.base_url, f"{self._url_version}/subscriptions/{subscription.id}"
    )
    headers = self.headers.copy()
    headers.update({"Content-Type": "application/json"})
    try:
        res = self.patch(
            url=url,
            headers=headers,
            data=subscription.model_dump_json(
                exclude={
                    "id": True,
                    "notification": {
                        "lastSuccess",
                        "lastFailure",
                        "lastSuccessCode",
                        "lastFailureReason",
                    },
                },
                exclude_none=True,
            ),
            # The query parameters have to be sent along, otherwise the
            # skipInitialNotification option prepared above is lost.
            params=params,
        )
        if res.ok:
            self.logger.info("Subscription successfully updated!")
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not update subscription {subscription.id}"
        raise BaseHttpClientException(message=msg, response=err.response) from err
def delete_subscription(self, subscription_id: str) -> None:
    """
    Deletes a subscription from a Context Broker

    Args:
        subscription_id: id of the subscription

    Raises:
        BaseHttpClientException: If the request fails.
    """
    headers = self.headers.copy()
    target = f"{self._url_version}/subscriptions/{subscription_id}"
    url = urljoin(self.base_url, target)
    try:
        response = self.delete(url=url, headers=headers)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info(f"Subscription '{subscription_id}' successfully deleted!")
    except requests.RequestException as err:
        msg = f"Could not delete subscription {subscription_id}"
        raise BaseHttpClientException(message=msg, response=err.response) from err
1683 # Registration API
def get_registration_list(self, *, limit: PositiveInt = None) -> List[Registration]:
    """
    Lists all the context provider registrations present in the system.

    Args:
        limit: Limit the number of registrations to be retrieved

    Returns:
        list of registrations

    Raises:
        BaseHttpClientException: If a request fails.
    """
    url = urljoin(self.base_url, f"{self._url_version}/registrations/")
    headers = self.headers.copy()
    # The 'count' option is always set so that the pagination helper can
    # check whether further requests are required.
    params = {"options": "count"}
    try:
        raw_items = self.__pagination(
            limit=limit, url=url, params=params, headers=headers
        )
        return TypeAdapter(List[Registration]).validate_python(raw_items)
    except requests.RequestException as err:
        msg = "Could not load registrations!"
        raise BaseHttpClientException(message=msg, response=err.response) from err
def post_registration(self, registration: Registration):
    """
    Creates a new context provider registration. This is typically used
    for binding context sources as providers of certain data. The
    registration is represented by cb.models.Registration

    Args:
        registration (Registration): registration to create; its id is
            not part of the request body

    Returns:
        Last path segment of the `Location` response header

    Raises:
        BaseHttpClientException: If the request fails.
    """
    url = urljoin(self.base_url, f"{self._url_version}/registrations")
    headers = self.headers.copy()
    headers["Content-Type"] = "application/json"
    try:
        body = registration.model_dump_json(exclude={"id"}, exclude_none=True)
        response = self.post(url=url, headers=headers, data=body)
        if not response.ok:
            response.raise_for_status()
            return None
        self.logger.info("Registration successfully created!")
        location = response.headers["Location"]
        return location.split("/")[-1]
    except requests.RequestException as err:
        msg = f"Could not send registration {registration.id}!"
        raise BaseHttpClientException(message=msg, response=err.response) from err
def get_registration(self, registration_id: str) -> Registration:
    """
    Retrieves a registration from context broker by id

    Args:
        registration_id: id of the registration

    Returns:
        Registration built from the JSON body of the response

    Raises:
        BaseHttpClientException: If the request fails.
    """
    headers = self.headers.copy()
    url = urljoin(
        self.base_url, f"{self._url_version}/registrations/{registration_id}"
    )
    try:
        response = self.get(url=url, headers=headers)
        if not response.ok:
            response.raise_for_status()
            return None
        self.logger.debug("Received: %s", response.json())
        return Registration(**response.json())
    except requests.RequestException as err:
        msg = f"Could not load registration {registration_id} !"
        raise BaseHttpClientException(message=msg, response=err.response) from err
def add_valid_relationships(
    self, entities: List[ContextEntity]
) -> List[ContextEntity]:
    """
    Mark attributes that point to existing entities as relationships.

    Every dict-valued attribute of the given entities is checked with
    `validate_relationship`. If the check succeeds, the attribute is
    re-assigned with the attribute type "relationship" and its old value.

    Args:
        entities: list of entities that need to be validated.

    Returns:
        updated entities
    """
    processed = []
    for entity in entities[:]:
        dumped = entity.model_dump(exclude={"id", "type"})
        for attr_name, attr_value in dumped.items():
            if not isinstance(attr_value, dict):
                continue
            if not self.validate_relationship(attr_value):
                continue
            relationship = ContextAttribute(
                **{
                    "type": DataType.RELATIONSHIP,
                    "value": attr_value.get("value"),
                }
            )
            entity.update_attribute({attr_name: relationship})
        # every entity is returned, whether it was changed or not
        processed.append(entity)
    return processed
def remove_invalid_relationships(
    self, entities: List[ContextEntity], hard_remove: bool = True
) -> List[ContextEntity]:
    """
    Removes invalid relationships from the entities. A relationship is
    treated as invalid when `validate_relationship` does not confirm it.

    Args:
        entities: list of entities that need to be validated.
        hard_remove: If True, invalid relationships will be deleted.
            If False, invalid relationships will be changed to Text
            attributes.

    Returns:
        updated entities
    """
    processed = []
    for entity in entities[:]:
        for relationship in entity.get_relationships():
            if self.validate_relationship(relationship):
                continue
            if not hard_remove:
                # keep name and value, but downgrade the type to "Text"
                as_text = NamedContextAttribute(
                    name=relationship.name,
                    type=DataType.TEXT,
                    value=relationship.value,
                )
                entity.update_attribute(attrs=[as_text])
            else:
                entity.delete_attributes(attrs=[relationship])
        processed.append(entity)
    return processed
def validate_relationship(
    self, relationship: Union[NamedContextAttribute, ContextAttribute, Dict]
) -> bool:
    """
    Validates a relationship. A relationship is valid if it points to an
    existing entity. Otherwise, it is considered invalid.

    Args:
        relationship: relationship to validate. Either an attribute
            object or a dict with a "value" key holding the entity id.

    Returns:
        True if the entity returned for the id carries that id, False if
        it does not or if the broker answers with status code 404.

    Raises:
        ValueError: If `relationship` has an unsupported type or is a
            dict without a "value" key.
        requests.RequestException: If looking up the destination entity
            fails for any reason other than a 404 response.
    """
    if isinstance(relationship, (NamedContextAttribute, ContextAttribute)):
        destination_id = relationship.value
    elif isinstance(relationship, dict):
        # sentinel distinguishes a missing key from an explicit None value
        _sentinel = object()
        destination_id = relationship.get("value", _sentinel)
        if destination_id is _sentinel:
            raise ValueError(
                "Invalid relationship dictionary format\n"
                "Expected format: {"
                f'"type": "{DataType.RELATIONSHIP.value}", '
                '"value" "entity_id"}'
            )
    else:
        raise ValueError("Invalid relationship type.")
    try:
        destination_entity = self.get_entity(entity_id=destination_id)
        return destination_entity.id == destination_id
    except requests.RequestException as err:
        response = getattr(err, "response", None)
        if response is not None and response.status_code == 404:
            return False
        # Any other failure (connection problem, server error, ...) says
        # nothing about the relationship. Re-raise instead of silently
        # returning None, which callers would treat as "invalid".
        raise
def update_registration(self, registration: Registration):
    """
    Only the fields included in the request are updated in the registration.

    Args:
        registration: Registration to update; its id selects the
            registration and is not part of the request body

    Returns:
        None

    Raises:
        BaseHttpClientException: If the request fails.
    """
    target = f"{self._url_version}/registrations/{registration.id}"
    url = urljoin(self.base_url, target)
    headers = self.headers.copy()
    headers["Content-Type"] = "application/json"
    try:
        body = registration.model_dump_json(exclude={"id"}, exclude_none=True)
        response = self.patch(url=url, headers=headers, data=body)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info("Registration successfully updated!")
    except requests.RequestException as err:
        msg = f"Could not update registration {registration.id} !"
        raise BaseHttpClientException(message=msg, response=err.response) from err
def delete_registration(self, registration_id: str) -> None:
    """
    Deletes a registration from a Context Broker

    Args:
        registration_id: id of the registration

    Raises:
        BaseHttpClientException: If the request fails.
    """
    headers = self.headers.copy()
    target = f"{self._url_version}/registrations/{registration_id}"
    url = urljoin(self.base_url, target)
    try:
        response = self.delete(url=url, headers=headers)
        succeeded = response.ok
        if succeeded:
            self.logger.info("Registration '%s' successfully deleted!", registration_id)
        # called unconditionally, as in every code path of this method
        response.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not delete registration {registration_id} !"
        raise BaseHttpClientException(message=msg, response=err.response) from err
1917 # Batch operation API
def update(
    self,
    *,
    entities: List[Union[ContextEntity, ContextEntityKeyValues]],
    action_type: Union[ActionType, str],
    update_format: str = None,
    forcedUpdate: bool = False,
    override_metadata: bool = False,
) -> None:
    """
    Create, update and/or delete several entities in one batch request
    (POST ``<version>/op/update``).

    The broker processes the request as one individual operation per
    entity, applying the given action type to each of them. The action
    types map onto the regular, non-batch operations as follows:

    append: POST /v2/entities if the entity does not exist yet, otherwise
        POST /v2/entities/<id>/attrs.

    appendStrict: POST /v2/entities if the entity does not exist yet,
        otherwise POST /v2/entities/<id>/attrs?options=append.

    update: PATCH /v2/entities/<id>/attrs.

    delete: DELETE /v2/entities/<id>/attrs/<attrName> for every attribute
        contained in the entity, or DELETE /v2/entities/<id> if the entity
        carries no attributes.

    replace: PUT /v2/entities/<id>/attrs.

    Args:
        entities: List of entities, each given in the JSON entity
            representation format.
        action_type (ActionType, str): Kind of update to perform: append,
            appendStrict, update, delete or replace.
        update_format (str): Optional; only 'keyValues' is accepted.
        forcedUpdate: Trigger matching subscriptions even if no attribute
            value actually changed (by default they are only triggered on
            an effective change).
        override_metadata: Replace the existing metadata with the metadata
            provided in the request.

    Returns:
        None

    Raises:
        AssertionError: if ``update_format`` is set to anything other than
            'keyValues'.
        BaseHttpClientException: if the request fails.
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/op/update")
    request_headers = self.headers.copy()
    request_headers["Content-Type"] = "application/json"

    if update_format:
        assert (
            update_format == AttrsFormat.KEY_VALUES.value
        ), "Only 'keyValues' is allowed as update format"

    # Collect the enabled option flags in the order the broker call has
    # always used: overrideMetadata, forcedUpdate, keyValues.
    option_switches = (
        ("overrideMetadata", override_metadata),
        ("forcedUpdate", forcedUpdate),
        ("keyValues", update_format),
    )
    enabled_options = [label for label, active in option_switches if active]
    query_params = {}
    if enabled_options:
        query_params["options"] = ",".join(enabled_options)

    batch = Update(actionType=action_type, entities=entities)
    try:
        response = self.post(
            url=endpoint,
            headers=request_headers,
            params=query_params,
            json=batch.model_dump(by_alias=True),
        )
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info("Update operation '%s' succeeded!", action_type)
    except requests.RequestException as err:
        raise BaseHttpClientException(
            message=f"Update operation '{action_type}' failed!",
            response=err.response,
        ) from err
def query(
    self,
    *,
    query: Query,
    limit: PositiveInt = None,
    order_by: str = None,
    response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
) -> List[Any]:
    """
    Run a batch query against the Context Broker
    (POST ``<version>/op/query``).

    Args:
        query (Query): Query payload; it is serialised to JSON with
            ``None`` fields left out.
        limit (PositiveInt): Forwarded to the pagination helper as the
            limit for the retrieved items.
        order_by (str): Sort criteria; forwarded to the broker as the
            ``orderBy`` URL parameter when given.
        response_format (AttrsFormat, str): Representation of the returned
            entities. Must be a member of ``AttrsFormat``.

    Returns:
        The response payload is an Array containing one object per matching
        entity, or an empty array [] if no entities are found. Results are
        validated into ``ContextEntity`` objects for the normalized format
        and into ``ContextEntityKeyValues`` objects for the key-values
        format; for any other format the raw items are returned.

    Raises:
        ValueError: if ``response_format`` is not a valid ``AttrsFormat``.
        BaseHttpClientException: if the request fails.
    """
    url = urljoin(self.base_url, f"{self._url_version}/op/query")
    headers = self.headers.copy()
    headers.update({"Content-Type": "application/json"})
    params = {"options": "count"}

    # Previously the argument was accepted but silently dropped, so the
    # broker always answered in its default order.
    if order_by:
        params["orderBy"] = order_by

    if response_format:
        if response_format not in list(AttrsFormat):
            raise ValueError(f"Value must be in {list(AttrsFormat)}")
        params["options"] = ",".join([response_format, "count"])
    try:
        items = self.__pagination(
            method=PaginationMethod.POST,
            url=url,
            headers=headers,
            params=params,
            data=query.model_dump_json(exclude_none=True),
            limit=limit,
        )
        if response_format == AttrsFormat.NORMALIZED:
            adapter = TypeAdapter(List[ContextEntity])
            return adapter.validate_python(items)
        if response_format == AttrsFormat.KEY_VALUES:
            adapter = TypeAdapter(List[ContextEntityKeyValues])
            return adapter.validate_python(items)
        return items
    except requests.RequestException as err:
        msg = "Query operation failed!"
        raise BaseHttpClientException(message=msg, response=err.response) from err
def notify(self, message: Message) -> None:
    """
    Send a notification payload to the broker's notify endpoint
    (POST ``<version>/op/notify``).

    This operation is intended to consume a notification payload so that
    all the entity data included by such notification is persisted,
    overwriting if necessary. This operation is useful when one NGSIv2
    endpoint is subscribed to another NGSIv2 endpoint (federation
    scenarios). The request payload must be an NGSIv2 notification
    payload. The behaviour must be exactly the same as 'update'
    with 'action_type' equal to append.

    Args:
        message: Notification message

    Returns:
        None

    Raises:
        BaseHttpClientException: if the request fails.
    """
    # Build the path from the client's API version like the other batch
    # operations do, instead of hard-coding "v2".
    url = urljoin(self.base_url, f"{self._url_version}/op/notify")
    headers = self.headers.copy()
    headers.update({"Content-Type": "application/json"})
    params = {}
    try:
        res = self.post(
            url=url,
            headers=headers,
            params=params,
            data=message.model_dump_json(by_alias=True),
        )
        if res.ok:
            self.logger.info("Notification message sent!")
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = (
            f"Sending notifcation message failed! \n "
            f"{message.model_dump_json(indent=2)}"
        )
        raise BaseHttpClientException(message=msg, response=err.response) from err
def post_command(
    self,
    *,
    entity_id: str,
    command: Union[Command, NamedCommand, Dict],
    entity_type: str = None,
    command_name: str = None,
) -> None:
    """
    Send a command to a context entity by updating the corresponding
    command attribute of the existing entity.

    Args:
        entity_id: Entity identifier
        command: Command to send. With ``command_name`` it has to be a
            ``Command`` (or a dict describing one), without it a
            ``NamedCommand`` (or a dict describing one).
        entity_type: Entity type
        command_name: Name of the command in the entity

    Returns:
        None

    Raises:
        AssertionError: if ``command`` has an unexpected type.
    """
    # An explicit name means the command itself is unnamed; otherwise the
    # name has to be carried by the command object.
    expected_model = Command if command_name else NamedCommand
    assert isinstance(command, (expected_model, dict))

    payload = expected_model(**command) if isinstance(command, dict) else command
    if command_name:
        payload = {command_name: payload.model_dump()}

    self.update_existing_entity_attributes(
        entity_id=entity_id, entity_type=entity_type, attrs=[payload]
    )
def does_entity_exist(self, entity_id: str, entity_type: str) -> bool:
    """
    Test if an entity with given id and type is present in the CB

    Args:
        entity_id: Entity id
        entity_type: Entity type

    Returns:
        bool; True if entity exists

    Raises:
        RequestException, if any error occurs (e.g: No Connection),
        except that the entity is not found
    """
    # Use the client's API version like the other endpoints instead of a
    # hard-coded "v2" path segment.
    url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
    headers = self.headers.copy()
    params = {"type": entity_type}

    try:
        res = self.get(url=url, params=params, headers=headers)
        if res.ok:
            return True
        res.raise_for_status()
    except requests.RequestException as err:
        # Only a 404 answer means "entity does not exist"; everything
        # else (including errors without a response) is logged and
        # re-raised unchanged.
        if err.response is None or err.response.status_code != 404:
            self.log_error(err=err, msg="Checking entity existence failed!")
            raise
    return False
def patch_entity(
    self,
    entity: Union[ContextEntity, ContextEntityKeyValues],
    key_values: bool = False,
    forcedUpdate: bool = False,
    override_metadata: bool = False,
) -> None:
    """
    Bring the state stored in the CB in line with the given entity.

    The attributes of the entity are read and handed over to
    ``update_existing_entity_attributes``, i.e. the call acts like an
    extended HTTP PATCH that applies partial modifications to a resource.

    Args:
        entity: Entity to update
        key_values: If True, the entity is updated in key-values format.
        forcedUpdate: Trigger matching subscriptions even if no attribute
            value actually changed (by default they are only triggered on
            an effective change).
        override_metadata: If True, the existing metadata of the entity
            is replaced

    Returns:
        None
    """
    self.update_existing_entity_attributes(
        attrs=entity.get_attributes(),
        entity_id=entity.id,
        entity_type=entity.type,
        key_values=key_values,
        forcedUpdate=forcedUpdate,
        override_metadata=override_metadata,
    )
2189 @staticmethod
2190 def compare_lists_ignore_order(list_a, list_b):
2191 """
2192 Compares two lists ignoring order.
2193 Handles unhashable types like dictionaries and mixed types.
2194 """
2195 # 1. Quick check: if lengths differ, they are not equal
2196 if len(list_a) != len(list_b):
2197 return False
2199 # 2. Define a helper to create a comparable string signature for any item
2200 def get_canonical_key(item):
2201 # For lists, we recursively sort them so nested lists are also order-independent
2202 if isinstance(item, list):
2203 return json.dumps(sorted(item, key=lambda x: get_canonical_key(x)))
2205 # For everything else (int, str, dict, obj), just use the string representation
2206 return str(item)
2208 # 3. Sort both lists using the canonical key and compare
2209 return sorted(list_a, key=get_canonical_key) == sorted(
2210 list_b, key=get_canonical_key
2211 )
def _subscription_dicts_are_equal(self, first: dict, second: dict):
    """
    Compare two subscription dictionaries, including nested dictionaries.

    Only the keys of ``first`` are inspected. If both dictionaries do not
    share the same key set a warning is issued, but the comparison still
    takes place. Nested dictionaries are compared recursively and lists are
    compared without regard to item order. A differing pair of values is
    tolerated if both values are "empty" (falsy, or containers holding only
    empty values) or if the key is ``timesSent``.

    Args:
        first dict: Dictionary of first subscription
        second dict: Dictionary of second subscription

    Returns:
        True if equal, else False
    """

    def _is_blank(candidate):
        """
        Tell whether a value carries no information: a dict or list is
        blank if all of its members are blank, any other value is blank
        if it is falsy.
        """
        if isinstance(candidate, dict):
            return all(_is_blank(member) for member in candidate.values())
        if isinstance(candidate, list):
            return all(_is_blank(member) for member in candidate)
        return not candidate

    if first.keys() != second.keys():
        warnings.warn(
            "Subscriptions contain a different set of fields. "
            "Only comparing to new fields of the new one."
        )

    for key, new_value in first.items():
        existing_value = second.get(key)

        if isinstance(new_value, dict) and isinstance(existing_value, dict):
            if not self._subscription_dicts_are_equal(new_value, existing_value):
                return False
            continue

        if isinstance(new_value, list) and isinstance(existing_value, list):
            if not self.compare_lists_ignore_order(new_value, existing_value):
                return False
            continue

        if new_value == existing_value:
            continue

        self.logger.debug(
            f"Not equal fields for key {key}: ({new_value}, {existing_value})"
        )
        # Differences between two blank values and in the broker-maintained
        # "timesSent" counter are not treated as a mismatch.
        tolerated = (
            _is_blank(new_value) and _is_blank(existing_value)
        ) or key == "timesSent"
        if not tolerated:
            return False
    return True