Coverage for filip/clients/ngsi_v2/cb.py: 80%
680 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-05 11:07 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-05 11:07 +0000
1"""
2Context Broker Module for API Client
3"""
5from __future__ import annotations
7import copy
8from copy import deepcopy
9from enum import Enum
10from math import inf
11from packaging.version import parse as parse_version
12from pydantic import PositiveInt, PositiveFloat, AnyHttpUrl, ValidationError
13from pydantic.type_adapter import TypeAdapter
14from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union
15import re
16import requests
17from urllib.parse import urljoin
18import warnings
19from requests import RequestException
20from filip.clients.base_http_client import BaseHttpClient, NgsiURLVersion
21from filip.config import settings
22from filip.models.base import FiwareHeader, PaginationMethod, DataType
23from filip.utils.simple_ql import QueryString
24from filip.models.ngsi_v2.context import (
25 ActionType,
26 Command,
27 ContextEntity,
28 ContextEntityKeyValues,
29 ContextAttribute,
30 NamedCommand,
31 NamedContextAttribute,
32 Query,
33 Update,
34 PropertyFormat,
35 ContextEntityList,
36 ContextEntityKeyValuesList,
37 ContextEntityValidationList,
38 ContextEntityKeyValuesValidationList,
39)
40from filip.models.ngsi_v2.base import AttrsFormat
41from filip.models.ngsi_v2.subscriptions import Subscription, Message
42from filip.models.ngsi_v2.registrations import Registration
43from filip.clients.exceptions import BaseHttpClientException
45if TYPE_CHECKING:
46 from filip.clients.ngsi_v2.iota import IoTAClient
49class ContextBrokerClient(BaseHttpClient):
50 """
51 Implementation of NGSI Context Broker functionalities, such as creating
52 entities and subscriptions; retrieving, updating and deleting data.
53 Further documentation:
54 https://fiware-orion.readthedocs.io/en/master/
56 Api specifications for v2 are located here:
57 https://telefonicaid.github.io/fiware-orion/api/v2/stable/
59 Note:
60 We use the reference implementation for development. Therefore, some
61 other brokers may show slightly different behavior!
62 """
64 def __init__(
65 self,
66 url: str = None,
67 *,
68 session: requests.Session = None,
69 fiware_header: FiwareHeader = None,
70 **kwargs,
71 ):
72 """
74 Args:
75 url: Url of context broker server
76 session (requests.Session):
77 fiware_header (FiwareHeader): fiware service and fiware service path
78 **kwargs (Optional): Optional arguments that ``request`` takes.
79 """
80 # set service url
81 url = url or settings.CB_URL
82 self._url_version = NgsiURLVersion.v2_url.value
83 super().__init__(
84 url=url, session=session, fiware_header=fiware_header, **kwargs
85 )
87 def __pagination(
88 self,
89 *,
90 method: PaginationMethod = PaginationMethod.GET,
91 url: str,
92 headers: Dict,
93 limit: Union[PositiveInt, PositiveFloat] = None,
94 params: Dict = None,
95 data: str = None,
96 ) -> List[Dict]:
97 """
98 NGSIv2 implements a pagination mechanism in order to help clients to
99 retrieve large sets of resources. This mechanism works for all listing
100 operations in the API (e.g. GET /v2/entities, GET /v2/subscriptions,
101 POST /v2/op/query, etc.). This function helps getting datasets that are
102 larger than the limit for the different GET operations.
104 https://fiware-orion.readthedocs.io/en/master/user/pagination/index.html
106 Args:
107 url: Information about the url, obtained from the original function
108 headers: The headers from the original function
109 params:
110 limit:
112 Returns:
113 object:
115 """
116 if limit is None:
117 limit = inf
118 if limit > 1000:
119 params["limit"] = 1000 # maximum items per request
120 else:
121 params["limit"] = limit
123 if self.session:
124 session = self.session
125 else:
126 session = requests.Session()
127 with session:
128 res = session.request(
129 method=method, url=url, params=params, headers=headers, data=data
130 )
131 if res.ok:
132 items = res.json()
133 # do pagination
134 count = int(res.headers["Fiware-Total-Count"])
136 while len(items) < limit and len(items) < count:
137 # Establishing the offset from where entities are retrieved
138 params["offset"] = len(items)
139 params["limit"] = min(1000, (limit - len(items)))
140 res = session.request(
141 method=method,
142 url=url,
143 params=params,
144 headers=headers,
145 data=data,
146 )
147 if res.ok:
148 items.extend(res.json())
149 else:
150 res.raise_for_status()
151 self.logger.debug("Received: %s", items)
152 return items
153 res.raise_for_status()
155 # MANAGEMENT API
156 def get_version(self) -> Dict:
157 """
158 Gets version of IoT Agent
159 Returns:
160 Dictionary with response
161 """
162 url = urljoin(self.base_url, "version")
163 try:
164 res = self.get(url=url, headers=self.headers)
165 if res.ok:
166 return res.json()
167 res.raise_for_status()
168 except requests.RequestException as err:
169 self.logger.error(err)
170 raise RequestException(response=err.response) from err
172 def get_resources(self) -> Dict:
173 """
174 Gets reo
176 Returns:
177 Dict
178 """
179 url = urljoin(self.base_url, self._url_version)
180 try:
181 res = self.get(url=url, headers=self.headers)
182 if res.ok:
183 return res.json()
184 res.raise_for_status()
185 except requests.RequestException as err:
186 self.logger.error(err)
187 raise RequestException(response=err.response) from err
189 # STATISTICS API
190 def get_statistics(self) -> Dict:
191 """
192 Gets statistics of context broker
193 Returns:
194 Dictionary with response
195 """
196 url = urljoin(self.base_url, "statistics")
197 try:
198 res = self.get(url=url, headers=self.headers)
199 if res.ok:
200 return res.json()
201 res.raise_for_status()
202 except requests.RequestException as err:
203 self.logger.error(err)
204 raise BaseHttpClientException(
205 message=err.response.text, response=err.response
206 ) from err
208 # CONTEXT MANAGEMENT API ENDPOINTS
209 # Entity Operations
210 def post_entity(
211 self,
212 entity: Union[ContextEntity, ContextEntityKeyValues],
213 update: bool = False,
214 patch: bool = False,
215 override_metadata: bool = True,
216 key_values: bool = False,
217 ):
218 """
219 Function registers an Object with the NGSI Context Broker,
220 if it already exists it can be automatically updated (overwritten)
221 if the update bool is True.
222 First a post request with the entity is tried, if the response code
223 is 422 the entity is uncrossable, as it already exists there are two
224 options, either overwrite it, if the attribute have changed
225 (e.g. at least one new/new values) (update = True) or leave
226 it the way it is (update=False)
227 If you only want to manipulate the entities values, you need to set
228 patch argument.
230 Args:
231 entity (ContextEntity/ContextEntityKeyValues):
232 Context Entity Object
233 update (bool):
234 If the response.status_code is 422, whether the override and
235 existing entity
236 patch (bool):
237 If the response.status_code is 422, whether to manipulate the
238 existing entity. Omitted if update `True`.
239 override_metadata:
240 Only applies for patch equal to `True`.
241 Whether to override or append the attribute's metadata.
242 `True` for overwrite or `False` for update/append
243 key_values(bool):
244 By default False. If set to True, "options=keyValues" will
245 be included in params of post request. The payload uses
246 the keyValues simplified entity representation, i.e.
247 ContextEntityKeyValues.
248 """
249 url = urljoin(self.base_url, f"{self._url_version}/entities")
250 headers = self.headers.copy()
251 params = {}
252 options = []
253 if key_values:
254 assert isinstance(entity, ContextEntityKeyValues)
255 options.append("keyValues")
256 else:
257 assert isinstance(entity, ContextEntity)
258 if options:
259 params.update({"options": ",".join(options)})
260 try:
261 res = self.post(
262 url=url,
263 headers=headers,
264 json=entity.model_dump(exclude_none=True),
265 params=params,
266 )
267 if res.ok:
268 self.logger.info("Entity successfully posted!")
269 return res.headers.get("Location")
270 res.raise_for_status()
271 except requests.RequestException as err:
272 if err.response is not None:
273 if update and err.response.status_code == 422:
274 return self.override_entity(entity=entity, key_values=key_values)
275 if patch and err.response.status_code == 422:
276 return self.patch_entity(
277 entity=entity,
278 override_metadata=override_metadata,
279 key_values=key_values,
280 )
281 msg = f"Could not post entity {entity.id}"
282 raise BaseHttpClientException(message=msg, response=err.response) from err
284 def get_entity_list(
285 self,
286 *,
287 entity_ids: List[str] = None,
288 entity_types: List[str] = None,
289 id_pattern: str = None,
290 type_pattern: str = None,
291 q: Union[str, QueryString] = None,
292 mq: Union[str, QueryString] = None,
293 georel: str = None,
294 geometry: str = None,
295 coords: str = None,
296 limit: PositiveInt = inf,
297 attrs: List[str] = None,
298 metadata: str = None,
299 order_by: str = None,
300 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
301 include_invalid: bool = False,
302 ) -> Union[
303 List[Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]],
304 ContextEntityValidationList,
305 ContextEntityKeyValuesValidationList,
306 ]:
307 r"""
308 Retrieves a list of context entities that match different criteria by
309 id, type, pattern matching (either id or type) and/or those which
310 match a query or geographical query (see Simple Query Language and
311 Geographical Queries). A given entity has to match all the criteria
312 to be retrieved (i.e., the criteria is combined in a logical AND
313 way). Note that pattern matching query parameters are incompatible
314 (i.e. mutually exclusive) with their corresponding exact matching
315 parameters, i.e. idPattern with id and typePattern with type.
317 Args:
318 entity_ids: A comma-separated list of elements. Retrieve entities
319 whose ID matches one of the elements in the list.
320 Incompatible with idPattern,e.g. Boe_Idarium
321 entity_types: comma-separated list of elements. Retrieve entities
322 whose type matches one of the elements in the list.
323 Incompatible with typePattern. Example: Room.
324 id_pattern: A correctly formatted regular expression. Retrieve
325 entities whose ID matches the regular expression. Incompatible
326 with id, e.g. ngsi-ld.* or sensor.*
327 type_pattern: A correctly formatted regular expression. Retrieve
328 entities whose type matches the regular expression.
329 Incompatible with type, e.g. room.*
330 q (SimpleQuery): A query expression, composed of a list of
331 statements separated by ;, i.e.,
332 q=statement1;statement2;statement3. See Simple Query
333 Language specification. Example: temperature>40.
334 mq (SimpleQuery): A query expression for attribute metadata,
335 composed of a list of statements separated by ;, i.e.,
336 mq=statement1;statement2;statement3. See Simple Query
337 Language specification. Example: temperature.accuracy<0.9.
338 georel: Spatial relationship between matching entities and a
339 reference shape. See Geographical Queries. Example: 'near'.
340 geometry: Geographical area to which the query is restricted.
341 See Geographical Queries. Example: point.
342 coords: List of latitude-longitude pairs of coordinates separated
343 by ';'. See Geographical Queries. Example: 41.390205,
344 2.154007;48.8566,2.3522.
345 limit: Limits the number of entities to be retrieved Example: 20
346 attrs: Comma-separated list of attribute names whose data are to
347 be included in the response. The attributes are retrieved in
348 the order specified by this parameter. If this parameter is
349 not included, the attributes are retrieved in arbitrary
350 order. See "Filtering out attributes and metadata" section
351 for more detail. Example: seatNumber.
352 metadata: A list of metadata names to include in the response.
353 See "Filtering out attributes and metadata" section for more
354 detail. Example: accuracy.
355 order_by: Criteria for ordering results. See "Ordering Results"
356 section for details. Example: temperature,!speed.
357 response_format (AttrsFormat, str): Response Format. Note: That if
358 'keyValues' or 'values' are used the response model will
359 change to List[ContextEntityKeyValues] and to List[Dict[str,
360 Any]], respectively.
361 include_invalid: Specify if the returned list should also contain a list of invalid entity IDs or not.
362 Returns:
364 """
365 url = urljoin(self.base_url, f"{self._url_version}/entities/")
366 headers = self.headers.copy()
367 params = {}
369 if entity_ids and id_pattern:
370 raise ValueError
371 if entity_types and type_pattern:
372 raise ValueError
373 if entity_ids:
374 if not isinstance(entity_ids, list):
375 entity_ids = [entity_ids]
376 params.update({"id": ",".join(entity_ids)})
377 if id_pattern:
378 try:
379 re.compile(id_pattern)
380 except re.error as err:
381 raise ValueError(f"Invalid Pattern: {err}") from err
382 params.update({"idPattern": id_pattern})
383 if entity_types:
384 if not isinstance(entity_types, list):
385 entity_types = [entity_types]
386 params.update({"type": ",".join(entity_types)})
387 if type_pattern:
388 try:
389 re.compile(type_pattern)
390 except re.error as err:
391 raise ValueError(f"Invalid Pattern: {err.msg}") from err
392 params.update({"typePattern": type_pattern})
393 if attrs:
394 params.update({"attrs": ",".join(attrs)})
395 if metadata:
396 params.update({"metadata": ",".join(metadata)})
397 if q:
398 if isinstance(q, str):
399 q = QueryString.parse_str(q)
400 params.update({"q": str(q)})
401 if mq:
402 params.update({"mq": str(mq)})
403 if geometry:
404 params.update({"geometry": geometry})
405 if georel:
406 params.update({"georel": georel})
407 if coords:
408 params.update({"coords": coords})
409 if order_by:
410 params.update({"orderBy": order_by})
411 if response_format not in list(AttrsFormat):
412 raise ValueError(f"Value must be in {list(AttrsFormat)}")
413 response_format = ",".join(["count", response_format])
414 params.update({"options": response_format})
415 try:
416 items = self.__pagination(
417 method=PaginationMethod.GET,
418 limit=limit,
419 url=url,
420 params=params,
421 headers=headers,
422 )
423 if include_invalid:
424 valid_entities = []
425 invalid_entities = []
427 if AttrsFormat.NORMALIZED in response_format:
428 adapter = TypeAdapter(ContextEntity)
430 for entity in items:
431 try:
432 valid_entity = adapter.validate_python(entity)
433 valid_entities.append(valid_entity)
434 except ValidationError:
435 invalid_entities.append(entity.get("id"))
437 return ContextEntityValidationList.model_validate(
438 {
439 "entities": valid_entities,
440 "invalid_entities": invalid_entities,
441 }
442 )
443 elif AttrsFormat.KEY_VALUES in response_format:
444 adapter = TypeAdapter(ContextEntityKeyValues)
446 for entity in items:
447 try:
448 valid_entity = adapter.validate_python(entity)
449 valid_entities.append(valid_entity)
450 except ValidationError:
451 invalid_entities.append(entity.get("id"))
453 return ContextEntityKeyValuesValidationList.model_validate(
454 {
455 "entities": valid_entities,
456 "invalid_entities": invalid_entities,
457 }
458 )
459 else:
460 return items
461 else:
462 if AttrsFormat.NORMALIZED in response_format:
463 return ContextEntityList.model_validate(
464 {"entities": items}
465 ).entities
466 elif AttrsFormat.KEY_VALUES in response_format:
467 return ContextEntityKeyValuesList.model_validate(
468 {"entities": items}
469 ).entities
470 return items # in case of VALUES as response_format
472 except requests.RequestException as err:
473 msg = "Could not load entities"
474 raise BaseHttpClientException(message=msg, response=err.response) from err
476 def get_entity(
477 self,
478 entity_id: str,
479 entity_type: str = None,
480 attrs: List[str] = None,
481 metadata: List[str] = None,
482 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
483 ) -> Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]:
484 """
485 This operation must return one entity element only, but there may be
486 more than one entity with the same ID (e.g. entities with same ID but
487 different types). In such case, an error message is returned, with
488 the HTTP status code set to 409 Conflict.
490 Args:
491 entity_id (String): Id of the entity to be retrieved
492 entity_type (String): Entity type, to avoid ambiguity in case
493 there are several entities with the same entity id.
494 attrs (List of Strings): List of attribute names whose data must be
495 included in the response. The attributes are retrieved in the
496 order specified by this parameter.
497 See "Filtering out attributes and metadata" section for more
498 detail. If this parameter is not included, the attributes are
499 retrieved in arbitrary order, and all the attributes of the
500 entity are included in the response.
501 Example: temperature, humidity.
502 metadata (List of Strings): A list of metadata names to include in
503 the response. See "Filtering out attributes and metadata"
504 section for more detail. Example: accuracy.
505 response_format (AttrsFormat, str): Representation format of
506 response
507 Returns:
508 ContextEntity
509 """
510 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
511 headers = self.headers.copy()
512 params = {}
513 if entity_type:
514 params.update({"type": entity_type})
515 if attrs:
516 params.update({"attrs": ",".join(attrs)})
517 if metadata:
518 params.update({"metadata": ",".join(metadata)})
519 if response_format not in list(AttrsFormat):
520 raise ValueError(f"Value must be in {list(AttrsFormat)}")
521 params.update({"options": response_format})
523 try:
524 res = self.get(url=url, params=params, headers=headers)
525 if res.ok:
526 self.logger.info("Entity successfully retrieved!")
527 self.logger.debug("Received: %s", res.json())
528 if response_format == AttrsFormat.NORMALIZED:
529 return ContextEntity(**res.json())
530 if response_format == AttrsFormat.KEY_VALUES:
531 return ContextEntityKeyValues(**res.json())
532 return res.json()
533 res.raise_for_status()
534 except requests.RequestException as err:
535 msg = f"Could not load entity {entity_id}"
536 raise BaseHttpClientException(message=msg, response=err.response) from err
538 def get_entity_attributes(
539 self,
540 entity_id: str,
541 entity_type: str = None,
542 attrs: List[str] = None,
543 metadata: List[str] = None,
544 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
545 ) -> Dict[str, ContextAttribute]:
546 """
547 This request is similar to retrieving the whole entity, however this
548 one omits the id and type fields. Just like the general request of
549 getting an entire entity, this operation must return only one entity
550 element. If more than one entity with the same ID is found (e.g.
551 entities with same ID but different type), an error message is
552 returned, with the HTTP status code set to 409 Conflict.
554 Args:
555 entity_id (String): Id of the entity to be retrieved
556 entity_type (String): Entity type, to avoid ambiguity in case
557 there are several entities with the same entity id.
558 attrs (List of Strings): List of attribute names whose data must be
559 included in the response. The attributes are retrieved in the
560 order specified by this parameter.
561 See "Filtering out attributes and metadata" section for more
562 detail. If this parameter is not included, the attributes are
563 retrieved in arbitrary order, and all the attributes of the
564 entity are included in the response. Example: temperature,
565 humidity.
566 metadata (List of Strings): A list of metadata names to include in
567 the response. See "Filtering out attributes and metadata"
568 section for more detail. Example: accuracy.
569 response_format (AttrsFormat, str): Representation format of
570 response
571 Returns:
572 Dict
573 """
574 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
575 headers = self.headers.copy()
576 params = {}
577 if entity_type:
578 params.update({"type": entity_type})
579 if attrs:
580 params.update({"attrs": ",".join(attrs)})
581 if metadata:
582 params.update({"metadata": ",".join(metadata)})
583 if response_format not in list(AttrsFormat):
584 raise ValueError(f"Value must be in {list(AttrsFormat)}")
585 params.update({"options": response_format})
586 try:
587 res = self.get(url=url, params=params, headers=headers)
588 if res.ok:
589 if response_format == AttrsFormat.NORMALIZED:
590 return {
591 key: ContextAttribute(**values)
592 for key, values in res.json().items()
593 }
594 return res.json()
595 res.raise_for_status()
596 except requests.RequestException as err:
597 msg = f"Could not load attributes from entity {entity_id} !"
598 raise BaseHttpClientException(message=msg, response=err.response) from err
600 def update_entity(
601 self,
602 entity: Union[ContextEntity, ContextEntityKeyValues, dict],
603 append_strict: bool = False,
604 key_values: bool = False,
605 ):
606 """
607 The request payload is an object representing the attributes to
608 append or update.
610 Note:
611 Update means overwriting the existing entity. If you want to
612 manipulate you should rather use patch_entity.
614 Args:
615 entity (ContextEntity):
616 append_strict: If `False` the entity attributes are updated (if they
617 previously exist) or appended (if they don't previously exist)
618 with the ones in the payload.
619 If `True` all the attributes in the payload not
620 previously existing in the entity are appended. In addition
621 to that, in case some of the attributes in the payload
622 already exist in the entity, an error is returned.
623 More precisely this means a strict append procedure.
624 key_values: By default False. If set to True, the payload uses
625 the keyValues simplified entity representation, i.e.
626 ContextEntityKeyValues.
627 Returns:
628 None
629 """
630 if key_values:
631 if isinstance(entity, dict):
632 entity = copy.deepcopy(entity)
633 _id = entity.pop("id")
634 _type = entity.pop("type")
635 attrs = entity
636 entity = ContextEntityKeyValues(id=_id, type=_type)
637 else:
638 attrs = entity.model_dump(exclude={"id", "type"})
639 else:
640 attrs = entity.get_attributes()
641 self.update_or_append_entity_attributes(
642 entity_id=entity.id,
643 entity_type=entity.type,
644 attrs=attrs,
645 append_strict=append_strict,
646 key_values=key_values,
647 )
649 def update_entity_properties(
650 self, entity: ContextEntity, append_strict: bool = False
651 ):
652 """
653 The request payload is an object representing the attributes, of any type
654 but Relationship, to append or update.
656 Note:
657 Update means overwriting the existing entity. If you want to
658 manipulate you should rather use patch_entity.
660 Args:
661 entity (ContextEntity):
662 append_strict: If `False` the entity attributes are updated (if they
663 previously exist) or appended (if they don't previously exist)
664 with the ones in the payload.
665 If `True` all the attributes in the payload not
666 previously existing in the entity are appended. In addition
667 to that, in case some of the attributes in the payload
668 already exist in the entity, an error is returned.
669 More precisely this means a strict append procedure.
671 Returns:
672 None
673 """
674 self.update_or_append_entity_attributes(
675 entity_id=entity.id,
676 entity_type=entity.type,
677 attrs=entity.get_properties(),
678 append_strict=append_strict,
679 )
681 def update_entity_relationships(
682 self, entity: ContextEntity, append_strict: bool = False
683 ):
684 """
685 The request payload is an object representing only the attributes, of type
686 Relationship, to append or update.
688 Note:
689 Update means overwriting the existing entity. If you want to
690 manipulate you should rather use patch_entity.
692 Args:
693 entity (ContextEntity):
694 append_strict: If `False` the entity attributes are updated (if they
695 previously exist) or appended (if they don't previously exist)
696 with the ones in the payload.
697 If `True` all the attributes in the payload not
698 previously existing in the entity are appended. In addition
699 to that, in case some of the attributes in the payload
700 already exist in the entity, an error is returned.
701 More precisely this means a strict append procedure.
703 Returns:
704 None
705 """
706 self.update_or_append_entity_attributes(
707 entity_id=entity.id,
708 entity_type=entity.type,
709 attrs=entity.get_relationships(),
710 append_strict=append_strict,
711 )
713 def delete_entity(
714 self,
715 entity_id: str,
716 entity_type: str = None,
717 delete_devices: bool = False,
718 iota_client: IoTAClient = None,
719 iota_url: AnyHttpUrl = settings.IOTA_URL,
720 ) -> None:
721 """
722 Remove a entity from the context broker. No payload is required
723 or received.
725 Args:
726 entity_id:
727 Id of the entity to be deleted
728 entity_type:
729 Entity type, to avoid ambiguity in case there are several
730 entities with the same entity id.
731 delete_devices:
732 If True, also delete all devices that reference this
733 entity (entity_id as entity_name)
734 iota_client:
735 Corresponding IoTA-Client used to access IoTA-Agent
736 iota_url:
737 URL of the corresponding IoT-Agent. This will autogenerate
738 an IoTA-Client, mirroring the information of the
739 ContextBrokerClient, e.g. FiwareHeader, and other headers
741 Returns:
742 None
743 """
744 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
745 headers = self.headers.copy()
746 if entity_type:
747 params = {"type": entity_type}
748 else:
749 params = None
750 try:
751 res = self.delete(url=url, params=params, headers=headers)
752 if res.ok:
753 self.logger.info("Entity '%s' successfully deleted!", entity_id)
754 else:
755 res.raise_for_status()
756 except requests.RequestException as err:
757 msg = f"Could not delete entity {entity_id} !"
758 raise BaseHttpClientException(message=msg, response=err.response) from err
760 if delete_devices:
761 from filip.clients.ngsi_v2 import IoTAClient
763 if iota_client:
764 iota_client_local = deepcopy(iota_client)
765 else:
766 warnings.warn(
767 "No IoTA-Client object provided! "
768 "Will try to generate one. "
769 "This usage is not recommended."
770 )
772 iota_client_local = IoTAClient(
773 url=iota_url,
774 fiware_header=self.fiware_headers,
775 headers=self.headers,
776 )
778 for device in iota_client_local.get_device_list(entity_names=[entity_id]):
779 if entity_type:
780 if device.entity_type == entity_type:
781 iota_client_local.delete_device(device_id=device.device_id)
782 else:
783 iota_client_local.delete_device(device_id=device.device_id)
784 iota_client_local.close()
786 def delete_entities(self, entities: List[ContextEntity]) -> None:
787 """
788 Remove a list of entities from the context broker. This methode is
789 more efficient than to call delete_entity() for each entity
791 Args:
792 entities: List[ContextEntity]: List of entities to be deleted
794 Raises:
795 Exception, if one of the entities is not in the ContextBroker
797 Returns:
798 None
799 """
801 # update() delete, deletes all entities without attributes completely,
802 # and removes the attributes for the other
803 # The entities are sorted based on the fact if they have
804 # attributes.
805 limit = 1000 # max number of entities that will be deleted at once
806 entities_with_attributes: List[ContextEntity] = []
807 for entity in entities:
808 attribute_names = [
809 key
810 for key in entity.model_dump()
811 if key not in ContextEntity.model_fields
812 ]
813 if len(attribute_names) > 0:
814 entities_with_attributes.append(
815 ContextEntity(id=entity.id, type=entity.type)
816 )
818 # Post update_delete for those without attribute only once,
819 # for the other post update_delete again but for the changed entity
820 # in the ContextBroker (only id and type left)
821 while len(entities) > 0:
822 self.update(entities=entities[0:limit], action_type="delete")
823 entities = entities[limit:]
824 while len(entities_with_attributes) > 0:
825 self.update(
826 entities=entities_with_attributes[0:limit], action_type="delete"
827 )
828 entities_with_attributes = entities_with_attributes[limit:]
830 def update_or_append_entity_attributes(
831 self,
832 entity_id: str,
833 attrs: Union[
834 List[NamedContextAttribute], Dict[str, ContextAttribute], Dict[str, Any]
835 ],
836 entity_type: str = None,
837 append_strict: bool = False,
838 forcedUpdate: bool = False,
839 key_values: bool = False,
840 ):
841 """
842 The request payload is an object representing the attributes to
843 append or update. This corresponds to a 'POST' request if append is
844 set to 'False'
846 Note:
847 Be careful not to update attributes that are
848 provided via context registration, e.g. commands. Commands are
849 removed before sending the request. To avoid breaking things.
851 Args:
852 entity_id: Entity id to be updated
853 entity_type: Entity type, to avoid ambiguity in case there are
854 several entities with the same entity id.
855 attrs: List of attributes to update or to append
856 append_strict: If `False` the entity attributes are updated (if they
857 previously exist) or appended (if they don't previously exist)
858 with the ones in the payload.
859 If `True` all the attributes in the payload not
860 previously existing in the entity are appended. In addition
861 to that, in case some of the attributes in the payload
862 already exist in the entity, an error is returned.
863 More precisely this means a strict append procedure.
864 forcedUpdate: Update operation have to trigger any matching
865 subscription, no matter if there is an actual attribute
866 update or no instead of the default behavior, which is to
867 updated only if attribute is effectively updated.
868 key_values: By default False. If set to True, the payload uses
869 the keyValues simplified entity representation, i.e.
870 ContextEntityKeyValues.
871 Returns:
872 None
874 """
875 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
876 headers = self.headers.copy()
877 params = {}
878 if entity_type:
879 params.update({"type": entity_type})
880 else:
881 entity_type = "dummy"
883 options = []
884 if append_strict:
885 options.append("append")
886 if forcedUpdate:
887 options.append("forcedUpdate")
888 if key_values:
889 assert isinstance(attrs, dict), "for keyValues attrs has to be a dict"
890 options.append("keyValues")
891 if options:
892 params.update({"options": ",".join(options)})
894 if key_values:
895 entity = ContextEntityKeyValues(id=entity_id, type=entity_type, **attrs)
896 else:
897 entity = ContextEntity(id=entity_id, type=entity_type)
898 entity.add_attributes(attrs)
899 # exclude commands from the send data,
900 # as they live in the IoTA-agent
901 excluded_keys = {"id", "type"}
902 # excluded_keys.update(
903 # entity.get_commands(response_format=PropertyFormat.DICT).keys()
904 # )
905 try:
906 res = self.post(
907 url=url,
908 headers=headers,
909 json=entity.model_dump(exclude=excluded_keys, exclude_none=True),
910 params=params,
911 )
912 if res.ok:
913 self.logger.info("Entity '%s' successfully " "updated!", entity.id)
914 else:
915 res.raise_for_status()
916 except requests.RequestException as err:
917 msg = f"Could not update or append attributes of entity" f" {entity.id} !"
918 self.log_error(err=err, msg=msg)
919 raise BaseHttpClientException(message=msg, response=err.response) from err
921 def update_existing_entity_attributes(
922 self,
923 entity_id: str,
924 attrs: Union[
925 List[NamedContextAttribute], Dict[str, ContextAttribute], Dict[str, Any]
926 ],
927 entity_type: str = None,
928 forcedUpdate: bool = False,
929 override_metadata: bool = False,
930 key_values: bool = False,
931 ):
932 """
933 The entity attributes are updated with the ones in the payload.
934 In addition to that, if one or more attributes in the payload doesn't
935 exist in the entity, an error is returned. This corresponds to a
936 'PATCH' request.
938 Args:
939 entity_id: Entity id to be updated
940 entity_type: Entity type, to avoid ambiguity in case there are
941 several entities with the same entity id.
942 attrs: List of attributes to update or to append
943 forcedUpdate: Update operation have to trigger any matching
944 subscription, no matter if there is an actual attribute
945 update or no instead of the default behavior, which is to
946 updated only if attribute is effectively updated.
947 override_metadata:
948 Bool,replace the existing metadata with the one provided in
949 the request
950 key_values: By default False. If set to True, the payload uses
951 the keyValues simplified entity representation, i.e.
952 ContextEntityKeyValues.
953 Returns:
954 None
956 """
957 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
958 headers = self.headers.copy()
959 if entity_type:
960 params = {"type": entity_type}
961 else:
962 params = None
963 entity_type = "dummy"
965 options = []
966 if override_metadata:
967 options.append("overrideMetadata")
968 if forcedUpdate:
969 options.append("forcedUpdate")
970 if key_values:
971 assert isinstance(attrs, dict), "for keyValues the attrs must be dict"
972 payload = attrs
973 options.append("keyValues")
974 else:
975 entity = ContextEntity(id=entity_id, type=entity_type)
976 entity.add_attributes(attrs)
977 payload = entity.model_dump(exclude={"id", "type"}, exclude_none=True)
978 if options:
979 params.update({"options": ",".join(options)})
981 try:
982 res = self.patch(
983 url=url,
984 headers=headers,
985 json=payload,
986 params=params,
987 )
988 if res.ok:
989 self.logger.info("Entity '%s' successfully " "updated!", entity_id)
990 else:
991 res.raise_for_status()
992 except requests.RequestException as err:
993 msg = f"Could not update attributes of entity"
994 raise BaseHttpClientException(message=msg, response=err.response) from err
996 def override_entity(
997 self, entity: Union[ContextEntity, ContextEntityKeyValues], **kwargs
998 ):
999 """
1000 The request payload is an object representing the attributes to
1001 override the existing entity.
1003 Note:
1004 If you want to manipulate you should rather use patch_entity.
1006 Args:
1007 entity (ContextEntity or ContextEntityKeyValues):
1008 Returns:
1009 None
1010 """
1011 return self.replace_entity_attributes(
1012 entity_id=entity.id,
1013 entity_type=entity.type,
1014 attrs=entity.get_attributes(),
1015 **kwargs,
1016 )
1018 def replace_entity_attributes(
1019 self,
1020 entity_id: str,
1021 attrs: Union[
1022 List[Union[NamedContextAttribute, Dict[str, ContextAttribute]]], Dict
1023 ],
1024 entity_type: str = None,
1025 forcedUpdate: bool = False,
1026 key_values: bool = False,
1027 ):
1028 """
1029 The attributes previously existing in the entity are removed and
1030 replaced by the ones in the request. This corresponds to a 'PUT'
1031 request.
1033 Args:
1034 entity_id: Entity id to be updated
1035 entity_type: Entity type, to avoid ambiguity in case there are
1036 several entities with the same entity id.
1037 attrs: List of attributes to add to the entity or dict of
1038 attributes in case of key_values=True.
1039 forcedUpdate: Update operation have to trigger any matching
1040 subscription, no matter if there is an actual attribute
1041 update or no instead of the default behavior, which is to
1042 updated only if attribute is effectively updated.
1043 key_values(bool):
1044 By default False. If set to True, "options=keyValues" will
1045 be included in params of the request. The payload uses
1046 the keyValues simplified entity representation, i.e.
1047 ContextEntityKeyValues.
1048 Returns:
1049 None
1050 """
1051 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
1052 headers = self.headers.copy()
1053 params = {}
1054 options = []
1055 if entity_type:
1056 params.update({"type": entity_type})
1057 else:
1058 entity_type = "dummy"
1060 if forcedUpdate:
1061 options.append("forcedUpdate")
1063 if key_values:
1064 options.append("keyValues")
1065 assert isinstance(attrs, dict)
1066 else:
1067 entity = ContextEntity(id=entity_id, type=entity_type)
1068 entity.add_attributes(attrs)
1069 attrs = entity.model_dump(exclude={"id", "type"}, exclude_none=True)
1070 if options:
1071 params.update({"options": ",".join(options)})
1073 try:
1074 res = self.put(
1075 url=url,
1076 headers=headers,
1077 json=attrs,
1078 params=params,
1079 )
1080 if res.ok:
1081 self.logger.info("Entity '%s' successfully " "updated!", entity_id)
1082 else:
1083 res.raise_for_status()
1084 except requests.RequestException as err:
1085 msg = f"Could not replace attribute of entity {entity_id} !"
1086 raise BaseHttpClientException(message=msg, response=err.response) from err
1088 # Attribute operations
1089 def get_attribute(
1090 self,
1091 entity_id: str,
1092 attr_name: str,
1093 entity_type: str = None,
1094 metadata: str = None,
1095 response_format="",
1096 ) -> ContextAttribute:
1097 """
1098 Retrieves a specified attribute from an entity.
1100 Args:
1101 entity_id: Id of the entity. Example: Bcn_Welt
1102 attr_name: Name of the attribute to be retrieved.
1103 entity_type (Optional): Type of the entity to retrieve
1104 metadata (Optional): A list of metadata names to include in the
1105 response. See "Filtering out attributes and metadata" section
1106 for more detail.
1108 Returns:
1109 The content of the retrieved attribute as ContextAttribute
1111 Raises:
1112 Error
1114 """
1115 url = urljoin(
1116 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
1117 )
1118 headers = self.headers.copy()
1119 params = {}
1120 if entity_type:
1121 params.update({"type": entity_type})
1122 if metadata:
1123 params.update({"metadata": ",".join(metadata)})
1124 try:
1125 res = self.get(url=url, params=params, headers=headers)
1126 if res.ok:
1127 self.logger.debug("Received: %s", res.json())
1128 return ContextAttribute(**res.json())
1129 res.raise_for_status()
1130 except requests.RequestException as err:
1131 msg = (
1132 f"Could not load attribute '{attr_name}' from entity" f"'{entity_id}' "
1133 )
1134 raise BaseHttpClientException(message=msg, response=err.response) from err
1136 def update_entity_attribute(
1137 self,
1138 entity_id: str,
1139 attr: Union[ContextAttribute, NamedContextAttribute],
1140 *,
1141 entity_type: str = None,
1142 attr_name: str = None,
1143 override_metadata: bool = True,
1144 forcedUpdate: bool = False,
1145 ):
1146 """
1147 Updates a specified attribute from an entity.
1149 Args:
1150 attr:
1151 context attribute to update
1152 entity_id:
1153 Id of the entity. Example: Bcn_Welt
1154 entity_type:
1155 Entity type, to avoid ambiguity in case there are
1156 several entities with the same entity id.
1157 forcedUpdate: Update operation have to trigger any matching
1158 subscription, no matter if there is an actual attribute
1159 update or no instead of the default behavior, which is to
1160 updated only if attribute is effectively updated.
1161 attr_name:
1162 Name of the attribute to be updated.
1163 override_metadata:
1164 Bool, if set to `True` (default) the metadata will be
1165 overwritten. This is for backwards compatibility reasons.
1166 If `False` the metadata values will be either updated if
1167 already existing or append if not.
1168 See also:
1169 https://fiware-orion.readthedocs.io/en/master/user/metadata.html
1170 """
1171 headers = self.headers.copy()
1172 if not isinstance(attr, NamedContextAttribute):
1173 assert attr_name is not None, (
1174 "Missing name for attribute. "
1175 "attr_name must be present if"
1176 "attr is of type ContextAttribute"
1177 )
1178 else:
1179 assert attr_name is None, (
1180 "Invalid argument attr_name. Do not set "
1181 "attr_name if attr is of type "
1182 "NamedContextAttribute"
1183 )
1184 attr_name = attr.name
1186 url = urljoin(
1187 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
1188 )
1189 params = {}
1190 if entity_type:
1191 params.update({"type": entity_type})
1192 # set overrideMetadata option (we assure backwards compatibility here)
1193 options = []
1194 if override_metadata:
1195 options.append("overrideMetadata")
1196 if forcedUpdate:
1197 options.append("forcedUpdate")
1198 if options:
1199 params.update({"options": ",".join(options)})
1200 try:
1201 res = self.put(
1202 url=url,
1203 headers=headers,
1204 params=params,
1205 json=attr.model_dump(exclude={"name"}, exclude_none=True),
1206 )
1207 if res.ok:
1208 self.logger.info(
1209 "Attribute '%s' of '%s' " "successfully updated!",
1210 attr_name,
1211 entity_id,
1212 )
1213 else:
1214 res.raise_for_status()
1215 except requests.RequestException as err:
1216 msg = (
1217 f"Could not update attribute '{attr_name}' of entity" f"'{entity_id}' "
1218 )
1219 raise BaseHttpClientException(message=msg, response=err.response) from err
1221 def delete_entity_attribute(
1222 self, entity_id: str, attr_name: str, entity_type: str = None
1223 ) -> None:
1224 """
1225 Removes a specified attribute from an entity.
1227 Args:
1228 entity_id: Id of the entity.
1229 attr_name: Name of the attribute to be retrieved.
1230 entity_type: Entity type, to avoid ambiguity in case there are
1231 several entities with the same entity id.
1232 Raises:
1233 Error
1235 """
1236 url = urljoin(
1237 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
1238 )
1239 headers = self.headers.copy()
1240 params = {}
1241 if entity_type:
1242 params.update({"type": entity_type})
1243 try:
1244 res = self.delete(url=url, headers=headers)
1245 if res.ok:
1246 self.logger.info(
1247 "Attribute '%s' of '%s' " "successfully deleted!",
1248 attr_name,
1249 entity_id,
1250 )
1251 else:
1252 res.raise_for_status()
1253 except requests.RequestException as err:
1254 msg = f"Could not delete attribute '{attr_name}' of entity '{entity_id}'"
1255 raise BaseHttpClientException(message=msg, response=err.response) from err
1257 # Attribute value operations
1258 def get_attribute_value(
1259 self, entity_id: str, attr_name: str, entity_type: str = None
1260 ) -> Any:
1261 """
1262 This operation returns the value property with the value of the
1263 attribute.
1265 Args:
1266 entity_id: Id of the entity. Example: Bcn_Welt
1267 attr_name: Name of the attribute to be retrieved.
1268 Example: temperature.
1269 entity_type: Entity type, to avoid ambiguity in case there are
1270 several entities with the same entity id.
1272 Returns:
1274 """
1275 url = urljoin(
1276 self.base_url,
1277 f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}/value",
1278 )
1279 headers = self.headers.copy()
1280 params = {}
1281 if entity_type:
1282 params.update({"type": entity_type})
1283 try:
1284 res = self.get(url=url, params=params, headers=headers)
1285 if res.ok:
1286 self.logger.debug("Received: %s", res.json())
1287 return res.json()
1288 res.raise_for_status()
1289 except requests.RequestException as err:
1290 msg = (
1291 f"Could not load value of attribute '{attr_name}' from "
1292 f"entity'{entity_id}' "
1293 )
1294 raise BaseHttpClientException(message=msg, response=err.response) from err
1296 def update_attribute_value(
1297 self,
1298 *,
1299 entity_id: str,
1300 attr_name: str,
1301 value: Any,
1302 entity_type: str = None,
1303 forcedUpdate: bool = False,
1304 ):
1305 """
1306 Updates the value of a specified attribute of an entity
1308 Args:
1309 value: update value
1310 entity_id: Id of the entity. Example: Bcn_Welt
1311 attr_name: Name of the attribute to be retrieved.
1312 Example: temperature.
1313 entity_type: Entity type, to avoid ambiguity in case there are
1314 several entities with the same entity id.
1315 forcedUpdate: Update operation have to trigger any matching
1316 subscription, no matter if there is an actual attribute
1317 update or no instead of the default behavior, which is to
1318 updated only if attribute is effectively updated.
1319 Returns:
1321 """
1322 url = urljoin(
1323 self.base_url,
1324 f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}/value",
1325 )
1326 headers = self.headers.copy()
1327 params = {}
1328 if entity_type:
1329 params.update({"type": entity_type})
1330 options = []
1331 if forcedUpdate:
1332 options.append("forcedUpdate")
1333 if options:
1334 params.update({"options": ",".join(options)})
1335 try:
1336 if not isinstance(value, (dict, list)):
1337 headers.update({"Content-Type": "text/plain"})
1338 if isinstance(value, str):
1339 value = f"{value}"
1340 res = self.put(url=url, headers=headers, json=value, params=params)
1341 else:
1342 res = self.put(url=url, headers=headers, json=value, params=params)
1343 if res.ok:
1344 self.logger.info(
1345 "Attribute '%s' of '%s' " "successfully updated!",
1346 attr_name,
1347 entity_id,
1348 )
1349 else:
1350 res.raise_for_status()
1351 except requests.RequestException as err:
1352 msg = (
1353 f"Could not update value of attribute '{attr_name}' from "
1354 f"entity '{entity_id}' "
1355 )
1356 raise BaseHttpClientException(message=msg, response=err.response) from err
1358 # Types Operations
1359 def get_entity_types(
1360 self, *, limit: int = None, offset: int = None, options: str = None
1361 ) -> List[Dict[str, Any]]:
1362 """
1364 Args:
1365 limit: Limit the number of types to be retrieved.
1366 offset: Skip a number of records.
1367 options: Options dictionary. Allowed: count, values
1369 Returns:
1371 """
1372 url = urljoin(self.base_url, f"{self._url_version}/types")
1373 headers = self.headers.copy()
1374 params = {}
1375 if limit:
1376 params.update({"limit": limit})
1377 if offset:
1378 params.update({"offset": offset})
1379 if options:
1380 params.update({"options": options})
1381 try:
1382 res = self.get(url=url, params=params, headers=headers)
1383 if res.ok:
1384 self.logger.debug("Received: %s", res.json())
1385 return res.json()
1386 res.raise_for_status()
1387 except requests.RequestException as err:
1388 msg = "Could not load entity types!"
1389 raise BaseHttpClientException(message=msg, response=err.response) from err
1391 def get_entity_type(self, entity_type: str) -> Dict[str, Any]:
1392 """
1394 Args:
1395 entity_type: Entity Type. Example: Room
1397 Returns:
1399 """
1400 url = urljoin(self.base_url, f"{self._url_version}/types/{entity_type}")
1401 headers = self.headers.copy()
1402 params = {}
1403 try:
1404 res = self.get(url=url, params=params, headers=headers)
1405 if res.ok:
1406 self.logger.debug("Received: %s", res.json())
1407 return res.json()
1408 res.raise_for_status()
1409 except requests.RequestException as err:
1410 msg = f"Could not load entities of type" f"'{entity_type}' "
1411 raise BaseHttpClientException(message=msg, response=err.response) from err
1413 # SUBSCRIPTION API ENDPOINTS
1414 def get_subscription_list(self, limit: PositiveInt = inf) -> List[Subscription]:
1415 """
1416 Returns a list of all the subscriptions present in the system.
1417 Args:
1418 limit: Limit the number of subscriptions to be retrieved
1419 Returns:
1420 list of subscriptions
1421 """
1422 url = urljoin(self.base_url, f"{self._url_version}/subscriptions/")
1423 headers = self.headers.copy()
1424 params = {}
1426 # We always use the 'count' option to check weather pagination is
1427 # required
1428 params.update({"options": "count"})
1429 try:
1430 items = self.__pagination(
1431 limit=limit, url=url, params=params, headers=headers
1432 )
1433 adapter = TypeAdapter(List[Subscription])
1434 return adapter.validate_python(items)
1435 except requests.RequestException as err:
1436 msg = "Could not load subscriptions!"
1437 raise BaseHttpClientException(message=msg, response=err.response) from err
1439 def post_subscription(
1440 self,
1441 subscription: Subscription,
1442 update: bool = False,
1443 skip_initial_notification: bool = False,
1444 ) -> str:
1445 """
1446 Creates a new subscription. The subscription is represented by a
1447 Subscription object defined in filip.cb.models.
1449 If the subscription already exists, the adding is prevented and the id
1450 of the existing subscription is returned.
1452 A subscription is deemed as already existing if there exists a
1453 subscription with the exact same subject and notification fields. All
1454 optional fields are not considered.
1456 Args:
1457 subscription: Subscription
1458 update: True - If the subscription already exists, update it
1459 False- If the subscription already exists, throw warning
1460 skip_initial_notification: True - Initial Notifications will be
1461 sent to recipient containing the whole data. This is
1462 deprecated and removed from version 3.0 of the context broker.
1463 False - skip the initial notification
1464 Returns:
1465 str: Id of the (created) subscription
1467 """
1468 existing_subscriptions = self.get_subscription_list()
1470 sub_dict = subscription.model_dump(
1471 include={"subject", "notification"},
1472 exclude={
1473 "notification": {
1474 "lastSuccess",
1475 "lastFailure",
1476 "lastSuccessCode",
1477 "lastFailureReason",
1478 }
1479 },
1480 )
1481 for ex_sub in existing_subscriptions:
1482 if self._subscription_dicts_are_equal(
1483 sub_dict,
1484 ex_sub.model_dump(
1485 include={"subject", "notification"},
1486 exclude={
1487 "lastSuccess",
1488 "lastFailure",
1489 "lastSuccessCode",
1490 "lastFailureReason",
1491 },
1492 ),
1493 ):
1494 self.logger.info("Subscription already exists")
1495 if update:
1496 self.logger.info("Updated subscription")
1497 subscription.id = ex_sub.id
1498 self.update_subscription(subscription)
1499 else:
1500 warnings.warn(
1501 f"Subscription existed already with the id" f" {ex_sub.id}"
1502 )
1503 return ex_sub.id
1505 params = {}
1506 if skip_initial_notification:
1507 version = self.get_version()["orion"]["version"]
1508 if parse_version(version) <= parse_version("3.1"):
1509 params.update({"options": "skipInitialNotification"})
1510 else:
1511 pass
1512 warnings.warn(
1513 f"Skip initial notifications is a deprecated "
1514 f"feature of older versions <=3.1 of the context "
1515 f"broker. The Context Broker that you requesting has "
1516 f"version: {version}. For newer versions we "
1517 f"automatically skip this option. Consider "
1518 f"refactoring and updating your services",
1519 DeprecationWarning,
1520 )
1522 url = urljoin(self.base_url, "v2/subscriptions")
1523 headers = self.headers.copy()
1524 headers.update({"Content-Type": "application/json"})
1525 try:
1526 res = self.post(
1527 url=url,
1528 headers=headers,
1529 data=subscription.model_dump_json(
1530 exclude={
1531 "id": True,
1532 "notification": {
1533 "lastSuccess",
1534 "lastFailure",
1535 "lastSuccessCode",
1536 "lastFailureReason",
1537 },
1538 },
1539 exclude_none=True,
1540 ),
1541 params=params,
1542 )
1543 if res.ok:
1544 self.logger.info("Subscription successfully created!")
1545 return res.headers["Location"].split("/")[-1]
1546 res.raise_for_status()
1547 except requests.RequestException as err:
1548 msg = "Could not send subscription!"
1549 raise BaseHttpClientException(message=msg, response=err.response) from err
1551 def get_subscription(self, subscription_id: str) -> Subscription:
1552 """
1553 Retrieves a subscription from
1554 Args:
1555 subscription_id: id of the subscription
1557 Returns:
1559 """
1560 url = urljoin(
1561 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
1562 )
1563 headers = self.headers.copy()
1564 try:
1565 res = self.get(url=url, headers=headers)
1566 if res.ok:
1567 self.logger.debug("Received: %s", res.json())
1568 return Subscription(**res.json())
1569 res.raise_for_status()
1570 except requests.RequestException as err:
1571 msg = f"Could not load subscription {subscription_id}"
1572 raise BaseHttpClientException(message=msg, response=err.response) from err
1574 def update_subscription(
1575 self, subscription: Subscription, skip_initial_notification: bool = False
1576 ):
1577 """
1578 Only the fields included in the request are updated in the subscription.
1580 Args:
1581 subscription: Subscription to update
1582 skip_initial_notification: True - Initial Notifications will be
1583 sent to recipient containing the whole data. This is
1584 deprecated and removed from version 3.0 of the context broker.
1585 False - skip the initial notification
1587 Returns:
1588 None
1589 """
1590 params = {}
1591 if skip_initial_notification:
1592 version = self.get_version()["orion"]["version"]
1593 if parse_version(version) <= parse_version("3.1"):
1594 params.update({"options": "skipInitialNotification"})
1595 else:
1596 pass
1597 warnings.warn(
1598 f"Skip initial notifications is a deprecated "
1599 f"feature of older versions <3.1 of the context "
1600 f"broker. The Context Broker that you requesting has "
1601 f"version: {version}. For newer versions we "
1602 f"automatically skip this option. Consider "
1603 f"refactoring and updating your services",
1604 DeprecationWarning,
1605 )
1607 url = urljoin(
1608 self.base_url, f"{self._url_version}/subscriptions/{subscription.id}"
1609 )
1610 headers = self.headers.copy()
1611 headers.update({"Content-Type": "application/json"})
1612 try:
1613 res = self.patch(
1614 url=url,
1615 headers=headers,
1616 data=subscription.model_dump_json(
1617 exclude={
1618 "id": True,
1619 "notification": {
1620 "lastSuccess",
1621 "lastFailure",
1622 "lastSuccessCode",
1623 "lastFailureReason",
1624 },
1625 },
1626 exclude_none=True,
1627 ),
1628 )
1629 if res.ok:
1630 self.logger.info("Subscription successfully updated!")
1631 else:
1632 res.raise_for_status()
1633 except requests.RequestException as err:
1634 msg = f"Could not update subscription {subscription.id}"
1635 raise BaseHttpClientException(message=msg, response=err.response) from err
1637 def delete_subscription(self, subscription_id: str) -> None:
1638 """
1639 Deletes a subscription from a Context Broker
1640 Args:
1641 subscription_id: id of the subscription
1642 """
1643 url = urljoin(
1644 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
1645 )
1646 headers = self.headers.copy()
1647 try:
1648 res = self.delete(url=url, headers=headers)
1649 if res.ok:
1650 self.logger.info(
1651 f"Subscription '{subscription_id}' " f"successfully deleted!"
1652 )
1653 else:
1654 res.raise_for_status()
1655 except requests.RequestException as err:
1656 msg = f"Could not delete subscription {subscription_id}"
1657 raise BaseHttpClientException(message=msg, response=err.response) from err
1659 # Registration API
1660 def get_registration_list(self, *, limit: PositiveInt = None) -> List[Registration]:
1661 """
1662 Lists all the context provider registrations present in the system.
1664 Args:
1665 limit: Limit the number of registrations to be retrieved
1666 Returns:
1668 """
1669 url = urljoin(self.base_url, f"{self._url_version}/registrations/")
1670 headers = self.headers.copy()
1671 params = {}
1673 # We always use the 'count' option to check weather pagination is
1674 # required
1675 params.update({"options": "count"})
1676 try:
1677 items = self.__pagination(
1678 limit=limit, url=url, params=params, headers=headers
1679 )
1680 adapter = TypeAdapter(List[Registration])
1681 return adapter.validate_python(items)
1682 except requests.RequestException as err:
1683 msg = "Could not load registrations!"
1684 raise BaseHttpClientException(message=msg, response=err.response) from err
1686 def post_registration(self, registration: Registration):
1687 """
1688 Creates a new context provider registration. This is typically used
1689 for binding context sources as providers of certain data. The
1690 registration is represented by cb.models.Registration
1692 Args:
1693 registration (Registration):
1695 Returns:
1697 """
1698 url = urljoin(self.base_url, f"{self._url_version}/registrations")
1699 headers = self.headers.copy()
1700 headers.update({"Content-Type": "application/json"})
1701 try:
1702 res = self.post(
1703 url=url,
1704 headers=headers,
1705 data=registration.model_dump_json(exclude={"id"}, exclude_none=True),
1706 )
1707 if res.ok:
1708 self.logger.info("Registration successfully created!")
1709 return res.headers["Location"].split("/")[-1]
1710 res.raise_for_status()
1711 except requests.RequestException as err:
1712 msg = f"Could not send registration {registration.id}!"
1713 raise BaseHttpClientException(message=msg, response=err.response) from err
1715 def get_registration(self, registration_id: str) -> Registration:
1716 """
1717 Retrieves a registration from context broker by id
1719 Args:
1720 registration_id: id of the registration
1722 Returns:
1723 Registration
1724 """
1725 url = urljoin(
1726 self.base_url, f"{self._url_version}/registrations/{registration_id}"
1727 )
1728 headers = self.headers.copy()
1729 try:
1730 res = self.get(url=url, headers=headers)
1731 if res.ok:
1732 self.logger.debug("Received: %s", res.json())
1733 return Registration(**res.json())
1734 res.raise_for_status()
1735 except requests.RequestException as err:
1736 msg = f"Could not load registration {registration_id} !"
1737 raise BaseHttpClientException(message=msg, response=err.response) from err
1739 def add_valid_relationships(
1740 self, entities: List[ContextEntity]
1741 ) -> List[ContextEntity]:
1742 """
1743 Validate all attributes in the given entities. If the attribute value points to
1744 an existing entity, it is assumed that this attribute is a relationship, and it
1745 will be assigned with the attribute type "relationship"
1747 Args:
1748 entities: list of entities that need to be validated.
1750 Returns:
1751 updated entities
1752 """
1753 updated_entities = []
1754 for entity in entities[:]:
1755 for attr_name, attr_value in entity.model_dump(
1756 exclude={"id", "type"}
1757 ).items():
1758 if isinstance(attr_value, dict):
1759 if self.validate_relationship(attr_value):
1760 entity.update_attribute(
1761 {
1762 attr_name: ContextAttribute(
1763 **{
1764 "type": DataType.RELATIONSHIP,
1765 "value": attr_value.get("value"),
1766 }
1767 )
1768 }
1769 )
1770 updated_entities.append(entity)
1771 return updated_entities
1773 def remove_invalid_relationships(
1774 self, entities: List[ContextEntity], hard_remove: bool = True
1775 ) -> List[ContextEntity]:
1776 """
1777 Removes invalid relationships from the entities. An invalid relationship
1778 is a relationship that has no destination entity.
1780 Args:
1781 entities: list of entities that need to be validated.
1782 hard_remove: If True, invalid relationships will be deleted.
1783 If False, invalid relationships will be changed to Text
1784 attributes.
1786 Returns:
1787 updated entities
1788 """
1789 updated_entities = []
1790 for entity in entities[:]:
1791 for relationship in entity.get_relationships():
1792 if not self.validate_relationship(relationship):
1793 if hard_remove:
1794 entity.delete_attributes(attrs=[relationship])
1795 else:
1796 # change the attribute type to "Text"
1797 entity.update_attribute(
1798 attrs=[
1799 NamedContextAttribute(
1800 name=relationship.name,
1801 type=DataType.TEXT,
1802 value=relationship.value,
1803 )
1804 ]
1805 )
1806 updated_entities.append(entity)
1807 return updated_entities
1809 def validate_relationship(
1810 self, relationship: Union[NamedContextAttribute, ContextAttribute, Dict]
1811 ) -> bool:
1812 """
1813 Validates a relationship. A relationship is valid if it points to an existing
1814 entity. Otherwise, it is considered invalid
1816 Args:
1817 relationship: relationship to validate
1818 Returns
1819 True if the relationship is valid, False otherwise
1820 """
1821 if isinstance(relationship, NamedContextAttribute) or isinstance(
1822 relationship, ContextAttribute
1823 ):
1824 destination_id = relationship.value
1825 elif isinstance(relationship, dict):
1826 _sentinel = object()
1827 destination_id = relationship.get("value", _sentinel)
1828 if destination_id is _sentinel:
1829 raise ValueError(
1830 "Invalid relationship dictionary format\n"
1831 "Expected format: {"
1832 f'"type": "{DataType.RELATIONSHIP.value}", '
1833 '"value" "entity_id"}'
1834 )
1835 else:
1836 raise ValueError("Invalid relationship type.")
1837 try:
1838 destination_entity = self.get_entity(entity_id=destination_id)
1839 return destination_entity.id == destination_id
1840 except requests.RequestException as err:
1841 if err.response.status_code == 404:
1842 return False
1844 def update_registration(self, registration: Registration):
1845 """
1846 Only the fields included in the request are updated in the registration.
1848 Args:
1849 registration: Registration to update
1850 Returns:
1852 """
1853 url = urljoin(
1854 self.base_url, f"{self._url_version}/registrations/{registration.id}"
1855 )
1856 headers = self.headers.copy()
1857 headers.update({"Content-Type": "application/json"})
1858 try:
1859 res = self.patch(
1860 url=url,
1861 headers=headers,
1862 data=registration.model_dump_json(exclude={"id"}, exclude_none=True),
1863 )
1864 if res.ok:
1865 self.logger.info("Registration successfully updated!")
1866 else:
1867 res.raise_for_status()
1868 except requests.RequestException as err:
1869 msg = f"Could not update registration {registration.id} !"
1870 raise BaseHttpClientException(message=msg, response=err.response) from err
1872 def delete_registration(self, registration_id: str) -> None:
1873 """
1874 Deletes a subscription from a Context Broker
1875 Args:
1876 registration_id: id of the subscription
1877 """
1878 url = urljoin(
1879 self.base_url, f"{self._url_version}/registrations/{registration_id}"
1880 )
1881 headers = self.headers.copy()
1882 try:
1883 res = self.delete(url=url, headers=headers)
1884 if res.ok:
1885 self.logger.info(
1886 "Registration '%s' " "successfully deleted!", registration_id
1887 )
1888 res.raise_for_status()
1889 except requests.RequestException as err:
1890 msg = f"Could not delete registration {registration_id} !"
1891 raise BaseHttpClientException(message=msg, response=err.response) from err
1893 # Batch operation API
1894 def update(
1895 self,
1896 *,
1897 entities: List[Union[ContextEntity, ContextEntityKeyValues]],
1898 action_type: Union[ActionType, str],
1899 update_format: str = None,
1900 forcedUpdate: bool = False,
1901 override_metadata: bool = False,
1902 ) -> None:
1903 """
1904 This operation allows to create, update and/or delete several entities
1905 in a single batch operation.
1907 This operation is split in as many individual operations as entities
1908 in the entities vector, so the actionType is executed for each one of
1909 them. Depending on the actionType, a mapping with regular non-batch
1910 operations can be done:
1912 append: maps to POST /v2/entities (if the entity does not already exist)
1913 or POST /v2/entities/<id>/attrs (if the entity already exists).
1915 appendStrict: maps to POST /v2/entities (if the entity does not
1916 already exist) or POST /v2/entities/<id>/attrs?options=append (if the
1917 entity already exists).
1919 update: maps to PATCH /v2/entities/<id>/attrs.
1921 delete: maps to DELETE /v2/entities/<id>/attrs/<attrName> on every
1922 attribute included in the entity or to DELETE /v2/entities/<id> if
1923 no attribute were included in the entity.
1925 replace: maps to PUT /v2/entities/<id>/attrs.
1927 Args:
1928 entities: "an array of entities, each entity specified using the "
1929 "JSON entity representation format "
1930 action_type (Update): "actionType, to specify the kind of update
1931 action to do: either append, appendStrict, update, delete,
1932 or replace. "
1933 update_format (str): Optional 'keyValues'
1934 forcedUpdate: Update operation have to trigger any matching
1935 subscription, no matter if there is an actual attribute
1936 update or no instead of the default behavior, which is to
1937 updated only if attribute is effectively updated.
1938 override_metadata:
1939 Bool, replace the existing metadata with the one provided in
1940 the request
1941 Returns:
1943 """
1945 url = urljoin(self.base_url, f"{self._url_version}/op/update")
1946 headers = self.headers.copy()
1947 headers.update({"Content-Type": "application/json"})
1948 params = {}
1949 options = []
1950 if override_metadata:
1951 options.append("overrideMetadata")
1952 if forcedUpdate:
1953 options.append("forcedUpdate")
1954 if update_format:
1955 assert (
1956 update_format == AttrsFormat.KEY_VALUES.value
1957 ), "Only 'keyValues' is allowed as update format"
1958 options.append("keyValues")
1959 if options:
1960 params.update({"options": ",".join(options)})
1961 update = Update(actionType=action_type, entities=entities)
1962 try:
1963 res = self.post(
1964 url=url,
1965 headers=headers,
1966 params=params,
1967 json=update.model_dump(by_alias=True),
1968 )
1969 if res.ok:
1970 self.logger.info("Update operation '%s' succeeded!", action_type)
1971 else:
1972 res.raise_for_status()
1973 except requests.RequestException as err:
1974 msg = f"Update operation '{action_type}' failed!"
1975 raise BaseHttpClientException(message=msg, response=err.response) from err
1977 def query(
1978 self,
1979 *,
1980 query: Query,
1981 limit: PositiveInt = None,
1982 order_by: str = None,
1983 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
1984 ) -> List[Any]:
1985 """
1986 Generate api query
1987 Args:
1988 query (Query):
1989 limit (PositiveInt):
1990 order_by (str):
1991 response_format (AttrsFormat, str):
1992 Returns:
1993 The response payload is an Array containing one object per matching
1994 entity, or an empty array [] if no entities are found. The entities
1995 follow the JSON entity representation format (described in the
1996 section "JSON Entity Representation").
1997 """
1998 url = urljoin(self.base_url, f"{self._url_version}/op/query")
1999 headers = self.headers.copy()
2000 headers.update({"Content-Type": "application/json"})
2001 params = {"options": "count"}
2003 if response_format:
2004 if response_format not in list(AttrsFormat):
2005 raise ValueError(f"Value must be in {list(AttrsFormat)}")
2006 params["options"] = ",".join([response_format, "count"])
2007 try:
2008 items = self.__pagination(
2009 method=PaginationMethod.POST,
2010 url=url,
2011 headers=headers,
2012 params=params,
2013 data=query.model_dump_json(exclude_none=True),
2014 limit=limit,
2015 )
2016 if response_format == AttrsFormat.NORMALIZED:
2017 adapter = TypeAdapter(List[ContextEntity])
2018 return adapter.validate_python(items)
2019 if response_format == AttrsFormat.KEY_VALUES:
2020 adapter = TypeAdapter(List[ContextEntityKeyValues])
2021 return adapter.validate_python(items)
2022 return items
2023 except requests.RequestException as err:
2024 msg = "Query operation failed!"
2025 raise BaseHttpClientException(message=msg, response=err.response) from err
2027 def notify(self, message: Message) -> None:
2028 """
2029 This operation is intended to consume a notification payload so that
2030 all the entity data included by such notification is persisted,
2031 overwriting if necessary. This operation is useful when one NGSIv2
2032 endpoint is subscribed to another NGSIv2 endpoint (federation
2033 scenarios). The request payload must be an NGSIv2 notification
2034 payload. The behaviour must be exactly the same as 'update'
2035 with 'action_type' equal to append.
2037 Args:
2038 message: Notification message
2040 Returns:
2041 None
2042 """
2043 url = urljoin(self.base_url, "v2/op/notify")
2044 headers = self.headers.copy()
2045 headers.update({"Content-Type": "application/json"})
2046 params = {}
2047 try:
2048 res = self.post(
2049 url=url,
2050 headers=headers,
2051 params=params,
2052 data=message.model_dump_json(by_alias=True),
2053 )
2054 if res.ok:
2055 self.logger.info("Notification message sent!")
2056 else:
2057 res.raise_for_status()
2058 except requests.RequestException as err:
2059 msg = (
2060 f"Sending notifcation message failed! \n "
2061 f"{message.model_dump_json(indent=2)}"
2062 )
2063 raise BaseHttpClientException(message=msg, response=err.response) from err
2065 def post_command(
2066 self,
2067 *,
2068 entity_id: str,
2069 command: Union[Command, NamedCommand, Dict],
2070 entity_type: str = None,
2071 command_name: str = None,
2072 ) -> None:
2073 """
2074 Post a command to a context entity this corresponds to 'PATCH' of the
2075 specified command attribute.
2077 Args:
2078 entity_id: Entity identifier
2079 command: Command
2080 entity_type: Entity type
2081 command_name: Name of the command in the entity
2083 Returns:
2084 None
2085 """
2086 if command_name:
2087 assert isinstance(command, (Command, dict))
2088 if isinstance(command, dict):
2089 command = Command(**command)
2090 command = {command_name: command.model_dump()}
2091 else:
2092 assert isinstance(command, (NamedCommand, dict))
2093 if isinstance(command, dict):
2094 command = NamedCommand(**command)
2096 self.update_existing_entity_attributes(
2097 entity_id=entity_id, entity_type=entity_type, attrs=[command]
2098 )
2100 def does_entity_exist(self, entity_id: str, entity_type: str) -> bool:
2101 """
2102 Test if an entity with given id and type is present in the CB
2104 Args:
2105 entity_id: Entity id
2106 entity_type: Entity type
2108 Returns:
2109 bool; True if entity exists
2111 Raises:
2112 RequestException, if any error occurs (e.g: No Connection),
2113 except that the entity is not found
2114 """
2115 url = urljoin(self.base_url, f"v2/entities/{entity_id}")
2116 headers = self.headers.copy()
2117 params = {"type": entity_type}
2119 try:
2120 res = self.get(url=url, params=params, headers=headers)
2121 if res.ok:
2122 return True
2123 res.raise_for_status()
2124 except requests.RequestException as err:
2125 if err.response is None or not err.response.status_code == 404:
2126 self.log_error(err=err, msg="Checking entity existence failed!")
2127 raise
2128 return False
2130 def patch_entity(
2131 self,
2132 entity: Union[ContextEntity, ContextEntityKeyValues],
2133 key_values: bool = False,
2134 forcedUpdate: bool = False,
2135 override_metadata: bool = False,
2136 ) -> None:
2137 """
2138 Takes a given entity and updates the state in the CB to match it.
2139 It is an extended equivalent to the HTTP method PATCH, which applies
2140 partial modifications to a resource.
2142 Args:
2143 entity: Entity to update
2144 key_values: If True, the entity is updated in key-values format.
2145 forcedUpdate: Update operation have to trigger any matching
2146 subscription, no matter if there is an actual attribute
2147 update or no instead of the default behavior, which is to
2148 updated only if attribute is effectively updated.
2149 override_metadata: If True, the existing metadata of the entity
2150 is replaced
2151 Returns:
2152 None
2153 """
2154 attributes = entity.get_attributes()
2156 self.update_existing_entity_attributes(
2157 entity_id=entity.id,
2158 entity_type=entity.type,
2159 attrs=attributes,
2160 key_values=key_values,
2161 forcedUpdate=forcedUpdate,
2162 override_metadata=override_metadata,
2163 )
2165 def _subscription_dicts_are_equal(self, first: dict, second: dict):
2166 """
2167 Check if two dictionaries and all sub-dictionaries are equal.
2168 Logs a warning if the keys are not equal, but ignores the
2169 comparison of such keys.
2171 Args:
2172 first dict: Dictionary of first subscription
2173 second dict: Dictionary of second subscription
2175 Returns:
2176 True if equal, else False
2177 """
2179 def _value_is_not_none(value):
2180 """
2181 Recursive function to check if a value equals none.
2182 If the value is a dict and any value of the dict is not none,
2183 the value is not none.
2184 If the value is a list and any item is not none, the value is not none.
2185 If it's neither dict nore list, bool is used.
2186 """
2187 if isinstance(value, dict):
2188 return any([_value_is_not_none(value=_v) for _v in value.values()])
2189 if isinstance(value, list):
2190 return any([_value_is_not_none(value=_v) for _v in value])
2191 else:
2192 return bool(value)
2194 if first.keys() != second.keys():
2195 warnings.warn(
2196 "Subscriptions contain a different set of fields. "
2197 "Only comparing to new fields of the new one."
2198 )
2199 for k, v in first.items():
2200 ex_value = second.get(k, None)
2201 if isinstance(v, dict) and isinstance(ex_value, dict):
2202 equal = self._subscription_dicts_are_equal(v, ex_value)
2203 if equal:
2204 continue
2205 else:
2206 return False
2207 if v != ex_value:
2208 self.logger.debug(f"Not equal fields for key {k}: ({v}, {ex_value})")
2209 if (
2210 not _value_is_not_none(v)
2211 and not _value_is_not_none(ex_value)
2212 or k == "timesSent"
2213 ):
2214 continue
2215 return False
2216 return True
2219#
2220#
2221# def check_duplicate_subscription(self, subscription_body, limit: int = 20):
2222# """
2223# Function compares the subject of the subscription body, on whether a subscription
2224# already exists for a device / entity.
2225# :param subscription_body: the body of the new subscripton
2226# :param limit: pagination parameter, to set the number of
2227# subscriptions bodies the get request should grab
2228# :return: exists, boolean -> True, if such a subscription allready
2229# exists
2230# """
2231# exists = False
2232# subscription_subject = json.loads(subscription_body)["subject"]
2233# # Exact keys depend on subscription body
2234# try:
2235# subscription_url = json.loads(subscription_body)[
2236# "notification"]["httpCustom"]["url"]
2237# except KeyError:
2238# subscription_url = json.loads(subscription_body)[
2239# "notification"]["http"]["url"]
2240#
2241# # If the number of subscriptions is larger then the limit,
2242# paginations methods have to be used
2243# url = self.url + '/v2/subscriptions?limit=' + str(limit) +
2244# '&options=count'
2245# response = self.session.get(url, headers=self.get_header())
2246#
2247# sub_count = float(response.headers["Fiware-Total-Count"])
2248# response = json.loads(response.text)
2249# if sub_count >= limit:
2250# response = self.get_pagination(url=url, headers=self.get_header(),
2251# limit=limit, count=sub_count)
2252# response = json.loads(response)
2253#
2254# for existing_subscription in response:
2255# # check whether the exact same subscriptions already exists
2256# if existing_subscription["subject"] == subscription_subject:
2257# exists = True
2258# break
2259# try:
2260# existing_url = existing_subscription["notification"][
2261# "http"]["url"]
2262# except KeyError:
2263# existing_url = existing_subscription["notification"][
2264# "httpCustom"]["url"]
2265# # check whether both subscriptions notify to the same path
2266# if existing_url != subscription_url:
2267# continue
2268# else:
2269# # iterate over all entities included in the subscription object
2270# for entity in subscription_subject["entities"]:
2271# if 'type' in entity.keys():
2272# subscription_type = entity['type']
2273# else:
2274# subscription_type = entity['typePattern']
2275# if 'id' in entity.keys():
2276# subscription_id = entity['id']
2277# else:
2278# subscription_id = entity["idPattern"]
2279# # iterate over all entities included in the exisiting
2280# subscriptions
2281# for existing_entity in existing_subscription["subject"][
2282# "entities"]:
2283# if "type" in entity.keys():
2284# type_existing = entity["type"]
2285# else:
2286# type_existing = entity["typePattern"]
2287# if "id" in entity.keys():
2288# id_existing = entity["id"]
2289# else:
2290# id_existing = entity["idPattern"]
2291# # as the ID field is non optional, it has to match
2292# # check whether the type match
2293# # if the type field is empty, they match all types
2294# if (type_existing == subscription_type) or\
2295# ('*' in subscription_type) or \
2296# ('*' in type_existing)\
2297# or (type_existing == "") or (
2298# subscription_type == ""):
2299# # check if on of the subscriptions is a pattern,
2300# or if they both refer to the same id
2301# # Get the attrs first, to avoid code duplication
2302# # last thing to compare is the attributes
2303# # Assumption -> position is the same as the
2304# entities _list
2305# # i == j
2306# i = subscription_subject["entities"].index(entity)
2307# j = existing_subscription["subject"][
2308# "entities"].index(existing_entity)
2309# try:
2310# subscription_attrs = subscription_subject[
2311# "condition"]["attrs"][i]
2312# except (KeyError, IndexError):
2313# subscription_attrs = []
2314# try:
2315# existing_attrs = existing_subscription[
2316# "subject"]["condition"]["attrs"][j]
2317# except (KeyError, IndexError):
2318# existing_attrs = []
2319#
2320# if (".*" in subscription_id) or ('.*' in
2321# id_existing) or (subscription_id == id_existing):
2322# # Attributes have to match, or the have to
2323# be an empty array
2324# if (subscription_attrs == existing_attrs) or
2325# (subscription_attrs == []) or (existing_attrs == []):
2326# exists = True
2327# # if they do not match completely or subscribe
2328# to all ids they have to match up to a certain position
2329# elif ("*" in subscription_id) or ('*' in
2330# id_existing):
2331# regex_existing = id_existing.find('*')
2332# regex_subscription =
2333# subscription_id.find('*')
2334# # slice the strings to compare
2335# if (id_existing[:regex_existing] in
2336# subscription_id) or (subscription_id[:regex_subscription] in id_existing) or \
2337# (id_existing[regex_existing:] in
2338# subscription_id) or (subscription_id[regex_subscription:] in id_existing):
2339# if (subscription_attrs ==
2340# existing_attrs) or (subscription_attrs == []) or (existing_attrs == []):
2341# exists = True
2342# else:
2343# continue
2344# else:
2345# continue
2346# else:
2347# continue
2348# else:
2349# continue
2350# else:
2351# continue
2352# return exists
2353#