Coverage for filip/clients/ngsi_v2/cb.py: 80%
686 statements
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-26 14:36 +0000
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-26 14:36 +0000
1"""
2Context Broker Module for API Client
3"""
5from __future__ import annotations
7import copy
8from copy import deepcopy
9from enum import Enum
10from math import inf
11from packaging.version import parse as parse_version
12from packaging import version
13from pydantic import PositiveInt, PositiveFloat, AnyHttpUrl, ValidationError
14from pydantic.type_adapter import TypeAdapter
15from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union
16import re
17import requests
18from urllib.parse import urljoin
19import warnings
20from requests import RequestException
21from filip.clients.base_http_client import BaseHttpClient, NgsiURLVersion
22from filip.config import settings
23from filip.models.base import FiwareHeader, PaginationMethod, DataType
24from filip.utils.simple_ql import QueryString
25from filip.models.ngsi_v2.context import (
26 ActionType,
27 Command,
28 ContextEntity,
29 ContextEntityKeyValues,
30 ContextAttribute,
31 NamedCommand,
32 NamedContextAttribute,
33 Query,
34 Update,
35 PropertyFormat,
36 ContextEntityList,
37 ContextEntityKeyValuesList,
38 ContextEntityValidationList,
39 ContextEntityKeyValuesValidationList,
40)
41from filip.models.ngsi_v2.base import AttrsFormat
42from filip.models.ngsi_v2.subscriptions import Subscription, Message
43from filip.models.ngsi_v2.registrations import Registration
44from filip.clients.exceptions import BaseHttpClientException
46if TYPE_CHECKING:
47 from filip.clients.ngsi_v2.iota import IoTAClient
50class ContextBrokerClient(BaseHttpClient):
51 """
52 Implementation of NGSI Context Broker functionalities, such as creating
53 entities and subscriptions; retrieving, updating and deleting data.
54 Further documentation:
55 https://fiware-orion.readthedocs.io/en/master/
57 Api specifications for v2 are located here:
58 https://telefonicaid.github.io/fiware-orion/api/v2/stable/
60 Note:
61 We use the reference implementation for development. Therefore, some
62 other brokers may show slightly different behavior!
63 """
65 def __init__(
66 self,
67 url: str = None,
68 *,
69 session: requests.Session = None,
70 fiware_header: FiwareHeader = None,
71 **kwargs,
72 ):
73 """
75 Args:
76 url: Url of context broker server
77 session (requests.Session):
78 fiware_header (FiwareHeader): fiware service and fiware service path
79 **kwargs (Optional): Optional arguments that ``request`` takes.
80 """
81 # set service url
82 url = url or settings.CB_URL
83 self._url_version = NgsiURLVersion.v2_url.value
84 super().__init__(
85 url=url, session=session, fiware_header=fiware_header, **kwargs
86 )
87 self._check_correct_cb_version()
89 def __pagination(
90 self,
91 *,
92 method: PaginationMethod = PaginationMethod.GET,
93 url: str,
94 headers: Dict,
95 limit: Union[PositiveInt, PositiveFloat] = None,
96 params: Dict = None,
97 data: str = None,
98 ) -> List[Dict]:
99 """
100 NGSIv2 implements a pagination mechanism in order to help clients to
101 retrieve large sets of resources. This mechanism works for all listing
102 operations in the API (e.g. GET /v2/entities, GET /v2/subscriptions,
103 POST /v2/op/query, etc.). This function helps getting datasets that are
104 larger than the limit for the different GET operations.
106 https://fiware-orion.readthedocs.io/en/master/user/pagination/index.html
108 Args:
109 url: Information about the url, obtained from the original function
110 headers: The headers from the original function
111 params:
112 limit:
114 Returns:
115 object:
117 """
118 if limit is None:
119 limit = inf
120 if limit > 1000:
121 params["limit"] = 1000 # maximum items per request
122 else:
123 params["limit"] = limit
125 if self.session:
126 session = self.session
127 else:
128 session = requests.Session()
129 with session:
130 res = session.request(
131 method=method, url=url, params=params, headers=headers, data=data
132 )
133 if res.ok:
134 items = res.json()
135 # do pagination
136 count = int(res.headers["Fiware-Total-Count"])
138 while len(items) < limit and len(items) < count:
139 # Establishing the offset from where entities are retrieved
140 params["offset"] = len(items)
141 params["limit"] = min(1000, (limit - len(items)))
142 res = session.request(
143 method=method,
144 url=url,
145 params=params,
146 headers=headers,
147 data=data,
148 )
149 if res.ok:
150 items.extend(res.json())
151 else:
152 res.raise_for_status()
153 self.logger.debug("Received: %s", items)
154 return items
155 res.raise_for_status()
157 # MANAGEMENT API
158 def get_version(self) -> Dict:
159 """
160 Gets version of IoT Agent
161 Returns:
162 Dictionary with response
163 """
164 url = urljoin(self.base_url, "version")
165 try:
166 res = self.get(url=url, headers=self.headers)
167 if res.ok:
168 return res.json()
169 res.raise_for_status()
170 except requests.RequestException as err:
171 self.logger.error(err)
172 msg = f"Fetch version fails, reason: {err.args}"
173 raise BaseHttpClientException(message=msg, response=err.response) from err
175 def _check_correct_cb_version(self) -> None:
176 """
177 Checks whether the used Orion version is greater or equal than the minimum required orion version of
178 the current filip version
179 """
180 orion_version = self.get_version()["orion"]["version"]
181 if version.parse(orion_version) < version.parse(settings.MINIMUM_ORION_VERSION):
182 self.logger.warning(
183 f"You are using orion version {orion_version}. There was a breaking change in Orion Version "
184 f"{settings.MINIMUM_ORION_VERSION}, therefore functionality is not assured when using "
185 f"version {orion_version}."
186 )
188 def get_resources(self) -> Dict:
189 """
190 Gets reo
192 Returns:
193 Dict
194 """
195 url = urljoin(self.base_url, self._url_version)
196 try:
197 res = self.get(url=url, headers=self.headers)
198 if res.ok:
199 return res.json()
200 res.raise_for_status()
201 except requests.RequestException as err:
202 self.logger.error(err)
203 raise RequestException(response=err.response) from err
205 # STATISTICS API
206 def get_statistics(self) -> Dict:
207 """
208 Gets statistics of context broker
209 Returns:
210 Dictionary with response
211 """
212 url = urljoin(self.base_url, "statistics")
213 try:
214 res = self.get(url=url, headers=self.headers)
215 if res.ok:
216 return res.json()
217 res.raise_for_status()
218 except requests.RequestException as err:
219 self.logger.error(err)
220 raise BaseHttpClientException(
221 message=err.response.text, response=err.response
222 ) from err
224 # CONTEXT MANAGEMENT API ENDPOINTS
225 # Entity Operations
226 def post_entity(
227 self,
228 entity: Union[ContextEntity, ContextEntityKeyValues],
229 update: bool = False,
230 patch: bool = False,
231 override_metadata: bool = True,
232 key_values: bool = False,
233 ):
234 """
235 Function registers an Object with the NGSI Context Broker,
236 if it already exists it can be automatically updated (overwritten)
237 if the update bool is True.
238 First a post request with the entity is tried, if the response code
239 is 422 the entity is uncrossable, as it already exists there are two
240 options, either overwrite it, if the attribute have changed
241 (e.g. at least one new/new values) (update = True) or leave
242 it the way it is (update=False)
243 If you only want to manipulate the entities values, you need to set
244 patch argument.
246 Args:
247 entity (ContextEntity/ContextEntityKeyValues):
248 Context Entity Object
249 update (bool):
250 If the response.status_code is 422, whether the override and
251 existing entity
252 patch (bool):
253 If the response.status_code is 422, whether to manipulate the
254 existing entity. Omitted if update `True`.
255 override_metadata:
256 Only applies for patch equal to `True`.
257 Whether to override or append the attribute's metadata.
258 `True` for overwrite or `False` for update/append
259 key_values(bool):
260 By default False. If set to True, "options=keyValues" will
261 be included in params of post request. The payload uses
262 the keyValues simplified entity representation, i.e.
263 ContextEntityKeyValues.
264 """
265 url = urljoin(self.base_url, f"{self._url_version}/entities")
266 headers = self.headers.copy()
267 params = {}
268 options = []
269 if key_values:
270 assert isinstance(entity, ContextEntityKeyValues)
271 options.append("keyValues")
272 else:
273 assert isinstance(entity, ContextEntity)
274 if options:
275 params.update({"options": ",".join(options)})
276 try:
277 res = self.post(
278 url=url,
279 headers=headers,
280 json=entity.model_dump(exclude_none=True),
281 params=params,
282 )
283 if res.ok:
284 self.logger.info("Entity successfully posted!")
285 return res.headers.get("Location")
286 res.raise_for_status()
287 except requests.RequestException as err:
288 if err.response is not None:
289 if update and err.response.status_code == 422:
290 return self.override_entity(entity=entity, key_values=key_values)
291 if patch and err.response.status_code == 422:
292 return self.patch_entity(
293 entity=entity,
294 override_metadata=override_metadata,
295 key_values=key_values,
296 )
297 msg = f"Could not post entity {entity.id}"
298 raise BaseHttpClientException(message=msg, response=err.response) from err
300 def get_entity_list(
301 self,
302 *,
303 entity_ids: List[str] = None,
304 entity_types: List[str] = None,
305 id_pattern: str = None,
306 type_pattern: str = None,
307 q: Union[str, QueryString] = None,
308 mq: Union[str, QueryString] = None,
309 georel: str = None,
310 geometry: str = None,
311 coords: str = None,
312 limit: PositiveInt = inf,
313 attrs: List[str] = None,
314 metadata: str = None,
315 order_by: str = None,
316 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
317 include_invalid: bool = False,
318 ) -> Union[
319 List[Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]],
320 ContextEntityValidationList,
321 ContextEntityKeyValuesValidationList,
322 ]:
323 r"""
324 Retrieves a list of context entities that match different criteria by
325 id, type, pattern matching (either id or type) and/or those which
326 match a query or geographical query (see Simple Query Language and
327 Geographical Queries). A given entity has to match all the criteria
328 to be retrieved (i.e., the criteria is combined in a logical AND
329 way). Note that pattern matching query parameters are incompatible
330 (i.e. mutually exclusive) with their corresponding exact matching
331 parameters, i.e. idPattern with id and typePattern with type.
333 Args:
334 entity_ids: A comma-separated list of elements. Retrieve entities
335 whose ID matches one of the elements in the list.
336 Incompatible with idPattern,e.g. Boe_Idarium
337 entity_types: comma-separated list of elements. Retrieve entities
338 whose type matches one of the elements in the list.
339 Incompatible with typePattern. Example: Room.
340 id_pattern: A correctly formatted regular expression. Retrieve
341 entities whose ID matches the regular expression. Incompatible
342 with id, e.g. ngsi-ld.* or sensor.*
343 type_pattern: A correctly formatted regular expression. Retrieve
344 entities whose type matches the regular expression.
345 Incompatible with type, e.g. room.*
346 q (SimpleQuery): A query expression, composed of a list of
347 statements separated by ;, i.e.,
348 q=statement1;statement2;statement3. See Simple Query
349 Language specification. Example: temperature>40.
350 mq (SimpleQuery): A query expression for attribute metadata,
351 composed of a list of statements separated by ;, i.e.,
352 mq=statement1;statement2;statement3. See Simple Query
353 Language specification. Example: temperature.accuracy<0.9.
354 georel: Spatial relationship between matching entities and a
355 reference shape. See Geographical Queries. Example: 'near'.
356 geometry: Geographical area to which the query is restricted.
357 See Geographical Queries. Example: point.
358 coords: List of latitude-longitude pairs of coordinates separated
359 by ';'. See Geographical Queries. Example: 41.390205,
360 2.154007;48.8566,2.3522.
361 limit: Limits the number of entities to be retrieved Example: 20
362 attrs: Comma-separated list of attribute names whose data are to
363 be included in the response. The attributes are retrieved in
364 the order specified by this parameter. If this parameter is
365 not included, the attributes are retrieved in arbitrary
366 order. See "Filtering out attributes and metadata" section
367 for more detail. Example: seatNumber.
368 metadata: A list of metadata names to include in the response.
369 See "Filtering out attributes and metadata" section for more
370 detail. Example: accuracy.
371 order_by: Criteria for ordering results. See "Ordering Results"
372 section for details. Example: temperature,!speed.
373 response_format (AttrsFormat, str): Response Format. Note: That if
374 'keyValues' or 'values' are used the response model will
375 change to List[ContextEntityKeyValues] and to List[Dict[str,
376 Any]], respectively.
377 include_invalid: Specify if the returned list should also contain a list of invalid entity IDs or not.
378 Returns:
380 """
381 url = urljoin(self.base_url, f"{self._url_version}/entities/")
382 headers = self.headers.copy()
383 params = {}
385 if entity_ids and id_pattern:
386 raise ValueError
387 if entity_types and type_pattern:
388 raise ValueError
389 if entity_ids:
390 if not isinstance(entity_ids, list):
391 entity_ids = [entity_ids]
392 params.update({"id": ",".join(entity_ids)})
393 if id_pattern:
394 try:
395 re.compile(id_pattern)
396 except re.error as err:
397 raise ValueError(f"Invalid Pattern: {err}") from err
398 params.update({"idPattern": id_pattern})
399 if entity_types:
400 if not isinstance(entity_types, list):
401 entity_types = [entity_types]
402 params.update({"type": ",".join(entity_types)})
403 if type_pattern:
404 try:
405 re.compile(type_pattern)
406 except re.error as err:
407 raise ValueError(f"Invalid Pattern: {err.msg}") from err
408 params.update({"typePattern": type_pattern})
409 if attrs:
410 params.update({"attrs": ",".join(attrs)})
411 if metadata:
412 params.update({"metadata": ",".join(metadata)})
413 if q:
414 if isinstance(q, str):
415 q = QueryString.parse_str(q)
416 params.update({"q": str(q)})
417 if mq:
418 params.update({"mq": str(mq)})
419 if geometry:
420 params.update({"geometry": geometry})
421 if georel:
422 params.update({"georel": georel})
423 if coords:
424 params.update({"coords": coords})
425 if order_by:
426 params.update({"orderBy": order_by})
427 if response_format not in list(AttrsFormat):
428 raise ValueError(f"Value must be in {list(AttrsFormat)}")
429 response_format = ",".join(["count", response_format])
430 params.update({"options": response_format})
431 try:
432 items = self.__pagination(
433 method=PaginationMethod.GET,
434 limit=limit,
435 url=url,
436 params=params,
437 headers=headers,
438 )
439 if include_invalid:
440 valid_entities = []
441 invalid_entities = []
443 if AttrsFormat.NORMALIZED in response_format:
444 adapter = TypeAdapter(ContextEntity)
446 for entity in items:
447 try:
448 valid_entity = adapter.validate_python(entity)
449 valid_entities.append(valid_entity)
450 except ValidationError:
451 invalid_entities.append(entity.get("id"))
453 return ContextEntityValidationList.model_validate(
454 {
455 "entities": valid_entities,
456 "invalid_entities": invalid_entities,
457 }
458 )
459 elif AttrsFormat.KEY_VALUES in response_format:
460 adapter = TypeAdapter(ContextEntityKeyValues)
462 for entity in items:
463 try:
464 valid_entity = adapter.validate_python(entity)
465 valid_entities.append(valid_entity)
466 except ValidationError:
467 invalid_entities.append(entity.get("id"))
469 return ContextEntityKeyValuesValidationList.model_validate(
470 {
471 "entities": valid_entities,
472 "invalid_entities": invalid_entities,
473 }
474 )
475 else:
476 return items
477 else:
478 if AttrsFormat.NORMALIZED in response_format:
479 return ContextEntityList.model_validate(
480 {"entities": items}
481 ).entities
482 elif AttrsFormat.KEY_VALUES in response_format:
483 return ContextEntityKeyValuesList.model_validate(
484 {"entities": items}
485 ).entities
486 return items # in case of VALUES as response_format
488 except requests.RequestException as err:
489 msg = "Could not load entities"
490 raise BaseHttpClientException(message=msg, response=err.response) from err
492 def get_entity(
493 self,
494 entity_id: str,
495 entity_type: str = None,
496 attrs: List[str] = None,
497 metadata: List[str] = None,
498 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
499 ) -> Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]:
500 """
501 This operation must return one entity element only, but there may be
502 more than one entity with the same ID (e.g. entities with same ID but
503 different types). In such case, an error message is returned, with
504 the HTTP status code set to 409 Conflict.
506 Args:
507 entity_id (String): Id of the entity to be retrieved
508 entity_type (String): Entity type, to avoid ambiguity in case
509 there are several entities with the same entity id.
510 attrs (List of Strings): List of attribute names whose data must be
511 included in the response. The attributes are retrieved in the
512 order specified by this parameter.
513 See "Filtering out attributes and metadata" section for more
514 detail. If this parameter is not included, the attributes are
515 retrieved in arbitrary order, and all the attributes of the
516 entity are included in the response.
517 Example: temperature, humidity.
518 metadata (List of Strings): A list of metadata names to include in
519 the response. See "Filtering out attributes and metadata"
520 section for more detail. Example: accuracy.
521 response_format (AttrsFormat, str): Representation format of
522 response
523 Returns:
524 ContextEntity
525 """
526 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
527 headers = self.headers.copy()
528 params = {}
529 if entity_type:
530 params.update({"type": entity_type})
531 if attrs:
532 params.update({"attrs": ",".join(attrs)})
533 if metadata:
534 params.update({"metadata": ",".join(metadata)})
535 if response_format not in list(AttrsFormat):
536 raise ValueError(f"Value must be in {list(AttrsFormat)}")
537 params.update({"options": response_format})
539 try:
540 res = self.get(url=url, params=params, headers=headers)
541 if res.ok:
542 self.logger.info("Entity successfully retrieved!")
543 self.logger.debug("Received: %s", res.json())
544 if response_format == AttrsFormat.NORMALIZED:
545 return ContextEntity(**res.json())
546 if response_format == AttrsFormat.KEY_VALUES:
547 return ContextEntityKeyValues(**res.json())
548 return res.json()
549 res.raise_for_status()
550 except requests.RequestException as err:
551 msg = f"Could not load entity {entity_id}"
552 raise BaseHttpClientException(message=msg, response=err.response) from err
554 def get_entity_attributes(
555 self,
556 entity_id: str,
557 entity_type: str = None,
558 attrs: List[str] = None,
559 metadata: List[str] = None,
560 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
561 ) -> Dict[str, ContextAttribute]:
562 """
563 This request is similar to retrieving the whole entity, however this
564 one omits the id and type fields. Just like the general request of
565 getting an entire entity, this operation must return only one entity
566 element. If more than one entity with the same ID is found (e.g.
567 entities with same ID but different type), an error message is
568 returned, with the HTTP status code set to 409 Conflict.
570 Args:
571 entity_id (String): Id of the entity to be retrieved
572 entity_type (String): Entity type, to avoid ambiguity in case
573 there are several entities with the same entity id.
574 attrs (List of Strings): List of attribute names whose data must be
575 included in the response. The attributes are retrieved in the
576 order specified by this parameter.
577 See "Filtering out attributes and metadata" section for more
578 detail. If this parameter is not included, the attributes are
579 retrieved in arbitrary order, and all the attributes of the
580 entity are included in the response. Example: temperature,
581 humidity.
582 metadata (List of Strings): A list of metadata names to include in
583 the response. See "Filtering out attributes and metadata"
584 section for more detail. Example: accuracy.
585 response_format (AttrsFormat, str): Representation format of
586 response
587 Returns:
588 Dict
589 """
590 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
591 headers = self.headers.copy()
592 params = {}
593 if entity_type:
594 params.update({"type": entity_type})
595 if attrs:
596 params.update({"attrs": ",".join(attrs)})
597 if metadata:
598 params.update({"metadata": ",".join(metadata)})
599 if response_format not in list(AttrsFormat):
600 raise ValueError(f"Value must be in {list(AttrsFormat)}")
601 params.update({"options": response_format})
602 try:
603 res = self.get(url=url, params=params, headers=headers)
604 if res.ok:
605 if response_format == AttrsFormat.NORMALIZED:
606 return {
607 key: ContextAttribute(**values)
608 for key, values in res.json().items()
609 }
610 return res.json()
611 res.raise_for_status()
612 except requests.RequestException as err:
613 msg = f"Could not load attributes from entity {entity_id} !"
614 raise BaseHttpClientException(message=msg, response=err.response) from err
616 def update_entity(
617 self,
618 entity: Union[ContextEntity, ContextEntityKeyValues, dict],
619 append_strict: bool = False,
620 key_values: bool = False,
621 ):
622 """
623 The request payload is an object representing the attributes to
624 append or update.
626 Note:
627 Update means overwriting the existing entity. If you want to
628 manipulate you should rather use patch_entity.
630 Args:
631 entity (ContextEntity):
632 append_strict: If `False` the entity attributes are updated (if they
633 previously exist) or appended (if they don't previously exist)
634 with the ones in the payload.
635 If `True` all the attributes in the payload not
636 previously existing in the entity are appended. In addition
637 to that, in case some of the attributes in the payload
638 already exist in the entity, an error is returned.
639 More precisely this means a strict append procedure.
640 key_values: By default False. If set to True, the payload uses
641 the keyValues simplified entity representation, i.e.
642 ContextEntityKeyValues.
643 Returns:
644 None
645 """
646 if key_values:
647 if isinstance(entity, dict):
648 entity = copy.deepcopy(entity)
649 _id = entity.pop("id")
650 _type = entity.pop("type")
651 attrs = entity
652 entity = ContextEntityKeyValues(id=_id, type=_type)
653 else:
654 attrs = entity.model_dump(exclude={"id", "type"})
655 else:
656 attrs = entity.get_attributes()
657 self.update_or_append_entity_attributes(
658 entity_id=entity.id,
659 entity_type=entity.type,
660 attrs=attrs,
661 append_strict=append_strict,
662 key_values=key_values,
663 )
665 def update_entity_properties(
666 self, entity: ContextEntity, append_strict: bool = False
667 ):
668 """
669 The request payload is an object representing the attributes, of any type
670 but Relationship, to append or update.
672 Note:
673 Update means overwriting the existing entity. If you want to
674 manipulate you should rather use patch_entity.
676 Args:
677 entity (ContextEntity):
678 append_strict: If `False` the entity attributes are updated (if they
679 previously exist) or appended (if they don't previously exist)
680 with the ones in the payload.
681 If `True` all the attributes in the payload not
682 previously existing in the entity are appended. In addition
683 to that, in case some of the attributes in the payload
684 already exist in the entity, an error is returned.
685 More precisely this means a strict append procedure.
687 Returns:
688 None
689 """
690 self.update_or_append_entity_attributes(
691 entity_id=entity.id,
692 entity_type=entity.type,
693 attrs=entity.get_properties(),
694 append_strict=append_strict,
695 )
697 def update_entity_relationships(
698 self, entity: ContextEntity, append_strict: bool = False
699 ):
700 """
701 The request payload is an object representing only the attributes, of type
702 Relationship, to append or update.
704 Note:
705 Update means overwriting the existing entity. If you want to
706 manipulate you should rather use patch_entity.
708 Args:
709 entity (ContextEntity):
710 append_strict: If `False` the entity attributes are updated (if they
711 previously exist) or appended (if they don't previously exist)
712 with the ones in the payload.
713 If `True` all the attributes in the payload not
714 previously existing in the entity are appended. In addition
715 to that, in case some of the attributes in the payload
716 already exist in the entity, an error is returned.
717 More precisely this means a strict append procedure.
719 Returns:
720 None
721 """
722 self.update_or_append_entity_attributes(
723 entity_id=entity.id,
724 entity_type=entity.type,
725 attrs=entity.get_relationships(),
726 append_strict=append_strict,
727 )
729 def delete_entity(
730 self,
731 entity_id: str,
732 entity_type: str = None,
733 delete_devices: bool = False,
734 iota_client: IoTAClient = None,
735 iota_url: AnyHttpUrl = settings.IOTA_URL,
736 ) -> None:
737 """
738 Remove a entity from the context broker. No payload is required
739 or received.
741 Args:
742 entity_id:
743 Id of the entity to be deleted
744 entity_type:
745 Entity type, to avoid ambiguity in case there are several
746 entities with the same entity id.
747 delete_devices:
748 If True, also delete all devices that reference this
749 entity (entity_id as entity_name)
750 iota_client:
751 Corresponding IoTA-Client used to access IoTA-Agent
752 iota_url:
753 URL of the corresponding IoT-Agent. This will autogenerate
754 an IoTA-Client, mirroring the information of the
755 ContextBrokerClient, e.g. FiwareHeader, and other headers
757 Returns:
758 None
759 """
760 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
761 headers = self.headers.copy()
762 if entity_type:
763 params = {"type": entity_type}
764 else:
765 params = None
766 try:
767 res = self.delete(url=url, params=params, headers=headers)
768 if res.ok:
769 self.logger.info("Entity '%s' successfully deleted!", entity_id)
770 else:
771 res.raise_for_status()
772 except requests.RequestException as err:
773 msg = f"Could not delete entity {entity_id} !"
774 raise BaseHttpClientException(message=msg, response=err.response) from err
776 if delete_devices:
777 from filip.clients.ngsi_v2 import IoTAClient
779 if iota_client:
780 iota_client_local = deepcopy(iota_client)
781 else:
782 warnings.warn(
783 "No IoTA-Client object provided! "
784 "Will try to generate one. "
785 "This usage is not recommended."
786 )
788 iota_client_local = IoTAClient(
789 url=iota_url,
790 fiware_header=self.fiware_headers,
791 headers=self.headers,
792 )
794 for device in iota_client_local.get_device_list(entity_names=[entity_id]):
795 if entity_type:
796 if device.entity_type == entity_type:
797 iota_client_local.delete_device(device_id=device.device_id)
798 else:
799 iota_client_local.delete_device(device_id=device.device_id)
800 iota_client_local.close()
802 def delete_entities(self, entities: List[ContextEntity]) -> None:
803 """
804 Remove a list of entities from the context broker. This methode is
805 more efficient than to call delete_entity() for each entity
807 Args:
808 entities: List[ContextEntity]: List of entities to be deleted
810 Raises:
811 Exception, if one of the entities is not in the ContextBroker
813 Returns:
814 None
815 """
817 # update() delete, deletes all entities without attributes completely,
818 # and removes the attributes for the other
819 # The entities are sorted based on the fact if they have
820 # attributes.
821 limit = 1000 # max number of entities that will be deleted at once
822 entities_with_attributes: List[ContextEntity] = []
823 for entity in entities:
824 attribute_names = [
825 key
826 for key in entity.model_dump()
827 if key not in ContextEntity.model_fields
828 ]
829 if len(attribute_names) > 0:
830 entities_with_attributes.append(
831 ContextEntity(id=entity.id, type=entity.type)
832 )
834 # Post update_delete for those without attribute only once,
835 # for the other post update_delete again but for the changed entity
836 # in the ContextBroker (only id and type left)
837 while len(entities) > 0:
838 self.update(entities=entities[0:limit], action_type="delete")
839 entities = entities[limit:]
840 while len(entities_with_attributes) > 0:
841 self.update(
842 entities=entities_with_attributes[0:limit], action_type="delete"
843 )
844 entities_with_attributes = entities_with_attributes[limit:]
846 def update_or_append_entity_attributes(
847 self,
848 entity_id: str,
849 attrs: Union[
850 List[NamedContextAttribute], Dict[str, ContextAttribute], Dict[str, Any]
851 ],
852 entity_type: str = None,
853 append_strict: bool = False,
854 forcedUpdate: bool = False,
855 key_values: bool = False,
856 ):
857 """
858 The request payload is an object representing the attributes to
859 append or update. This corresponds to a 'POST' request if append is
860 set to 'False'
862 Note:
863 Be careful not to update attributes that are
864 provided via context registration, e.g. commands. Commands are
865 removed before sending the request. To avoid breaking things.
867 Args:
868 entity_id: Entity id to be updated
869 entity_type: Entity type, to avoid ambiguity in case there are
870 several entities with the same entity id.
871 attrs: List of attributes to update or to append
872 append_strict: If `False` the entity attributes are updated (if they
873 previously exist) or appended (if they don't previously exist)
874 with the ones in the payload.
875 If `True` all the attributes in the payload not
876 previously existing in the entity are appended. In addition
877 to that, in case some of the attributes in the payload
878 already exist in the entity, an error is returned.
879 More precisely this means a strict append procedure.
880 forcedUpdate: Update operation have to trigger any matching
881 subscription, no matter if there is an actual attribute
882 update or no instead of the default behavior, which is to
883 updated only if attribute is effectively updated.
884 key_values: By default False. If set to True, the payload uses
885 the keyValues simplified entity representation, i.e.
886 ContextEntityKeyValues.
887 Returns:
888 None
890 """
891 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
892 headers = self.headers.copy()
893 params = {}
894 if entity_type:
895 params.update({"type": entity_type})
896 else:
897 entity_type = "dummy"
899 options = []
900 if append_strict:
901 options.append("append")
902 if forcedUpdate:
903 options.append("forcedUpdate")
904 if key_values:
905 assert isinstance(attrs, dict), "for keyValues attrs has to be a dict"
906 options.append("keyValues")
907 if options:
908 params.update({"options": ",".join(options)})
910 if key_values:
911 entity = ContextEntityKeyValues(id=entity_id, type=entity_type, **attrs)
912 else:
913 entity = ContextEntity(id=entity_id, type=entity_type)
914 entity.add_attributes(attrs)
915 # exclude commands from the send data,
916 # as they live in the IoTA-agent
917 excluded_keys = {"id", "type"}
918 # excluded_keys.update(
919 # entity.get_commands(response_format=PropertyFormat.DICT).keys()
920 # )
921 try:
922 res = self.post(
923 url=url,
924 headers=headers,
925 json=entity.model_dump(exclude=excluded_keys, exclude_none=True),
926 params=params,
927 )
928 if res.ok:
929 self.logger.info("Entity '%s' successfully " "updated!", entity.id)
930 else:
931 res.raise_for_status()
932 except requests.RequestException as err:
933 msg = f"Could not update or append attributes of entity" f" {entity.id} !"
934 self.log_error(err=err, msg=msg)
935 raise BaseHttpClientException(message=msg, response=err.response) from err
937 def update_existing_entity_attributes(
938 self,
939 entity_id: str,
940 attrs: Union[
941 List[NamedContextAttribute], Dict[str, ContextAttribute], Dict[str, Any]
942 ],
943 entity_type: str = None,
944 forcedUpdate: bool = False,
945 override_metadata: bool = False,
946 key_values: bool = False,
947 ):
948 """
949 The entity attributes are updated with the ones in the payload.
950 In addition to that, if one or more attributes in the payload doesn't
951 exist in the entity, an error is returned. This corresponds to a
952 'PATCH' request.
954 Args:
955 entity_id: Entity id to be updated
956 entity_type: Entity type, to avoid ambiguity in case there are
957 several entities with the same entity id.
958 attrs: List of attributes to update or to append
959 forcedUpdate: Update operation have to trigger any matching
960 subscription, no matter if there is an actual attribute
961 update or no instead of the default behavior, which is to
962 updated only if attribute is effectively updated.
963 override_metadata:
964 Bool,replace the existing metadata with the one provided in
965 the request
966 key_values: By default False. If set to True, the payload uses
967 the keyValues simplified entity representation, i.e.
968 ContextEntityKeyValues.
969 Returns:
970 None
972 """
973 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
974 headers = self.headers.copy()
975 if entity_type:
976 params = {"type": entity_type}
977 else:
978 params = None
979 entity_type = "dummy"
981 options = []
982 if override_metadata:
983 options.append("overrideMetadata")
984 if forcedUpdate:
985 options.append("forcedUpdate")
986 if key_values:
987 assert isinstance(attrs, dict), "for keyValues the attrs must be dict"
988 payload = attrs
989 options.append("keyValues")
990 else:
991 entity = ContextEntity(id=entity_id, type=entity_type)
992 entity.add_attributes(attrs)
993 payload = entity.model_dump(exclude={"id", "type"}, exclude_none=True)
994 if options:
995 params.update({"options": ",".join(options)})
997 try:
998 res = self.patch(
999 url=url,
1000 headers=headers,
1001 json=payload,
1002 params=params,
1003 )
1004 if res.ok:
1005 self.logger.info("Entity '%s' successfully " "updated!", entity_id)
1006 else:
1007 res.raise_for_status()
1008 except requests.RequestException as err:
1009 msg = f"Could not update attributes of entity"
1010 raise BaseHttpClientException(message=msg, response=err.response) from err
1012 def override_entity(
1013 self, entity: Union[ContextEntity, ContextEntityKeyValues], **kwargs
1014 ):
1015 """
1016 The request payload is an object representing the attributes to
1017 override the existing entity.
1019 Note:
1020 If you want to manipulate you should rather use patch_entity.
1022 Args:
1023 entity (ContextEntity or ContextEntityKeyValues):
1024 Returns:
1025 None
1026 """
1027 return self.replace_entity_attributes(
1028 entity_id=entity.id,
1029 entity_type=entity.type,
1030 attrs=entity.get_attributes(),
1031 **kwargs,
1032 )
1034 def replace_entity_attributes(
1035 self,
1036 entity_id: str,
1037 attrs: Union[
1038 List[Union[NamedContextAttribute, Dict[str, ContextAttribute]]], Dict
1039 ],
1040 entity_type: str = None,
1041 forcedUpdate: bool = False,
1042 key_values: bool = False,
1043 ):
1044 """
1045 The attributes previously existing in the entity are removed and
1046 replaced by the ones in the request. This corresponds to a 'PUT'
1047 request.
1049 Args:
1050 entity_id: Entity id to be updated
1051 entity_type: Entity type, to avoid ambiguity in case there are
1052 several entities with the same entity id.
1053 attrs: List of attributes to add to the entity or dict of
1054 attributes in case of key_values=True.
1055 forcedUpdate: Update operation have to trigger any matching
1056 subscription, no matter if there is an actual attribute
1057 update or no instead of the default behavior, which is to
1058 updated only if attribute is effectively updated.
1059 key_values(bool):
1060 By default False. If set to True, "options=keyValues" will
1061 be included in params of the request. The payload uses
1062 the keyValues simplified entity representation, i.e.
1063 ContextEntityKeyValues.
1064 Returns:
1065 None
1066 """
1067 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
1068 headers = self.headers.copy()
1069 params = {}
1070 options = []
1071 if entity_type:
1072 params.update({"type": entity_type})
1073 else:
1074 entity_type = "dummy"
1076 if forcedUpdate:
1077 options.append("forcedUpdate")
1079 if key_values:
1080 options.append("keyValues")
1081 assert isinstance(attrs, dict)
1082 else:
1083 entity = ContextEntity(id=entity_id, type=entity_type)
1084 entity.add_attributes(attrs)
1085 attrs = entity.model_dump(exclude={"id", "type"}, exclude_none=True)
1086 if options:
1087 params.update({"options": ",".join(options)})
1089 try:
1090 res = self.put(
1091 url=url,
1092 headers=headers,
1093 json=attrs,
1094 params=params,
1095 )
1096 if res.ok:
1097 self.logger.info("Entity '%s' successfully " "updated!", entity_id)
1098 else:
1099 res.raise_for_status()
1100 except requests.RequestException as err:
1101 msg = f"Could not replace attribute of entity {entity_id} !"
1102 raise BaseHttpClientException(message=msg, response=err.response) from err
1104 # Attribute operations
1105 def get_attribute(
1106 self,
1107 entity_id: str,
1108 attr_name: str,
1109 entity_type: str = None,
1110 metadata: str = None,
1111 response_format="",
1112 ) -> ContextAttribute:
1113 """
1114 Retrieves a specified attribute from an entity.
1116 Args:
1117 entity_id: Id of the entity. Example: Bcn_Welt
1118 attr_name: Name of the attribute to be retrieved.
1119 entity_type (Optional): Type of the entity to retrieve
1120 metadata (Optional): A list of metadata names to include in the
1121 response. See "Filtering out attributes and metadata" section
1122 for more detail.
1124 Returns:
1125 The content of the retrieved attribute as ContextAttribute
1127 Raises:
1128 Error
1130 """
1131 url = urljoin(
1132 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
1133 )
1134 headers = self.headers.copy()
1135 params = {}
1136 if entity_type:
1137 params.update({"type": entity_type})
1138 if metadata:
1139 params.update({"metadata": ",".join(metadata)})
1140 try:
1141 res = self.get(url=url, params=params, headers=headers)
1142 if res.ok:
1143 self.logger.debug("Received: %s", res.json())
1144 return ContextAttribute(**res.json())
1145 res.raise_for_status()
1146 except requests.RequestException as err:
1147 msg = (
1148 f"Could not load attribute '{attr_name}' from entity" f"'{entity_id}' "
1149 )
1150 raise BaseHttpClientException(message=msg, response=err.response) from err
1152 def update_entity_attribute(
1153 self,
1154 entity_id: str,
1155 attr: Union[ContextAttribute, NamedContextAttribute],
1156 *,
1157 entity_type: str = None,
1158 attr_name: str = None,
1159 override_metadata: bool = True,
1160 forcedUpdate: bool = False,
1161 ):
1162 """
1163 Updates a specified attribute from an entity.
1165 Args:
1166 attr:
1167 context attribute to update
1168 entity_id:
1169 Id of the entity. Example: Bcn_Welt
1170 entity_type:
1171 Entity type, to avoid ambiguity in case there are
1172 several entities with the same entity id.
1173 forcedUpdate: Update operation have to trigger any matching
1174 subscription, no matter if there is an actual attribute
1175 update or no instead of the default behavior, which is to
1176 updated only if attribute is effectively updated.
1177 attr_name:
1178 Name of the attribute to be updated.
1179 override_metadata:
1180 Bool, if set to `True` (default) the metadata will be
1181 overwritten. This is for backwards compatibility reasons.
1182 If `False` the metadata values will be either updated if
1183 already existing or append if not.
1184 See also:
1185 https://fiware-orion.readthedocs.io/en/master/user/metadata.html
1186 """
1187 headers = self.headers.copy()
1188 if not isinstance(attr, NamedContextAttribute):
1189 assert attr_name is not None, (
1190 "Missing name for attribute. "
1191 "attr_name must be present if"
1192 "attr is of type ContextAttribute"
1193 )
1194 else:
1195 assert attr_name is None, (
1196 "Invalid argument attr_name. Do not set "
1197 "attr_name if attr is of type "
1198 "NamedContextAttribute"
1199 )
1200 attr_name = attr.name
1202 url = urljoin(
1203 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
1204 )
1205 params = {}
1206 if entity_type:
1207 params.update({"type": entity_type})
1208 # set overrideMetadata option (we assure backwards compatibility here)
1209 options = []
1210 if override_metadata:
1211 options.append("overrideMetadata")
1212 if forcedUpdate:
1213 options.append("forcedUpdate")
1214 if options:
1215 params.update({"options": ",".join(options)})
1216 try:
1217 res = self.put(
1218 url=url,
1219 headers=headers,
1220 params=params,
1221 json=attr.model_dump(exclude={"name"}, exclude_none=True),
1222 )
1223 if res.ok:
1224 self.logger.info(
1225 "Attribute '%s' of '%s' " "successfully updated!",
1226 attr_name,
1227 entity_id,
1228 )
1229 else:
1230 res.raise_for_status()
1231 except requests.RequestException as err:
1232 msg = (
1233 f"Could not update attribute '{attr_name}' of entity" f"'{entity_id}' "
1234 )
1235 raise BaseHttpClientException(message=msg, response=err.response) from err
1237 def delete_entity_attribute(
1238 self, entity_id: str, attr_name: str, entity_type: str = None
1239 ) -> None:
1240 """
1241 Removes a specified attribute from an entity.
1243 Args:
1244 entity_id: Id of the entity.
1245 attr_name: Name of the attribute to be retrieved.
1246 entity_type: Entity type, to avoid ambiguity in case there are
1247 several entities with the same entity id.
1248 Raises:
1249 Error
1251 """
1252 url = urljoin(
1253 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
1254 )
1255 headers = self.headers.copy()
1256 params = {}
1257 if entity_type:
1258 params.update({"type": entity_type})
1259 try:
1260 res = self.delete(url=url, headers=headers)
1261 if res.ok:
1262 self.logger.info(
1263 "Attribute '%s' of '%s' " "successfully deleted!",
1264 attr_name,
1265 entity_id,
1266 )
1267 else:
1268 res.raise_for_status()
1269 except requests.RequestException as err:
1270 msg = f"Could not delete attribute '{attr_name}' of entity '{entity_id}'"
1271 raise BaseHttpClientException(message=msg, response=err.response) from err
1273 # Attribute value operations
1274 def get_attribute_value(
1275 self, entity_id: str, attr_name: str, entity_type: str = None
1276 ) -> Any:
1277 """
1278 This operation returns the value property with the value of the
1279 attribute.
1281 Args:
1282 entity_id: Id of the entity. Example: Bcn_Welt
1283 attr_name: Name of the attribute to be retrieved.
1284 Example: temperature.
1285 entity_type: Entity type, to avoid ambiguity in case there are
1286 several entities with the same entity id.
1288 Returns:
1290 """
1291 url = urljoin(
1292 self.base_url,
1293 f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}/value",
1294 )
1295 headers = self.headers.copy()
1296 params = {}
1297 if entity_type:
1298 params.update({"type": entity_type})
1299 try:
1300 res = self.get(url=url, params=params, headers=headers)
1301 if res.ok:
1302 self.logger.debug("Received: %s", res.json())
1303 return res.json()
1304 res.raise_for_status()
1305 except requests.RequestException as err:
1306 msg = (
1307 f"Could not load value of attribute '{attr_name}' from "
1308 f"entity'{entity_id}' "
1309 )
1310 raise BaseHttpClientException(message=msg, response=err.response) from err
1312 def update_attribute_value(
1313 self,
1314 *,
1315 entity_id: str,
1316 attr_name: str,
1317 value: Any,
1318 entity_type: str = None,
1319 forcedUpdate: bool = False,
1320 ):
1321 """
1322 Updates the value of a specified attribute of an entity
1324 Args:
1325 value: update value
1326 entity_id: Id of the entity. Example: Bcn_Welt
1327 attr_name: Name of the attribute to be retrieved.
1328 Example: temperature.
1329 entity_type: Entity type, to avoid ambiguity in case there are
1330 several entities with the same entity id.
1331 forcedUpdate: Update operation have to trigger any matching
1332 subscription, no matter if there is an actual attribute
1333 update or no instead of the default behavior, which is to
1334 updated only if attribute is effectively updated.
1335 Returns:
1337 """
1338 url = urljoin(
1339 self.base_url,
1340 f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}/value",
1341 )
1342 headers = self.headers.copy()
1343 params = {}
1344 if entity_type:
1345 params.update({"type": entity_type})
1346 options = []
1347 if forcedUpdate:
1348 options.append("forcedUpdate")
1349 if options:
1350 params.update({"options": ",".join(options)})
1351 try:
1352 if not isinstance(value, (dict, list)):
1353 headers.update({"Content-Type": "text/plain"})
1354 if isinstance(value, str):
1355 value = f"{value}"
1356 res = self.put(url=url, headers=headers, json=value, params=params)
1357 else:
1358 res = self.put(url=url, headers=headers, json=value, params=params)
1359 if res.ok:
1360 self.logger.info(
1361 "Attribute '%s' of '%s' " "successfully updated!",
1362 attr_name,
1363 entity_id,
1364 )
1365 else:
1366 res.raise_for_status()
1367 except requests.RequestException as err:
1368 msg = (
1369 f"Could not update value of attribute '{attr_name}' from "
1370 f"entity '{entity_id}' "
1371 )
1372 raise BaseHttpClientException(message=msg, response=err.response) from err
1374 # Types Operations
1375 def get_entity_types(
1376 self, *, limit: int = None, offset: int = None, options: str = None
1377 ) -> List[Dict[str, Any]]:
1378 """
1380 Args:
1381 limit: Limit the number of types to be retrieved.
1382 offset: Skip a number of records.
1383 options: Options dictionary. Allowed: count, values
1385 Returns:
1387 """
1388 url = urljoin(self.base_url, f"{self._url_version}/types")
1389 headers = self.headers.copy()
1390 params = {}
1391 if limit:
1392 params.update({"limit": limit})
1393 if offset:
1394 params.update({"offset": offset})
1395 if options:
1396 params.update({"options": options})
1397 try:
1398 res = self.get(url=url, params=params, headers=headers)
1399 if res.ok:
1400 self.logger.debug("Received: %s", res.json())
1401 return res.json()
1402 res.raise_for_status()
1403 except requests.RequestException as err:
1404 msg = "Could not load entity types!"
1405 raise BaseHttpClientException(message=msg, response=err.response) from err
1407 def get_entity_type(self, entity_type: str) -> Dict[str, Any]:
1408 """
1410 Args:
1411 entity_type: Entity Type. Example: Room
1413 Returns:
1415 """
1416 url = urljoin(self.base_url, f"{self._url_version}/types/{entity_type}")
1417 headers = self.headers.copy()
1418 params = {}
1419 try:
1420 res = self.get(url=url, params=params, headers=headers)
1421 if res.ok:
1422 self.logger.debug("Received: %s", res.json())
1423 return res.json()
1424 res.raise_for_status()
1425 except requests.RequestException as err:
1426 msg = f"Could not load entities of type" f"'{entity_type}' "
1427 raise BaseHttpClientException(message=msg, response=err.response) from err
1429 # SUBSCRIPTION API ENDPOINTS
1430 def get_subscription_list(self, limit: PositiveInt = inf) -> List[Subscription]:
1431 """
1432 Returns a list of all the subscriptions present in the system.
1433 Args:
1434 limit: Limit the number of subscriptions to be retrieved
1435 Returns:
1436 list of subscriptions
1437 """
1438 url = urljoin(self.base_url, f"{self._url_version}/subscriptions/")
1439 headers = self.headers.copy()
1440 params = {}
1442 # We always use the 'count' option to check weather pagination is
1443 # required
1444 params.update({"options": "count"})
1445 try:
1446 items = self.__pagination(
1447 limit=limit, url=url, params=params, headers=headers
1448 )
1449 adapter = TypeAdapter(List[Subscription])
1450 return adapter.validate_python(items)
1451 except requests.RequestException as err:
1452 msg = "Could not load subscriptions!"
1453 raise BaseHttpClientException(message=msg, response=err.response) from err
1455 def post_subscription(
1456 self,
1457 subscription: Subscription,
1458 update: bool = False,
1459 skip_initial_notification: bool = False,
1460 ) -> str:
1461 """
1462 Creates a new subscription. The subscription is represented by a
1463 Subscription object defined in filip.cb.models.
1465 If the subscription already exists, the adding is prevented and the id
1466 of the existing subscription is returned.
1468 A subscription is deemed as already existing if there exists a
1469 subscription with the exact same subject and notification fields. All
1470 optional fields are not considered.
1472 Args:
1473 subscription: Subscription
1474 update: True - If the subscription already exists, update it
1475 False- If the subscription already exists, throw warning
1476 skip_initial_notification: True - Initial Notifications will be
1477 sent to recipient containing the whole data. This is
1478 deprecated and removed from version 3.0 of the context broker.
1479 False - skip the initial notification
1480 Returns:
1481 str: Id of the (created) subscription
1483 """
1484 existing_subscriptions = self.get_subscription_list()
1486 sub_dict = subscription.model_dump(
1487 include={"subject", "notification"},
1488 exclude={
1489 "notification": {
1490 "lastSuccess",
1491 "lastFailure",
1492 "lastSuccessCode",
1493 "lastFailureReason",
1494 }
1495 },
1496 )
1497 for ex_sub in existing_subscriptions:
1498 if self._subscription_dicts_are_equal(
1499 sub_dict,
1500 ex_sub.model_dump(
1501 include={"subject", "notification"},
1502 exclude={
1503 "lastSuccess",
1504 "lastFailure",
1505 "lastSuccessCode",
1506 "lastFailureReason",
1507 },
1508 ),
1509 ):
1510 self.logger.info("Subscription already exists")
1511 if update:
1512 self.logger.info("Updated subscription")
1513 subscription.id = ex_sub.id
1514 self.update_subscription(subscription)
1515 else:
1516 warnings.warn(
1517 f"Subscription existed already with the id" f" {ex_sub.id}"
1518 )
1519 return ex_sub.id
1521 params = {}
1522 if skip_initial_notification:
1523 version = self.get_version()["orion"]["version"]
1524 if parse_version(version) <= parse_version("3.1"):
1525 params.update({"options": "skipInitialNotification"})
1526 else:
1527 pass
1528 warnings.warn(
1529 f"Skip initial notifications is a deprecated "
1530 f"feature of older versions <=3.1 of the context "
1531 f"broker. The Context Broker that you requesting has "
1532 f"version: {version}. For newer versions we "
1533 f"automatically skip this option. Consider "
1534 f"refactoring and updating your services",
1535 DeprecationWarning,
1536 )
1538 url = urljoin(self.base_url, "v2/subscriptions")
1539 headers = self.headers.copy()
1540 headers.update({"Content-Type": "application/json"})
1541 try:
1542 res = self.post(
1543 url=url,
1544 headers=headers,
1545 data=subscription.model_dump_json(
1546 exclude={
1547 "id": True,
1548 "notification": {
1549 "lastSuccess",
1550 "lastFailure",
1551 "lastSuccessCode",
1552 "lastFailureReason",
1553 },
1554 },
1555 exclude_none=True,
1556 ),
1557 params=params,
1558 )
1559 if res.ok:
1560 self.logger.info("Subscription successfully created!")
1561 return res.headers["Location"].split("/")[-1]
1562 res.raise_for_status()
1563 except requests.RequestException as err:
1564 msg = "Could not send subscription!"
1565 raise BaseHttpClientException(message=msg, response=err.response) from err
1567 def get_subscription(self, subscription_id: str) -> Subscription:
1568 """
1569 Retrieves a subscription from
1570 Args:
1571 subscription_id: id of the subscription
1573 Returns:
1575 """
1576 url = urljoin(
1577 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
1578 )
1579 headers = self.headers.copy()
1580 try:
1581 res = self.get(url=url, headers=headers)
1582 if res.ok:
1583 self.logger.debug("Received: %s", res.json())
1584 return Subscription(**res.json())
1585 res.raise_for_status()
1586 except requests.RequestException as err:
1587 msg = f"Could not load subscription {subscription_id}"
1588 raise BaseHttpClientException(message=msg, response=err.response) from err
1590 def update_subscription(
1591 self, subscription: Subscription, skip_initial_notification: bool = False
1592 ):
1593 """
1594 Only the fields included in the request are updated in the subscription.
1596 Args:
1597 subscription: Subscription to update
1598 skip_initial_notification: True - Initial Notifications will be
1599 sent to recipient containing the whole data. This is
1600 deprecated and removed from version 3.0 of the context broker.
1601 False - skip the initial notification
1603 Returns:
1604 None
1605 """
1606 params = {}
1607 if skip_initial_notification:
1608 version = self.get_version()["orion"]["version"]
1609 if parse_version(version) <= parse_version("3.1"):
1610 params.update({"options": "skipInitialNotification"})
1611 else:
1612 pass
1613 warnings.warn(
1614 f"Skip initial notifications is a deprecated "
1615 f"feature of older versions <3.1 of the context "
1616 f"broker. The Context Broker that you requesting has "
1617 f"version: {version}. For newer versions we "
1618 f"automatically skip this option. Consider "
1619 f"refactoring and updating your services",
1620 DeprecationWarning,
1621 )
1623 url = urljoin(
1624 self.base_url, f"{self._url_version}/subscriptions/{subscription.id}"
1625 )
1626 headers = self.headers.copy()
1627 headers.update({"Content-Type": "application/json"})
1628 try:
1629 res = self.patch(
1630 url=url,
1631 headers=headers,
1632 data=subscription.model_dump_json(
1633 exclude={
1634 "id": True,
1635 "notification": {
1636 "lastSuccess",
1637 "lastFailure",
1638 "lastSuccessCode",
1639 "lastFailureReason",
1640 },
1641 },
1642 exclude_none=True,
1643 ),
1644 )
1645 if res.ok:
1646 self.logger.info("Subscription successfully updated!")
1647 else:
1648 res.raise_for_status()
1649 except requests.RequestException as err:
1650 msg = f"Could not update subscription {subscription.id}"
1651 raise BaseHttpClientException(message=msg, response=err.response) from err
1653 def delete_subscription(self, subscription_id: str) -> None:
1654 """
1655 Deletes a subscription from a Context Broker
1656 Args:
1657 subscription_id: id of the subscription
1658 """
1659 url = urljoin(
1660 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
1661 )
1662 headers = self.headers.copy()
1663 try:
1664 res = self.delete(url=url, headers=headers)
1665 if res.ok:
1666 self.logger.info(
1667 f"Subscription '{subscription_id}' " f"successfully deleted!"
1668 )
1669 else:
1670 res.raise_for_status()
1671 except requests.RequestException as err:
1672 msg = f"Could not delete subscription {subscription_id}"
1673 raise BaseHttpClientException(message=msg, response=err.response) from err
1675 # Registration API
1676 def get_registration_list(self, *, limit: PositiveInt = None) -> List[Registration]:
1677 """
1678 Lists all the context provider registrations present in the system.
1680 Args:
1681 limit: Limit the number of registrations to be retrieved
1682 Returns:
1684 """
1685 url = urljoin(self.base_url, f"{self._url_version}/registrations/")
1686 headers = self.headers.copy()
1687 params = {}
1689 # We always use the 'count' option to check weather pagination is
1690 # required
1691 params.update({"options": "count"})
1692 try:
1693 items = self.__pagination(
1694 limit=limit, url=url, params=params, headers=headers
1695 )
1696 adapter = TypeAdapter(List[Registration])
1697 return adapter.validate_python(items)
1698 except requests.RequestException as err:
1699 msg = "Could not load registrations!"
1700 raise BaseHttpClientException(message=msg, response=err.response) from err
1702 def post_registration(self, registration: Registration):
1703 """
1704 Creates a new context provider registration. This is typically used
1705 for binding context sources as providers of certain data. The
1706 registration is represented by cb.models.Registration
1708 Args:
1709 registration (Registration):
1711 Returns:
1713 """
1714 url = urljoin(self.base_url, f"{self._url_version}/registrations")
1715 headers = self.headers.copy()
1716 headers.update({"Content-Type": "application/json"})
1717 try:
1718 res = self.post(
1719 url=url,
1720 headers=headers,
1721 data=registration.model_dump_json(exclude={"id"}, exclude_none=True),
1722 )
1723 if res.ok:
1724 self.logger.info("Registration successfully created!")
1725 return res.headers["Location"].split("/")[-1]
1726 res.raise_for_status()
1727 except requests.RequestException as err:
1728 msg = f"Could not send registration {registration.id}!"
1729 raise BaseHttpClientException(message=msg, response=err.response) from err
1731 def get_registration(self, registration_id: str) -> Registration:
1732 """
1733 Retrieves a registration from context broker by id
1735 Args:
1736 registration_id: id of the registration
1738 Returns:
1739 Registration
1740 """
1741 url = urljoin(
1742 self.base_url, f"{self._url_version}/registrations/{registration_id}"
1743 )
1744 headers = self.headers.copy()
1745 try:
1746 res = self.get(url=url, headers=headers)
1747 if res.ok:
1748 self.logger.debug("Received: %s", res.json())
1749 return Registration(**res.json())
1750 res.raise_for_status()
1751 except requests.RequestException as err:
1752 msg = f"Could not load registration {registration_id} !"
1753 raise BaseHttpClientException(message=msg, response=err.response) from err
1755 def add_valid_relationships(
1756 self, entities: List[ContextEntity]
1757 ) -> List[ContextEntity]:
1758 """
1759 Validate all attributes in the given entities. If the attribute value points to
1760 an existing entity, it is assumed that this attribute is a relationship, and it
1761 will be assigned with the attribute type "relationship"
1763 Args:
1764 entities: list of entities that need to be validated.
1766 Returns:
1767 updated entities
1768 """
1769 updated_entities = []
1770 for entity in entities[:]:
1771 for attr_name, attr_value in entity.model_dump(
1772 exclude={"id", "type"}
1773 ).items():
1774 if isinstance(attr_value, dict):
1775 if self.validate_relationship(attr_value):
1776 entity.update_attribute(
1777 {
1778 attr_name: ContextAttribute(
1779 **{
1780 "type": DataType.RELATIONSHIP,
1781 "value": attr_value.get("value"),
1782 }
1783 )
1784 }
1785 )
1786 updated_entities.append(entity)
1787 return updated_entities
1789 def remove_invalid_relationships(
1790 self, entities: List[ContextEntity], hard_remove: bool = True
1791 ) -> List[ContextEntity]:
1792 """
1793 Removes invalid relationships from the entities. An invalid relationship
1794 is a relationship that has no destination entity.
1796 Args:
1797 entities: list of entities that need to be validated.
1798 hard_remove: If True, invalid relationships will be deleted.
1799 If False, invalid relationships will be changed to Text
1800 attributes.
1802 Returns:
1803 updated entities
1804 """
1805 updated_entities = []
1806 for entity in entities[:]:
1807 for relationship in entity.get_relationships():
1808 if not self.validate_relationship(relationship):
1809 if hard_remove:
1810 entity.delete_attributes(attrs=[relationship])
1811 else:
1812 # change the attribute type to "Text"
1813 entity.update_attribute(
1814 attrs=[
1815 NamedContextAttribute(
1816 name=relationship.name,
1817 type=DataType.TEXT,
1818 value=relationship.value,
1819 )
1820 ]
1821 )
1822 updated_entities.append(entity)
1823 return updated_entities
1825 def validate_relationship(
1826 self, relationship: Union[NamedContextAttribute, ContextAttribute, Dict]
1827 ) -> bool:
1828 """
1829 Validates a relationship. A relationship is valid if it points to an existing
1830 entity. Otherwise, it is considered invalid
1832 Args:
1833 relationship: relationship to validate
1834 Returns
1835 True if the relationship is valid, False otherwise
1836 """
1837 if isinstance(relationship, NamedContextAttribute) or isinstance(
1838 relationship, ContextAttribute
1839 ):
1840 destination_id = relationship.value
1841 elif isinstance(relationship, dict):
1842 _sentinel = object()
1843 destination_id = relationship.get("value", _sentinel)
1844 if destination_id is _sentinel:
1845 raise ValueError(
1846 "Invalid relationship dictionary format\n"
1847 "Expected format: {"
1848 f'"type": "{DataType.RELATIONSHIP.value}", '
1849 '"value" "entity_id"}'
1850 )
1851 else:
1852 raise ValueError("Invalid relationship type.")
1853 try:
1854 destination_entity = self.get_entity(entity_id=destination_id)
1855 return destination_entity.id == destination_id
1856 except requests.RequestException as err:
1857 if err.response.status_code == 404:
1858 return False
1860 def update_registration(self, registration: Registration):
1861 """
1862 Only the fields included in the request are updated in the registration.
1864 Args:
1865 registration: Registration to update
1866 Returns:
1868 """
1869 url = urljoin(
1870 self.base_url, f"{self._url_version}/registrations/{registration.id}"
1871 )
1872 headers = self.headers.copy()
1873 headers.update({"Content-Type": "application/json"})
1874 try:
1875 res = self.patch(
1876 url=url,
1877 headers=headers,
1878 data=registration.model_dump_json(exclude={"id"}, exclude_none=True),
1879 )
1880 if res.ok:
1881 self.logger.info("Registration successfully updated!")
1882 else:
1883 res.raise_for_status()
1884 except requests.RequestException as err:
1885 msg = f"Could not update registration {registration.id} !"
1886 raise BaseHttpClientException(message=msg, response=err.response) from err
1888 def delete_registration(self, registration_id: str) -> None:
1889 """
1890 Deletes a subscription from a Context Broker
1891 Args:
1892 registration_id: id of the subscription
1893 """
1894 url = urljoin(
1895 self.base_url, f"{self._url_version}/registrations/{registration_id}"
1896 )
1897 headers = self.headers.copy()
1898 try:
1899 res = self.delete(url=url, headers=headers)
1900 if res.ok:
1901 self.logger.info(
1902 "Registration '%s' " "successfully deleted!", registration_id
1903 )
1904 res.raise_for_status()
1905 except requests.RequestException as err:
1906 msg = f"Could not delete registration {registration_id} !"
1907 raise BaseHttpClientException(message=msg, response=err.response) from err
1909 # Batch operation API
1910 def update(
1911 self,
1912 *,
1913 entities: List[Union[ContextEntity, ContextEntityKeyValues]],
1914 action_type: Union[ActionType, str],
1915 update_format: str = None,
1916 forcedUpdate: bool = False,
1917 override_metadata: bool = False,
1918 ) -> None:
1919 """
1920 This operation allows to create, update and/or delete several entities
1921 in a single batch operation.
1923 This operation is split in as many individual operations as entities
1924 in the entities vector, so the actionType is executed for each one of
1925 them. Depending on the actionType, a mapping with regular non-batch
1926 operations can be done:
1928 append: maps to POST /v2/entities (if the entity does not already exist)
1929 or POST /v2/entities/<id>/attrs (if the entity already exists).
1931 appendStrict: maps to POST /v2/entities (if the entity does not
1932 already exist) or POST /v2/entities/<id>/attrs?options=append (if the
1933 entity already exists).
1935 update: maps to PATCH /v2/entities/<id>/attrs.
1937 delete: maps to DELETE /v2/entities/<id>/attrs/<attrName> on every
1938 attribute included in the entity or to DELETE /v2/entities/<id> if
1939 no attribute were included in the entity.
1941 replace: maps to PUT /v2/entities/<id>/attrs.
1943 Args:
1944 entities: "an array of entities, each entity specified using the "
1945 "JSON entity representation format "
1946 action_type (Update): "actionType, to specify the kind of update
1947 action to do: either append, appendStrict, update, delete,
1948 or replace. "
1949 update_format (str): Optional 'keyValues'
1950 forcedUpdate: Update operation have to trigger any matching
1951 subscription, no matter if there is an actual attribute
1952 update or no instead of the default behavior, which is to
1953 updated only if attribute is effectively updated.
1954 override_metadata:
1955 Bool, replace the existing metadata with the one provided in
1956 the request
1957 Returns:
1959 """
1961 url = urljoin(self.base_url, f"{self._url_version}/op/update")
1962 headers = self.headers.copy()
1963 headers.update({"Content-Type": "application/json"})
1964 params = {}
1965 options = []
1966 if override_metadata:
1967 options.append("overrideMetadata")
1968 if forcedUpdate:
1969 options.append("forcedUpdate")
1970 if update_format:
1971 assert (
1972 update_format == AttrsFormat.KEY_VALUES.value
1973 ), "Only 'keyValues' is allowed as update format"
1974 options.append("keyValues")
1975 if options:
1976 params.update({"options": ",".join(options)})
1977 update = Update(actionType=action_type, entities=entities)
1978 try:
1979 res = self.post(
1980 url=url,
1981 headers=headers,
1982 params=params,
1983 json=update.model_dump(by_alias=True),
1984 )
1985 if res.ok:
1986 self.logger.info("Update operation '%s' succeeded!", action_type)
1987 else:
1988 res.raise_for_status()
1989 except requests.RequestException as err:
1990 msg = f"Update operation '{action_type}' failed!"
1991 raise BaseHttpClientException(message=msg, response=err.response) from err
1993 def query(
1994 self,
1995 *,
1996 query: Query,
1997 limit: PositiveInt = None,
1998 order_by: str = None,
1999 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
2000 ) -> List[Any]:
2001 """
2002 Generate api query
2003 Args:
2004 query (Query):
2005 limit (PositiveInt):
2006 order_by (str):
2007 response_format (AttrsFormat, str):
2008 Returns:
2009 The response payload is an Array containing one object per matching
2010 entity, or an empty array [] if no entities are found. The entities
2011 follow the JSON entity representation format (described in the
2012 section "JSON Entity Representation").
2013 """
2014 url = urljoin(self.base_url, f"{self._url_version}/op/query")
2015 headers = self.headers.copy()
2016 headers.update({"Content-Type": "application/json"})
2017 params = {"options": "count"}
2019 if response_format:
2020 if response_format not in list(AttrsFormat):
2021 raise ValueError(f"Value must be in {list(AttrsFormat)}")
2022 params["options"] = ",".join([response_format, "count"])
2023 try:
2024 items = self.__pagination(
2025 method=PaginationMethod.POST,
2026 url=url,
2027 headers=headers,
2028 params=params,
2029 data=query.model_dump_json(exclude_none=True),
2030 limit=limit,
2031 )
2032 if response_format == AttrsFormat.NORMALIZED:
2033 adapter = TypeAdapter(List[ContextEntity])
2034 return adapter.validate_python(items)
2035 if response_format == AttrsFormat.KEY_VALUES:
2036 adapter = TypeAdapter(List[ContextEntityKeyValues])
2037 return adapter.validate_python(items)
2038 return items
2039 except requests.RequestException as err:
2040 msg = "Query operation failed!"
2041 raise BaseHttpClientException(message=msg, response=err.response) from err
2043 def notify(self, message: Message) -> None:
2044 """
2045 This operation is intended to consume a notification payload so that
2046 all the entity data included by such notification is persisted,
2047 overwriting if necessary. This operation is useful when one NGSIv2
2048 endpoint is subscribed to another NGSIv2 endpoint (federation
2049 scenarios). The request payload must be an NGSIv2 notification
2050 payload. The behaviour must be exactly the same as 'update'
2051 with 'action_type' equal to append.
2053 Args:
2054 message: Notification message
2056 Returns:
2057 None
2058 """
2059 url = urljoin(self.base_url, "v2/op/notify")
2060 headers = self.headers.copy()
2061 headers.update({"Content-Type": "application/json"})
2062 params = {}
2063 try:
2064 res = self.post(
2065 url=url,
2066 headers=headers,
2067 params=params,
2068 data=message.model_dump_json(by_alias=True),
2069 )
2070 if res.ok:
2071 self.logger.info("Notification message sent!")
2072 else:
2073 res.raise_for_status()
2074 except requests.RequestException as err:
2075 msg = (
2076 f"Sending notifcation message failed! \n "
2077 f"{message.model_dump_json(indent=2)}"
2078 )
2079 raise BaseHttpClientException(message=msg, response=err.response) from err
2081 def post_command(
2082 self,
2083 *,
2084 entity_id: str,
2085 command: Union[Command, NamedCommand, Dict],
2086 entity_type: str = None,
2087 command_name: str = None,
2088 ) -> None:
2089 """
2090 Post a command to a context entity this corresponds to 'PATCH' of the
2091 specified command attribute.
2093 Args:
2094 entity_id: Entity identifier
2095 command: Command
2096 entity_type: Entity type
2097 command_name: Name of the command in the entity
2099 Returns:
2100 None
2101 """
2102 if command_name:
2103 assert isinstance(command, (Command, dict))
2104 if isinstance(command, dict):
2105 command = Command(**command)
2106 command = {command_name: command.model_dump()}
2107 else:
2108 assert isinstance(command, (NamedCommand, dict))
2109 if isinstance(command, dict):
2110 command = NamedCommand(**command)
2112 self.update_existing_entity_attributes(
2113 entity_id=entity_id, entity_type=entity_type, attrs=[command]
2114 )
2116 def does_entity_exist(self, entity_id: str, entity_type: str) -> bool:
2117 """
2118 Test if an entity with given id and type is present in the CB
2120 Args:
2121 entity_id: Entity id
2122 entity_type: Entity type
2124 Returns:
2125 bool; True if entity exists
2127 Raises:
2128 RequestException, if any error occurs (e.g: No Connection),
2129 except that the entity is not found
2130 """
2131 url = urljoin(self.base_url, f"v2/entities/{entity_id}")
2132 headers = self.headers.copy()
2133 params = {"type": entity_type}
2135 try:
2136 res = self.get(url=url, params=params, headers=headers)
2137 if res.ok:
2138 return True
2139 res.raise_for_status()
2140 except requests.RequestException as err:
2141 if err.response is None or not err.response.status_code == 404:
2142 self.log_error(err=err, msg="Checking entity existence failed!")
2143 raise
2144 return False
2146 def patch_entity(
2147 self,
2148 entity: Union[ContextEntity, ContextEntityKeyValues],
2149 key_values: bool = False,
2150 forcedUpdate: bool = False,
2151 override_metadata: bool = False,
2152 ) -> None:
2153 """
2154 Takes a given entity and updates the state in the CB to match it.
2155 It is an extended equivalent to the HTTP method PATCH, which applies
2156 partial modifications to a resource.
2158 Args:
2159 entity: Entity to update
2160 key_values: If True, the entity is updated in key-values format.
2161 forcedUpdate: Update operation have to trigger any matching
2162 subscription, no matter if there is an actual attribute
2163 update or no instead of the default behavior, which is to
2164 updated only if attribute is effectively updated.
2165 override_metadata: If True, the existing metadata of the entity
2166 is replaced
2167 Returns:
2168 None
2169 """
2170 attributes = entity.get_attributes()
2172 self.update_existing_entity_attributes(
2173 entity_id=entity.id,
2174 entity_type=entity.type,
2175 attrs=attributes,
2176 key_values=key_values,
2177 forcedUpdate=forcedUpdate,
2178 override_metadata=override_metadata,
2179 )
2181 def _subscription_dicts_are_equal(self, first: dict, second: dict):
2182 """
2183 Check if two dictionaries and all sub-dictionaries are equal.
2184 Logs a warning if the keys are not equal, but ignores the
2185 comparison of such keys.
2187 Args:
2188 first dict: Dictionary of first subscription
2189 second dict: Dictionary of second subscription
2191 Returns:
2192 True if equal, else False
2193 """
2195 def _value_is_not_none(value):
2196 """
2197 Recursive function to check if a value equals none.
2198 If the value is a dict and any value of the dict is not none,
2199 the value is not none.
2200 If the value is a list and any item is not none, the value is not none.
2201 If it's neither dict nore list, bool is used.
2202 """
2203 if isinstance(value, dict):
2204 return any([_value_is_not_none(value=_v) for _v in value.values()])
2205 if isinstance(value, list):
2206 return any([_value_is_not_none(value=_v) for _v in value])
2207 else:
2208 return bool(value)
2210 if first.keys() != second.keys():
2211 warnings.warn(
2212 "Subscriptions contain a different set of fields. "
2213 "Only comparing to new fields of the new one."
2214 )
2215 for k, v in first.items():
2216 ex_value = second.get(k, None)
2217 if isinstance(v, dict) and isinstance(ex_value, dict):
2218 equal = self._subscription_dicts_are_equal(v, ex_value)
2219 if equal:
2220 continue
2221 else:
2222 return False
2223 if v != ex_value:
2224 self.logger.debug(f"Not equal fields for key {k}: ({v}, {ex_value})")
2225 if (
2226 not _value_is_not_none(v)
2227 and not _value_is_not_none(ex_value)
2228 or k == "timesSent"
2229 ):
2230 continue
2231 return False
2232 return True
2235#
2236#
2237# def check_duplicate_subscription(self, subscription_body, limit: int = 20):
2238# """
2239# Function compares the subject of the subscription body, on whether a subscription
2240# already exists for a device / entity.
2241# :param subscription_body: the body of the new subscripton
2242# :param limit: pagination parameter, to set the number of
2243# subscriptions bodies the get request should grab
2244# :return: exists, boolean -> True, if such a subscription allready
2245# exists
2246# """
2247# exists = False
2248# subscription_subject = json.loads(subscription_body)["subject"]
2249# # Exact keys depend on subscription body
2250# try:
2251# subscription_url = json.loads(subscription_body)[
2252# "notification"]["httpCustom"]["url"]
2253# except KeyError:
2254# subscription_url = json.loads(subscription_body)[
2255# "notification"]["http"]["url"]
2256#
2257# # If the number of subscriptions is larger then the limit,
2258# paginations methods have to be used
2259# url = self.url + '/v2/subscriptions?limit=' + str(limit) +
2260# '&options=count'
2261# response = self.session.get(url, headers=self.get_header())
2262#
2263# sub_count = float(response.headers["Fiware-Total-Count"])
2264# response = json.loads(response.text)
2265# if sub_count >= limit:
2266# response = self.get_pagination(url=url, headers=self.get_header(),
2267# limit=limit, count=sub_count)
2268# response = json.loads(response)
2269#
2270# for existing_subscription in response:
2271# # check whether the exact same subscriptions already exists
2272# if existing_subscription["subject"] == subscription_subject:
2273# exists = True
2274# break
2275# try:
2276# existing_url = existing_subscription["notification"][
2277# "http"]["url"]
2278# except KeyError:
2279# existing_url = existing_subscription["notification"][
2280# "httpCustom"]["url"]
2281# # check whether both subscriptions notify to the same path
2282# if existing_url != subscription_url:
2283# continue
2284# else:
2285# # iterate over all entities included in the subscription object
2286# for entity in subscription_subject["entities"]:
2287# if 'type' in entity.keys():
2288# subscription_type = entity['type']
2289# else:
2290# subscription_type = entity['typePattern']
2291# if 'id' in entity.keys():
2292# subscription_id = entity['id']
2293# else:
2294# subscription_id = entity["idPattern"]
2295# # iterate over all entities included in the exisiting
2296# subscriptions
2297# for existing_entity in existing_subscription["subject"][
2298# "entities"]:
2299# if "type" in entity.keys():
2300# type_existing = entity["type"]
2301# else:
2302# type_existing = entity["typePattern"]
2303# if "id" in entity.keys():
2304# id_existing = entity["id"]
2305# else:
2306# id_existing = entity["idPattern"]
2307# # as the ID field is non optional, it has to match
2308# # check whether the type match
2309# # if the type field is empty, they match all types
2310# if (type_existing == subscription_type) or\
2311# ('*' in subscription_type) or \
2312# ('*' in type_existing)\
2313# or (type_existing == "") or (
2314# subscription_type == ""):
2315# # check if on of the subscriptions is a pattern,
2316# or if they both refer to the same id
2317# # Get the attrs first, to avoid code duplication
2318# # last thing to compare is the attributes
2319# # Assumption -> position is the same as the
2320# entities _list
2321# # i == j
2322# i = subscription_subject["entities"].index(entity)
2323# j = existing_subscription["subject"][
2324# "entities"].index(existing_entity)
2325# try:
2326# subscription_attrs = subscription_subject[
2327# "condition"]["attrs"][i]
2328# except (KeyError, IndexError):
2329# subscription_attrs = []
2330# try:
2331# existing_attrs = existing_subscription[
2332# "subject"]["condition"]["attrs"][j]
2333# except (KeyError, IndexError):
2334# existing_attrs = []
2335#
2336# if (".*" in subscription_id) or ('.*' in
2337# id_existing) or (subscription_id == id_existing):
2338# # Attributes have to match, or the have to
2339# be an empty array
2340# if (subscription_attrs == existing_attrs) or
2341# (subscription_attrs == []) or (existing_attrs == []):
2342# exists = True
2343# # if they do not match completely or subscribe
2344# to all ids they have to match up to a certain position
2345# elif ("*" in subscription_id) or ('*' in
2346# id_existing):
2347# regex_existing = id_existing.find('*')
2348# regex_subscription =
2349# subscription_id.find('*')
2350# # slice the strings to compare
2351# if (id_existing[:regex_existing] in
2352# subscription_id) or (subscription_id[:regex_subscription] in id_existing) or \
2353# (id_existing[regex_existing:] in
2354# subscription_id) or (subscription_id[regex_subscription:] in id_existing):
2355# if (subscription_attrs ==
2356# existing_attrs) or (subscription_attrs == []) or (existing_attrs == []):
2357# exists = True
2358# else:
2359# continue
2360# else:
2361# continue
2362# else:
2363# continue
2364# else:
2365# continue
2366# else:
2367# continue
2368# return exists
2369#