Coverage for filip/clients/ngsi_v2/cb.py: 82%
703 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-03-10 13:43 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-03-10 13:43 +0000
1"""
2Context Broker Module for API Client
3"""
5from __future__ import annotations
7import copy
8from copy import deepcopy
9from enum import Enum
10from math import inf
11from pkg_resources import parse_version
12from pydantic import PositiveInt, PositiveFloat, AnyHttpUrl, ValidationError
13from pydantic.type_adapter import TypeAdapter
14from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union
15import re
16import requests
17from urllib.parse import urljoin
18import warnings
19from filip.clients.base_http_client import BaseHttpClient, NgsiURLVersion
20from filip.config import settings
21from filip.models.base import FiwareHeader, PaginationMethod, DataType
22from filip.utils.simple_ql import QueryString
23from filip.models.ngsi_v2.context import (
24 ActionType,
25 Command,
26 ContextEntity,
27 ContextEntityKeyValues,
28 ContextAttribute,
29 NamedCommand,
30 NamedContextAttribute,
31 Query,
32 Update,
33 PropertyFormat,
34 ContextEntityList,
35 ContextEntityKeyValuesList,
36)
37from filip.models.ngsi_v2.base import AttrsFormat
38from filip.models.ngsi_v2.subscriptions import Subscription, Message
39from filip.models.ngsi_v2.registrations import Registration
41if TYPE_CHECKING:
42 from filip.clients.ngsi_v2.iota import IoTAClient
45class ContextBrokerClient(BaseHttpClient):
46 """
47 Implementation of NGSI Context Broker functionalities, such as creating
48 entities and subscriptions; retrieving, updating and deleting data.
49 Further documentation:
50 https://fiware-orion.readthedocs.io/en/master/
52 Api specifications for v2 are located here:
53 https://telefonicaid.github.io/fiware-orion/api/v2/stable/
55 Note:
56 We use the reference implementation for development. Therefore, some
57 other brokers may show slightly different behavior!
58 """
60 def __init__(
61 self,
62 url: str = None,
63 *,
64 session: requests.Session = None,
65 fiware_header: FiwareHeader = None,
66 **kwargs,
67 ):
68 """
70 Args:
71 url: Url of context broker server
72 session (requests.Session):
73 fiware_header (FiwareHeader): fiware service and fiware service path
74 **kwargs (Optional): Optional arguments that ``request`` takes.
75 """
76 # set service url
77 url = url or settings.CB_URL
78 self._url_version = NgsiURLVersion.v2_url.value
79 super().__init__(
80 url=url, session=session, fiware_header=fiware_header, **kwargs
81 )
83 def __pagination(
84 self,
85 *,
86 method: PaginationMethod = PaginationMethod.GET,
87 url: str,
88 headers: Dict,
89 limit: Union[PositiveInt, PositiveFloat] = None,
90 params: Dict = None,
91 data: str = None,
92 ) -> List[Dict]:
93 """
94 NGSIv2 implements a pagination mechanism in order to help clients to
95 retrieve large sets of resources. This mechanism works for all listing
96 operations in the API (e.g. GET /v2/entities, GET /v2/subscriptions,
97 POST /v2/op/query, etc.). This function helps getting datasets that are
98 larger than the limit for the different GET operations.
100 https://fiware-orion.readthedocs.io/en/master/user/pagination/index.html
102 Args:
103 url: Information about the url, obtained from the original function
104 headers: The headers from the original function
105 params:
106 limit:
108 Returns:
109 object:
111 """
112 if limit is None:
113 limit = inf
114 if limit > 1000:
115 params["limit"] = 1000 # maximum items per request
116 else:
117 params["limit"] = limit
119 if self.session:
120 session = self.session
121 else:
122 session = requests.Session()
123 with session:
124 res = session.request(
125 method=method, url=url, params=params, headers=headers, data=data
126 )
127 if res.ok:
128 items = res.json()
129 # do pagination
130 count = int(res.headers["Fiware-Total-Count"])
132 while len(items) < limit and len(items) < count:
133 # Establishing the offset from where entities are retrieved
134 params["offset"] = len(items)
135 params["limit"] = min(1000, (limit - len(items)))
136 res = session.request(
137 method=method,
138 url=url,
139 params=params,
140 headers=headers,
141 data=data,
142 )
143 if res.ok:
144 items.extend(res.json())
145 else:
146 res.raise_for_status()
147 self.logger.debug("Received: %s", items)
148 return items
149 res.raise_for_status()
151 # MANAGEMENT API
152 def get_version(self) -> Dict:
153 """
154 Gets version of IoT Agent
155 Returns:
156 Dictionary with response
157 """
158 url = urljoin(self.base_url, "version")
159 try:
160 res = self.get(url=url, headers=self.headers)
161 if res.ok:
162 return res.json()
163 res.raise_for_status()
164 except requests.RequestException as err:
165 self.logger.error(err)
166 raise
168 def get_resources(self) -> Dict:
169 """
170 Gets reo
172 Returns:
173 Dict
174 """
175 url = urljoin(self.base_url, self._url_version)
176 try:
177 res = self.get(url=url, headers=self.headers)
178 if res.ok:
179 return res.json()
180 res.raise_for_status()
181 except requests.RequestException as err:
182 self.logger.error(err)
183 raise
185 # STATISTICS API
186 def get_statistics(self) -> Dict:
187 """
188 Gets statistics of context broker
189 Returns:
190 Dictionary with response
191 """
192 url = urljoin(self.base_url, "statistics")
193 try:
194 res = self.get(url=url, headers=self.headers)
195 if res.ok:
196 return res.json()
197 res.raise_for_status()
198 except requests.RequestException as err:
199 self.logger.error(err)
200 raise
202 # CONTEXT MANAGEMENT API ENDPOINTS
203 # Entity Operations
204 def post_entity(
205 self,
206 entity: Union[ContextEntity, ContextEntityKeyValues],
207 update: bool = False,
208 patch: bool = False,
209 override_attr_metadata: bool = True,
210 key_values: bool = False,
211 ):
212 """
213 Function registers an Object with the NGSI Context Broker,
214 if it already exists it can be automatically updated (overwritten)
215 if the update bool is True.
216 First a post request with the entity is tried, if the response code
217 is 422 the entity is uncrossable, as it already exists there are two
218 options, either overwrite it, if the attribute have changed
219 (e.g. at least one new/new values) (update = True) or leave
220 it the way it is (update=False)
221 If you only want to manipulate the entities values, you need to set
222 patch argument.
224 Args:
225 entity (ContextEntity/ContextEntityKeyValues):
226 Context Entity Object
227 update (bool):
228 If the response.status_code is 422, whether the override and
229 existing entity
230 patch (bool):
231 If the response.status_code is 422, whether to manipulate the
232 existing entity. Omitted if update `True`.
233 override_attr_metadata:
234 Only applies for patch equal to `True`.
235 Whether to override or append the attribute's metadata.
236 `True` for overwrite or `False` for update/append
237 key_values(bool):
238 By default False. If set to True, "options=keyValues" will
239 be included in params of post request. The payload uses
240 the keyValues simplified entity representation, i.e.
241 ContextEntityKeyValues.
242 """
243 url = urljoin(self.base_url, f"{self._url_version}/entities")
244 headers = self.headers.copy()
245 params = {}
246 options = []
247 if key_values:
248 assert isinstance(entity, ContextEntityKeyValues)
249 options.append("keyValues")
250 else:
251 assert isinstance(entity, ContextEntity)
252 if options:
253 params.update({"options": ",".join(options)})
254 try:
255 res = self.post(
256 url=url,
257 headers=headers,
258 json=entity.model_dump(exclude_none=True),
259 params=params,
260 )
261 if res.ok:
262 self.logger.info("Entity successfully posted!")
263 return res.headers.get("Location")
264 res.raise_for_status()
265 except requests.RequestException as err:
266 if err.response is not None:
267 if update and err.response.status_code == 422:
268 return self.override_entity(entity=entity, key_values=key_values)
269 if patch and err.response.status_code == 422:
270 if not key_values:
271 return self.patch_entity(
272 entity=entity, override_attr_metadata=override_attr_metadata
273 )
274 else:
275 return self._patch_entity_key_values(entity=entity)
276 msg = f"Could not post entity {entity.id}"
277 self.log_error(err=err, msg=msg)
278 raise
280 def get_entity_list(
281 self,
282 *,
283 entity_ids: List[str] = None,
284 entity_types: List[str] = None,
285 id_pattern: str = None,
286 type_pattern: str = None,
287 q: Union[str, QueryString] = None,
288 mq: Union[str, QueryString] = None,
289 georel: str = None,
290 geometry: str = None,
291 coords: str = None,
292 limit: PositiveInt = inf,
293 attrs: List[str] = None,
294 metadata: str = None,
295 order_by: str = None,
296 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
297 ) -> List[Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]]:
298 r"""
299 Retrieves a list of context entities that match different criteria by
300 id, type, pattern matching (either id or type) and/or those which
301 match a query or geographical query (see Simple Query Language and
302 Geographical Queries). A given entity has to match all the criteria
303 to be retrieved (i.e., the criteria is combined in a logical AND
304 way). Note that pattern matching query parameters are incompatible
305 (i.e. mutually exclusive) with their corresponding exact matching
306 parameters, i.e. idPattern with id and typePattern with type.
308 Args:
309 entity_ids: A comma-separated list of elements. Retrieve entities
310 whose ID matches one of the elements in the list.
311 Incompatible with idPattern,e.g. Boe_Idarium
312 entity_types: comma-separated list of elements. Retrieve entities
313 whose type matches one of the elements in the list.
314 Incompatible with typePattern. Example: Room.
315 id_pattern: A correctly formatted regular expression. Retrieve
316 entities whose ID matches the regular expression. Incompatible
317 with id, e.g. ngsi-ld.* or sensor.*
318 type_pattern: A correctly formatted regular expression. Retrieve
319 entities whose type matches the regular expression.
320 Incompatible with type, e.g. room.*
321 q (SimpleQuery): A query expression, composed of a list of
322 statements separated by ;, i.e.,
323 q=statement1;statement2;statement3. See Simple Query
324 Language specification. Example: temperature>40.
325 mq (SimpleQuery): A query expression for attribute metadata,
326 composed of a list of statements separated by ;, i.e.,
327 mq=statement1;statement2;statement3. See Simple Query
328 Language specification. Example: temperature.accuracy<0.9.
329 georel: Spatial relationship between matching entities and a
330 reference shape. See Geographical Queries. Example: 'near'.
331 geometry: Geographical area to which the query is restricted.
332 See Geographical Queries. Example: point.
333 coords: List of latitude-longitude pairs of coordinates separated
334 by ';'. See Geographical Queries. Example: 41.390205,
335 2.154007;48.8566,2.3522.
336 limit: Limits the number of entities to be retrieved Example: 20
337 attrs: Comma-separated list of attribute names whose data are to
338 be included in the response. The attributes are retrieved in
339 the order specified by this parameter. If this parameter is
340 not included, the attributes are retrieved in arbitrary
341 order. See "Filtering out attributes and metadata" section
342 for more detail. Example: seatNumber.
343 metadata: A list of metadata names to include in the response.
344 See "Filtering out attributes and metadata" section for more
345 detail. Example: accuracy.
346 order_by: Criteria for ordering results. See "Ordering Results"
347 section for details. Example: temperature,!speed.
348 response_format (AttrsFormat, str): Response Format. Note: That if
349 'keyValues' or 'values' are used the response model will
350 change to List[ContextEntityKeyValues] and to List[Dict[str,
351 Any]], respectively.
352 Returns:
354 """
355 url = urljoin(self.base_url, f"{self._url_version}/entities/")
356 headers = self.headers.copy()
357 params = {}
359 if entity_ids and id_pattern:
360 raise ValueError
361 if entity_types and type_pattern:
362 raise ValueError
363 if entity_ids:
364 if not isinstance(entity_ids, list):
365 entity_ids = [entity_ids]
366 params.update({"id": ",".join(entity_ids)})
367 if id_pattern:
368 try:
369 re.compile(id_pattern)
370 except re.error as err:
371 raise ValueError(f"Invalid Pattern: {err}") from err
372 params.update({"idPattern": id_pattern})
373 if entity_types:
374 if not isinstance(entity_types, list):
375 entity_types = [entity_types]
376 params.update({"type": ",".join(entity_types)})
377 if type_pattern:
378 try:
379 re.compile(type_pattern)
380 except re.error as err:
381 raise ValueError(f"Invalid Pattern: {err.msg}") from err
382 params.update({"typePattern": type_pattern})
383 if attrs:
384 params.update({"attrs": ",".join(attrs)})
385 if metadata:
386 params.update({"metadata": ",".join(metadata)})
387 if q:
388 if isinstance(q, str):
389 q = QueryString.parse_str(q)
390 params.update({"q": str(q)})
391 if mq:
392 params.update({"mq": str(mq)})
393 if geometry:
394 params.update({"geometry": geometry})
395 if georel:
396 params.update({"georel": georel})
397 if coords:
398 params.update({"coords": coords})
399 if order_by:
400 params.update({"orderBy": order_by})
401 if response_format not in list(AttrsFormat):
402 raise ValueError(f"Value must be in {list(AttrsFormat)}")
403 response_format = ",".join(["count", response_format])
404 params.update({"options": response_format})
405 try:
406 items = self.__pagination(
407 method=PaginationMethod.GET,
408 limit=limit,
409 url=url,
410 params=params,
411 headers=headers,
412 )
413 if AttrsFormat.NORMALIZED in response_format:
414 return ContextEntityList.model_validate({"entities": items}).entities
415 elif AttrsFormat.KEY_VALUES in response_format:
416 return ContextEntityKeyValuesList.model_validate(
417 {"entities": items}
418 ).entities
419 return items # in case of VALUES as response_format
420 except requests.RequestException as err:
421 msg = "Could not load entities"
422 self.log_error(err=err, msg=msg)
423 raise
425 def get_entity(
426 self,
427 entity_id: str,
428 entity_type: str = None,
429 attrs: List[str] = None,
430 metadata: List[str] = None,
431 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
432 ) -> Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]:
433 """
434 This operation must return one entity element only, but there may be
435 more than one entity with the same ID (e.g. entities with same ID but
436 different types). In such case, an error message is returned, with
437 the HTTP status code set to 409 Conflict.
439 Args:
440 entity_id (String): Id of the entity to be retrieved
441 entity_type (String): Entity type, to avoid ambiguity in case
442 there are several entities with the same entity id.
443 attrs (List of Strings): List of attribute names whose data must be
444 included in the response. The attributes are retrieved in the
445 order specified by this parameter.
446 See "Filtering out attributes and metadata" section for more
447 detail. If this parameter is not included, the attributes are
448 retrieved in arbitrary order, and all the attributes of the
449 entity are included in the response.
450 Example: temperature, humidity.
451 metadata (List of Strings): A list of metadata names to include in
452 the response. See "Filtering out attributes and metadata"
453 section for more detail. Example: accuracy.
454 response_format (AttrsFormat, str): Representation format of
455 response
456 Returns:
457 ContextEntity
458 """
459 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
460 headers = self.headers.copy()
461 params = {}
462 if entity_type:
463 params.update({"type": entity_type})
464 if attrs:
465 params.update({"attrs": ",".join(attrs)})
466 if metadata:
467 params.update({"metadata": ",".join(metadata)})
468 if response_format not in list(AttrsFormat):
469 raise ValueError(f"Value must be in {list(AttrsFormat)}")
470 params.update({"options": response_format})
472 try:
473 res = self.get(url=url, params=params, headers=headers)
474 if res.ok:
475 self.logger.info("Entity successfully retrieved!")
476 self.logger.debug("Received: %s", res.json())
477 if response_format == AttrsFormat.NORMALIZED:
478 return ContextEntity(**res.json())
479 if response_format == AttrsFormat.KEY_VALUES:
480 return ContextEntityKeyValues(**res.json())
481 return res.json()
482 res.raise_for_status()
483 except requests.RequestException as err:
484 msg = f"Could not load entity {entity_id}"
485 self.log_error(err=err, msg=msg)
486 raise
488 def get_entity_attributes(
489 self,
490 entity_id: str,
491 entity_type: str = None,
492 attrs: List[str] = None,
493 metadata: List[str] = None,
494 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
495 ) -> Dict[str, ContextAttribute]:
496 """
497 This request is similar to retrieving the whole entity, however this
498 one omits the id and type fields. Just like the general request of
499 getting an entire entity, this operation must return only one entity
500 element. If more than one entity with the same ID is found (e.g.
501 entities with same ID but different type), an error message is
502 returned, with the HTTP status code set to 409 Conflict.
504 Args:
505 entity_id (String): Id of the entity to be retrieved
506 entity_type (String): Entity type, to avoid ambiguity in case
507 there are several entities with the same entity id.
508 attrs (List of Strings): List of attribute names whose data must be
509 included in the response. The attributes are retrieved in the
510 order specified by this parameter.
511 See "Filtering out attributes and metadata" section for more
512 detail. If this parameter is not included, the attributes are
513 retrieved in arbitrary order, and all the attributes of the
514 entity are included in the response. Example: temperature,
515 humidity.
516 metadata (List of Strings): A list of metadata names to include in
517 the response. See "Filtering out attributes and metadata"
518 section for more detail. Example: accuracy.
519 response_format (AttrsFormat, str): Representation format of
520 response
521 Returns:
522 Dict
523 """
524 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
525 headers = self.headers.copy()
526 params = {}
527 if entity_type:
528 params.update({"type": entity_type})
529 if attrs:
530 params.update({"attrs": ",".join(attrs)})
531 if metadata:
532 params.update({"metadata": ",".join(metadata)})
533 if response_format not in list(AttrsFormat):
534 raise ValueError(f"Value must be in {list(AttrsFormat)}")
535 params.update({"options": response_format})
536 try:
537 res = self.get(url=url, params=params, headers=headers)
538 if res.ok:
539 if response_format == AttrsFormat.NORMALIZED:
540 return {
541 key: ContextAttribute(**values)
542 for key, values in res.json().items()
543 }
544 return res.json()
545 res.raise_for_status()
546 except requests.RequestException as err:
547 msg = f"Could not load attributes from entity {entity_id} !"
548 self.log_error(err=err, msg=msg)
549 raise
551 def update_entity(
552 self,
553 entity: Union[ContextEntity, ContextEntityKeyValues, dict],
554 append_strict: bool = False,
555 key_values: bool = False,
556 ):
557 """
558 The request payload is an object representing the attributes to
559 append or update.
561 Note:
562 Update means overwriting the existing entity. If you want to
563 manipulate you should rather use patch_entity.
565 Args:
566 entity (ContextEntity):
567 append_strict: If `False` the entity attributes are updated (if they
568 previously exist) or appended (if they don't previously exist)
569 with the ones in the payload.
570 If `True` all the attributes in the payload not
571 previously existing in the entity are appended. In addition
572 to that, in case some of the attributes in the payload
573 already exist in the entity, an error is returned.
574 More precisely this means a strict append procedure.
575 key_values: By default False. If set to True, the payload uses
576 the keyValues simplified entity representation, i.e.
577 ContextEntityKeyValues.
578 Returns:
579 None
580 """
581 if key_values:
582 if isinstance(entity, dict):
583 entity = copy.deepcopy(entity)
584 _id = entity.pop("id")
585 _type = entity.pop("type")
586 attrs = entity
587 entity = ContextEntityKeyValues(id=_id, type=_type)
588 else:
589 attrs = entity.model_dump(exclude={"id", "type"})
590 else:
591 attrs = entity.get_attributes()
592 self.update_or_append_entity_attributes(
593 entity_id=entity.id,
594 entity_type=entity.type,
595 attrs=attrs,
596 append_strict=append_strict,
597 key_values=key_values,
598 )
600 def update_entity_properties(
601 self, entity: ContextEntity, append_strict: bool = False
602 ):
603 """
604 The request payload is an object representing the attributes, of any type
605 but Relationship, to append or update.
607 Note:
608 Update means overwriting the existing entity. If you want to
609 manipulate you should rather use patch_entity.
611 Args:
612 entity (ContextEntity):
613 append_strict: If `False` the entity attributes are updated (if they
614 previously exist) or appended (if they don't previously exist)
615 with the ones in the payload.
616 If `True` all the attributes in the payload not
617 previously existing in the entity are appended. In addition
618 to that, in case some of the attributes in the payload
619 already exist in the entity, an error is returned.
620 More precisely this means a strict append procedure.
622 Returns:
623 None
624 """
625 self.update_or_append_entity_attributes(
626 entity_id=entity.id,
627 entity_type=entity.type,
628 attrs=entity.get_properties(),
629 append_strict=append_strict,
630 )
632 def update_entity_relationships(
633 self, entity: ContextEntity, append_strict: bool = False
634 ):
635 """
636 The request payload is an object representing only the attributes, of type
637 Relationship, to append or update.
639 Note:
640 Update means overwriting the existing entity. If you want to
641 manipulate you should rather use patch_entity.
643 Args:
644 entity (ContextEntity):
645 append_strict: If `False` the entity attributes are updated (if they
646 previously exist) or appended (if they don't previously exist)
647 with the ones in the payload.
648 If `True` all the attributes in the payload not
649 previously existing in the entity are appended. In addition
650 to that, in case some of the attributes in the payload
651 already exist in the entity, an error is returned.
652 More precisely this means a strict append procedure.
654 Returns:
655 None
656 """
657 self.update_or_append_entity_attributes(
658 entity_id=entity.id,
659 entity_type=entity.type,
660 attrs=entity.get_relationships(),
661 append_strict=append_strict,
662 )
664 def delete_entity(
665 self,
666 entity_id: str,
667 entity_type: str = None,
668 delete_devices: bool = False,
669 iota_client: IoTAClient = None,
670 iota_url: AnyHttpUrl = settings.IOTA_URL,
671 ) -> None:
672 """
673 Remove a entity from the context broker. No payload is required
674 or received.
676 Args:
677 entity_id:
678 Id of the entity to be deleted
679 entity_type:
680 Entity type, to avoid ambiguity in case there are several
681 entities with the same entity id.
682 delete_devices:
683 If True, also delete all devices that reference this
684 entity (entity_id as entity_name)
685 iota_client:
686 Corresponding IoTA-Client used to access IoTA-Agent
687 iota_url:
688 URL of the corresponding IoT-Agent. This will autogenerate
689 an IoTA-Client, mirroring the information of the
690 ContextBrokerClient, e.g. FiwareHeader, and other headers
692 Returns:
693 None
694 """
695 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
696 headers = self.headers.copy()
697 if entity_type:
698 params = {"type": entity_type}
699 else:
700 params = None
701 try:
702 res = self.delete(url=url, params=params, headers=headers)
703 if res.ok:
704 self.logger.info("Entity '%s' successfully deleted!", entity_id)
705 else:
706 res.raise_for_status()
707 except requests.RequestException as err:
708 msg = f"Could not delete entity {entity_id} !"
709 self.log_error(err=err, msg=msg)
710 raise
712 if delete_devices:
713 from filip.clients.ngsi_v2 import IoTAClient
715 if iota_client:
716 iota_client_local = deepcopy(iota_client)
717 else:
718 warnings.warn(
719 "No IoTA-Client object provided! "
720 "Will try to generate one. "
721 "This usage is not recommended."
722 )
724 iota_client_local = IoTAClient(
725 url=iota_url,
726 fiware_header=self.fiware_headers,
727 headers=self.headers,
728 )
730 for device in iota_client_local.get_device_list(entity_names=[entity_id]):
731 if entity_type:
732 if device.entity_type == entity_type:
733 iota_client_local.delete_device(device_id=device.device_id)
734 else:
735 iota_client_local.delete_device(device_id=device.device_id)
736 iota_client_local.close()
738 def delete_entities(self, entities: List[ContextEntity]) -> None:
739 """
740 Remove a list of entities from the context broker. This methode is
741 more efficient than to call delete_entity() for each entity
743 Args:
744 entities: List[ContextEntity]: List of entities to be deleted
746 Raises:
747 Exception, if one of the entities is not in the ContextBroker
749 Returns:
750 None
751 """
753 # update() delete, deletes all entities without attributes completely,
754 # and removes the attributes for the other
755 # The entities are sorted based on the fact if they have
756 # attributes.
757 entities_with_attributes: List[ContextEntity] = []
758 for entity in entities:
759 attribute_names = [
760 key
761 for key in entity.model_dump()
762 if key not in ContextEntity.model_fields
763 ]
764 if len(attribute_names) > 0:
765 entities_with_attributes.append(
766 ContextEntity(id=entity.id, type=entity.type)
767 )
769 # Post update_delete for those without attribute only once,
770 # for the other post update_delete again but for the changed entity
771 # in the ContextBroker (only id and type left)
772 if len(entities) > 0:
773 self.update(entities=entities, action_type="delete")
774 if len(entities_with_attributes) > 0:
775 self.update(entities=entities_with_attributes, action_type="delete")
777 def update_or_append_entity_attributes(
778 self,
779 entity_id: str,
780 attrs: Union[
781 List[NamedContextAttribute], Dict[str, ContextAttribute], Dict[str, Any]
782 ],
783 entity_type: str = None,
784 append_strict: bool = False,
785 forcedUpdate: bool = False,
786 key_values: bool = False,
787 ):
788 """
789 The request payload is an object representing the attributes to
790 append or update. This corresponds to a 'POST' request if append is
791 set to 'False'
793 Note:
794 Be careful not to update attributes that are
795 provided via context registration, e.g. commands. Commands are
796 removed before sending the request. To avoid breaking things.
798 Args:
799 entity_id: Entity id to be updated
800 entity_type: Entity type, to avoid ambiguity in case there are
801 several entities with the same entity id.
802 attrs: List of attributes to update or to append
803 append_strict: If `False` the entity attributes are updated (if they
804 previously exist) or appended (if they don't previously exist)
805 with the ones in the payload.
806 If `True` all the attributes in the payload not
807 previously existing in the entity are appended. In addition
808 to that, in case some of the attributes in the payload
809 already exist in the entity, an error is returned.
810 More precisely this means a strict append procedure.
811 forcedUpdate: Update operation have to trigger any matching
812 subscription, no matter if there is an actual attribute
813 update or no instead of the default behavior, which is to
814 updated only if attribute is effectively updated.
815 key_values: By default False. If set to True, the payload uses
816 the keyValues simplified entity representation, i.e.
817 ContextEntityKeyValues.
818 Returns:
819 None
821 """
822 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
823 headers = self.headers.copy()
824 params = {}
825 if entity_type:
826 params.update({"type": entity_type})
827 else:
828 entity_type = "dummy"
830 options = []
831 if append_strict:
832 options.append("append")
833 if forcedUpdate:
834 options.append("forcedUpdate")
835 if key_values:
836 assert isinstance(attrs, dict), "for keyValues attrs has to be a dict"
837 options.append("keyValues")
838 if options:
839 params.update({"options": ",".join(options)})
841 if key_values:
842 entity = ContextEntityKeyValues(id=entity_id, type=entity_type, **attrs)
843 else:
844 entity = ContextEntity(id=entity_id, type=entity_type)
845 entity.add_attributes(attrs)
846 # exclude commands from the send data,
847 # as they live in the IoTA-agent
848 excluded_keys = {"id", "type"}
849 # excluded_keys.update(
850 # entity.get_commands(response_format=PropertyFormat.DICT).keys()
851 # )
852 try:
853 res = self.post(
854 url=url,
855 headers=headers,
856 json=entity.model_dump(exclude=excluded_keys, exclude_none=True),
857 params=params,
858 )
859 if res.ok:
860 self.logger.info("Entity '%s' successfully " "updated!", entity.id)
861 else:
862 res.raise_for_status()
863 except requests.RequestException as err:
864 msg = f"Could not update or append attributes of entity" f" {entity.id} !"
865 self.log_error(err=err, msg=msg)
866 raise
868 def _patch_entity_key_values(
869 self,
870 entity: Union[ContextEntityKeyValues, dict],
871 ):
872 """
873 The entity are updated with a ContextEntityKeyValues object or a
874 dictionary contain the simplified entity data. This corresponds to a
875 'PATCH' request.
876 Only existing attribute can be updated!
878 Args:
879 entity: A ContextEntityKeyValues object or a dictionary contain
880 the simplified entity data
882 """
883 if isinstance(entity, dict):
884 entity = ContextEntityKeyValues(**entity)
885 url = urljoin(self.base_url, f"v2/entities/{entity.id}/attrs")
886 headers = self.headers.copy()
887 params = {"type": entity.type, "options": AttrsFormat.KEY_VALUES.value}
888 try:
889 res = self.patch(
890 url=url,
891 headers=headers,
892 json=entity.model_dump(exclude={"id", "type"}, exclude_unset=True),
893 params=params,
894 )
895 if res.ok:
896 self.logger.info("Entity '%s' successfully " "updated!", entity.id)
897 else:
898 res.raise_for_status()
899 except requests.RequestException as err:
900 msg = f"Could not update attributes of entity" f" {entity.id} !"
901 self.log_error(err=err, msg=msg)
902 raise
904 def update_existing_entity_attributes(
905 self,
906 entity_id: str,
907 attrs: Union[
908 List[NamedContextAttribute], Dict[str, ContextAttribute], Dict[str, Any]
909 ],
910 entity_type: str = None,
911 forcedUpdate: bool = False,
912 override_metadata: bool = False,
913 key_values: bool = False,
914 ):
915 """
916 The entity attributes are updated with the ones in the payload.
917 In addition to that, if one or more attributes in the payload doesn't
918 exist in the entity, an error is returned. This corresponds to a
919 'PATCH' request.
921 Args:
922 entity_id: Entity id to be updated
923 entity_type: Entity type, to avoid ambiguity in case there are
924 several entities with the same entity id.
925 attrs: List of attributes to update or to append
926 forcedUpdate: Update operation have to trigger any matching
927 subscription, no matter if there is an actual attribute
928 update or no instead of the default behavior, which is to
929 updated only if attribute is effectively updated.
930 override_metadata:
931 Bool,replace the existing metadata with the one provided in
932 the request
933 key_values: By default False. If set to True, the payload uses
934 the keyValues simplified entity representation, i.e.
935 ContextEntityKeyValues.
936 Returns:
937 None
939 """
940 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
941 headers = self.headers.copy()
942 if entity_type:
943 params = {"type": entity_type}
944 else:
945 params = None
946 entity_type = "dummy"
948 options = []
949 if override_metadata:
950 options.append("overrideMetadata")
951 if forcedUpdate:
952 options.append("forcedUpdate")
953 if key_values:
954 assert isinstance(attrs, dict), "for keyValues the attrs must be dict"
955 payload = attrs
956 options.append("keyValues")
957 else:
958 entity = ContextEntity(id=entity_id, type=entity_type)
959 entity.add_attributes(attrs)
960 payload = entity.model_dump(exclude={"id", "type"}, exclude_none=True)
961 if options:
962 params.update({"options": ",".join(options)})
964 try:
965 res = self.patch(
966 url=url,
967 headers=headers,
968 json=payload,
969 params=params,
970 )
971 if res.ok:
972 self.logger.info("Entity '%s' successfully " "updated!", entity_id)
973 else:
974 res.raise_for_status()
975 except requests.RequestException as err:
976 msg = f"Could not update attributes of entity" f" {entity_id} !"
977 self.log_error(err=err, msg=msg)
978 raise
980 def override_entity(
981 self, entity: Union[ContextEntity, ContextEntityKeyValues], **kwargs
982 ):
983 """
984 The request payload is an object representing the attributes to
985 override the existing entity.
987 Note:
988 If you want to manipulate you should rather use patch_entity.
990 Args:
991 entity (ContextEntity or ContextEntityKeyValues):
992 Returns:
993 None
994 """
995 return self.replace_entity_attributes(
996 entity_id=entity.id,
997 entity_type=entity.type,
998 attrs=entity.get_attributes(),
999 **kwargs,
1000 )
1002 def replace_entity_attributes(
1003 self,
1004 entity_id: str,
1005 attrs: Union[
1006 List[Union[NamedContextAttribute, Dict[str, ContextAttribute]]], Dict
1007 ],
1008 entity_type: str = None,
1009 forcedUpdate: bool = False,
1010 key_values: bool = False,
1011 ):
1012 """
1013 The attributes previously existing in the entity are removed and
1014 replaced by the ones in the request. This corresponds to a 'PUT'
1015 request.
1017 Args:
1018 entity_id: Entity id to be updated
1019 entity_type: Entity type, to avoid ambiguity in case there are
1020 several entities with the same entity id.
1021 attrs: List of attributes to add to the entity or dict of
1022 attributes in case of key_values=True.
1023 forcedUpdate: Update operation have to trigger any matching
1024 subscription, no matter if there is an actual attribute
1025 update or no instead of the default behavior, which is to
1026 updated only if attribute is effectively updated.
1027 key_values(bool):
1028 By default False. If set to True, "options=keyValues" will
1029 be included in params of the request. The payload uses
1030 the keyValues simplified entity representation, i.e.
1031 ContextEntityKeyValues.
1032 Returns:
1033 None
1034 """
1035 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
1036 headers = self.headers.copy()
1037 params = {}
1038 options = []
1039 if entity_type:
1040 params.update({"type": entity_type})
1041 else:
1042 entity_type = "dummy"
1044 if forcedUpdate:
1045 options.append("forcedUpdate")
1047 if key_values:
1048 options.append("keyValues")
1049 assert isinstance(attrs, dict)
1050 else:
1051 entity = ContextEntity(id=entity_id, type=entity_type)
1052 entity.add_attributes(attrs)
1053 attrs = entity.model_dump(exclude={"id", "type"}, exclude_none=True)
1054 if options:
1055 params.update({"options": ",".join(options)})
1057 try:
1058 res = self.put(
1059 url=url,
1060 headers=headers,
1061 json=attrs,
1062 params=params,
1063 )
1064 if res.ok:
1065 self.logger.info("Entity '%s' successfully " "updated!", entity_id)
1066 else:
1067 res.raise_for_status()
1068 except requests.RequestException as err:
1069 msg = f"Could not replace attribute of entity {entity_id} !"
1070 self.log_error(err=err, msg=msg)
1071 raise
1073 # Attribute operations
1074 def get_attribute(
1075 self,
1076 entity_id: str,
1077 attr_name: str,
1078 entity_type: str = None,
1079 metadata: str = None,
1080 response_format="",
1081 ) -> ContextAttribute:
1082 """
1083 Retrieves a specified attribute from an entity.
1085 Args:
1086 entity_id: Id of the entity. Example: Bcn_Welt
1087 attr_name: Name of the attribute to be retrieved.
1088 entity_type (Optional): Type of the entity to retrieve
1089 metadata (Optional): A list of metadata names to include in the
1090 response. See "Filtering out attributes and metadata" section
1091 for more detail.
1093 Returns:
1094 The content of the retrieved attribute as ContextAttribute
1096 Raises:
1097 Error
1099 """
1100 url = urljoin(
1101 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
1102 )
1103 headers = self.headers.copy()
1104 params = {}
1105 if entity_type:
1106 params.update({"type": entity_type})
1107 if metadata:
1108 params.update({"metadata": ",".join(metadata)})
1109 try:
1110 res = self.get(url=url, params=params, headers=headers)
1111 if res.ok:
1112 self.logger.debug("Received: %s", res.json())
1113 return ContextAttribute(**res.json())
1114 res.raise_for_status()
1115 except requests.RequestException as err:
1116 msg = (
1117 f"Could not load attribute '{attr_name}' from entity" f"'{entity_id}' "
1118 )
1119 self.log_error(err=err, msg=msg)
1120 raise
1122 def update_entity_attribute(
1123 self,
1124 entity_id: str,
1125 attr: Union[ContextAttribute, NamedContextAttribute],
1126 *,
1127 entity_type: str = None,
1128 attr_name: str = None,
1129 override_metadata: bool = True,
1130 forcedUpdate: bool = False,
1131 ):
1132 """
1133 Updates a specified attribute from an entity.
1135 Args:
1136 attr:
1137 context attribute to update
1138 entity_id:
1139 Id of the entity. Example: Bcn_Welt
1140 entity_type:
1141 Entity type, to avoid ambiguity in case there are
1142 several entities with the same entity id.
1143 forcedUpdate: Update operation have to trigger any matching
1144 subscription, no matter if there is an actual attribute
1145 update or no instead of the default behavior, which is to
1146 updated only if attribute is effectively updated.
1147 attr_name:
1148 Name of the attribute to be updated.
1149 override_metadata:
1150 Bool, if set to `True` (default) the metadata will be
1151 overwritten. This is for backwards compatibility reasons.
1152 If `False` the metadata values will be either updated if
1153 already existing or append if not.
1154 See also:
1155 https://fiware-orion.readthedocs.io/en/master/user/metadata.html
1156 """
1157 headers = self.headers.copy()
1158 if not isinstance(attr, NamedContextAttribute):
1159 assert attr_name is not None, (
1160 "Missing name for attribute. "
1161 "attr_name must be present if"
1162 "attr is of type ContextAttribute"
1163 )
1164 else:
1165 assert attr_name is None, (
1166 "Invalid argument attr_name. Do not set "
1167 "attr_name if attr is of type "
1168 "NamedContextAttribute"
1169 )
1170 attr_name = attr.name
1172 url = urljoin(
1173 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
1174 )
1175 params = {}
1176 if entity_type:
1177 params.update({"type": entity_type})
1178 # set overrideMetadata option (we assure backwards compatibility here)
1179 options = []
1180 if override_metadata:
1181 options.append("overrideMetadata")
1182 if forcedUpdate:
1183 options.append("forcedUpdate")
1184 if options:
1185 params.update({"options": ",".join(options)})
1186 try:
1187 res = self.put(
1188 url=url,
1189 headers=headers,
1190 params=params,
1191 json=attr.model_dump(exclude={"name"}, exclude_none=True),
1192 )
1193 if res.ok:
1194 self.logger.info(
1195 "Attribute '%s' of '%s' " "successfully updated!",
1196 attr_name,
1197 entity_id,
1198 )
1199 else:
1200 res.raise_for_status()
1201 except requests.RequestException as err:
1202 msg = (
1203 f"Could not update attribute '{attr_name}' of entity" f"'{entity_id}' "
1204 )
1205 self.log_error(err=err, msg=msg)
1206 raise
1208 def delete_entity_attribute(
1209 self, entity_id: str, attr_name: str, entity_type: str = None
1210 ) -> None:
1211 """
1212 Removes a specified attribute from an entity.
1214 Args:
1215 entity_id: Id of the entity.
1216 attr_name: Name of the attribute to be retrieved.
1217 entity_type: Entity type, to avoid ambiguity in case there are
1218 several entities with the same entity id.
1219 Raises:
1220 Error
1222 """
1223 url = urljoin(
1224 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
1225 )
1226 headers = self.headers.copy()
1227 params = {}
1228 if entity_type:
1229 params.update({"type": entity_type})
1230 try:
1231 res = self.delete(url=url, headers=headers)
1232 if res.ok:
1233 self.logger.info(
1234 "Attribute '%s' of '%s' " "successfully deleted!",
1235 attr_name,
1236 entity_id,
1237 )
1238 else:
1239 res.raise_for_status()
1240 except requests.RequestException as err:
1241 msg = f"Could not delete attribute '{attr_name}' of entity '{entity_id}'"
1242 self.log_error(err=err, msg=msg)
1243 raise
1245 # Attribute value operations
1246 def get_attribute_value(
1247 self, entity_id: str, attr_name: str, entity_type: str = None
1248 ) -> Any:
1249 """
1250 This operation returns the value property with the value of the
1251 attribute.
1253 Args:
1254 entity_id: Id of the entity. Example: Bcn_Welt
1255 attr_name: Name of the attribute to be retrieved.
1256 Example: temperature.
1257 entity_type: Entity type, to avoid ambiguity in case there are
1258 several entities with the same entity id.
1260 Returns:
1262 """
1263 url = urljoin(
1264 self.base_url,
1265 f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}/value",
1266 )
1267 headers = self.headers.copy()
1268 params = {}
1269 if entity_type:
1270 params.update({"type": entity_type})
1271 try:
1272 res = self.get(url=url, params=params, headers=headers)
1273 if res.ok:
1274 self.logger.debug("Received: %s", res.json())
1275 return res.json()
1276 res.raise_for_status()
1277 except requests.RequestException as err:
1278 msg = (
1279 f"Could not load value of attribute '{attr_name}' from "
1280 f"entity'{entity_id}' "
1281 )
1282 self.log_error(err=err, msg=msg)
1283 raise
1285 def update_attribute_value(
1286 self,
1287 *,
1288 entity_id: str,
1289 attr_name: str,
1290 value: Any,
1291 entity_type: str = None,
1292 forcedUpdate: bool = False,
1293 ):
1294 """
1295 Updates the value of a specified attribute of an entity
1297 Args:
1298 value: update value
1299 entity_id: Id of the entity. Example: Bcn_Welt
1300 attr_name: Name of the attribute to be retrieved.
1301 Example: temperature.
1302 entity_type: Entity type, to avoid ambiguity in case there are
1303 several entities with the same entity id.
1304 forcedUpdate: Update operation have to trigger any matching
1305 subscription, no matter if there is an actual attribute
1306 update or no instead of the default behavior, which is to
1307 updated only if attribute is effectively updated.
1308 Returns:
1310 """
1311 url = urljoin(
1312 self.base_url,
1313 f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}/value",
1314 )
1315 headers = self.headers.copy()
1316 params = {}
1317 if entity_type:
1318 params.update({"type": entity_type})
1319 options = []
1320 if forcedUpdate:
1321 options.append("forcedUpdate")
1322 if options:
1323 params.update({"options": ",".join(options)})
1324 try:
1325 if not isinstance(value, (dict, list)):
1326 headers.update({"Content-Type": "text/plain"})
1327 if isinstance(value, str):
1328 value = f"{value}"
1329 res = self.put(url=url, headers=headers, json=value, params=params)
1330 else:
1331 res = self.put(url=url, headers=headers, json=value, params=params)
1332 if res.ok:
1333 self.logger.info(
1334 "Attribute '%s' of '%s' " "successfully updated!",
1335 attr_name,
1336 entity_id,
1337 )
1338 else:
1339 res.raise_for_status()
1340 except requests.RequestException as err:
1341 msg = (
1342 f"Could not update value of attribute '{attr_name}' from "
1343 f"entity '{entity_id}' "
1344 )
1345 self.log_error(err=err, msg=msg)
1346 raise
1348 # Types Operations
1349 def get_entity_types(
1350 self, *, limit: int = None, offset: int = None, options: str = None
1351 ) -> List[Dict[str, Any]]:
1352 """
1354 Args:
1355 limit: Limit the number of types to be retrieved.
1356 offset: Skip a number of records.
1357 options: Options dictionary. Allowed: count, values
1359 Returns:
1361 """
1362 url = urljoin(self.base_url, f"{self._url_version}/types")
1363 headers = self.headers.copy()
1364 params = {}
1365 if limit:
1366 params.update({"limit": limit})
1367 if offset:
1368 params.update({"offset": offset})
1369 if options:
1370 params.update({"options": options})
1371 try:
1372 res = self.get(url=url, params=params, headers=headers)
1373 if res.ok:
1374 self.logger.debug("Received: %s", res.json())
1375 return res.json()
1376 res.raise_for_status()
1377 except requests.RequestException as err:
1378 msg = "Could not load entity types!"
1379 self.log_error(err=err, msg=msg)
1380 raise
1382 def get_entity_type(self, entity_type: str) -> Dict[str, Any]:
1383 """
1385 Args:
1386 entity_type: Entity Type. Example: Room
1388 Returns:
1390 """
1391 url = urljoin(self.base_url, f"{self._url_version}/types/{entity_type}")
1392 headers = self.headers.copy()
1393 params = {}
1394 try:
1395 res = self.get(url=url, params=params, headers=headers)
1396 if res.ok:
1397 self.logger.debug("Received: %s", res.json())
1398 return res.json()
1399 res.raise_for_status()
1400 except requests.RequestException as err:
1401 msg = f"Could not load entities of type" f"'{entity_type}' "
1402 self.log_error(err=err, msg=msg)
1403 raise
1405 # SUBSCRIPTION API ENDPOINTS
1406 def get_subscription_list(self, limit: PositiveInt = inf) -> List[Subscription]:
1407 """
1408 Returns a list of all the subscriptions present in the system.
1409 Args:
1410 limit: Limit the number of subscriptions to be retrieved
1411 Returns:
1412 list of subscriptions
1413 """
1414 url = urljoin(self.base_url, f"{self._url_version}/subscriptions/")
1415 headers = self.headers.copy()
1416 params = {}
1418 # We always use the 'count' option to check weather pagination is
1419 # required
1420 params.update({"options": "count"})
1421 try:
1422 items = self.__pagination(
1423 limit=limit, url=url, params=params, headers=headers
1424 )
1425 adapter = TypeAdapter(List[Subscription])
1426 return adapter.validate_python(items)
1427 except requests.RequestException as err:
1428 msg = "Could not load subscriptions!"
1429 self.log_error(err=err, msg=msg)
1430 raise
1432 def post_subscription(
1433 self,
1434 subscription: Subscription,
1435 update: bool = False,
1436 skip_initial_notification: bool = False,
1437 ) -> str:
1438 """
1439 Creates a new subscription. The subscription is represented by a
1440 Subscription object defined in filip.cb.models.
1442 If the subscription already exists, the adding is prevented and the id
1443 of the existing subscription is returned.
1445 A subscription is deemed as already existing if there exists a
1446 subscription with the exact same subject and notification fields. All
1447 optional fields are not considered.
1449 Args:
1450 subscription: Subscription
1451 update: True - If the subscription already exists, update it
1452 False- If the subscription already exists, throw warning
1453 skip_initial_notification: True - Initial Notifications will be
1454 sent to recipient containing the whole data. This is
1455 deprecated and removed from version 3.0 of the context broker.
1456 False - skip the initial notification
1457 Returns:
1458 str: Id of the (created) subscription
1460 """
1461 existing_subscriptions = self.get_subscription_list()
1463 sub_dict = subscription.model_dump(include={"subject", "notification"})
1464 for ex_sub in existing_subscriptions:
1465 if self._subscription_dicts_are_equal(
1466 sub_dict, ex_sub.model_dump(include={"subject", "notification"})
1467 ):
1468 self.logger.info("Subscription already exists")
1469 if update:
1470 self.logger.info("Updated subscription")
1471 subscription.id = ex_sub.id
1472 self.update_subscription(subscription)
1473 else:
1474 warnings.warn(
1475 f"Subscription existed already with the id" f" {ex_sub.id}"
1476 )
1477 return ex_sub.id
1479 params = {}
1480 if skip_initial_notification:
1481 version = self.get_version()["orion"]["version"]
1482 if parse_version(version) <= parse_version("3.1"):
1483 params.update({"options": "skipInitialNotification"})
1484 else:
1485 pass
1486 warnings.warn(
1487 f"Skip initial notifications is a deprecated "
1488 f"feature of older versions <=3.1 of the context "
1489 f"broker. The Context Broker that you requesting has "
1490 f"version: {version}. For newer versions we "
1491 f"automatically skip this option. Consider "
1492 f"refactoring and updating your services",
1493 DeprecationWarning,
1494 )
1496 url = urljoin(self.base_url, "v2/subscriptions")
1497 headers = self.headers.copy()
1498 headers.update({"Content-Type": "application/json"})
1499 try:
1500 res = self.post(
1501 url=url,
1502 headers=headers,
1503 data=subscription.model_dump_json(exclude={"id"}, exclude_none=True),
1504 params=params,
1505 )
1506 if res.ok:
1507 self.logger.info("Subscription successfully created!")
1508 return res.headers["Location"].split("/")[-1]
1509 res.raise_for_status()
1510 except requests.RequestException as err:
1511 msg = "Could not send subscription!"
1512 self.log_error(err=err, msg=msg)
1513 raise
1515 def get_subscription(self, subscription_id: str) -> Subscription:
1516 """
1517 Retrieves a subscription from
1518 Args:
1519 subscription_id: id of the subscription
1521 Returns:
1523 """
1524 url = urljoin(
1525 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
1526 )
1527 headers = self.headers.copy()
1528 try:
1529 res = self.get(url=url, headers=headers)
1530 if res.ok:
1531 self.logger.debug("Received: %s", res.json())
1532 return Subscription(**res.json())
1533 res.raise_for_status()
1534 except requests.RequestException as err:
1535 msg = f"Could not load subscription {subscription_id}"
1536 self.log_error(err=err, msg=msg)
1537 raise
1539 def update_subscription(
1540 self, subscription: Subscription, skip_initial_notification: bool = False
1541 ):
1542 """
1543 Only the fields included in the request are updated in the subscription.
1545 Args:
1546 subscription: Subscription to update
1547 skip_initial_notification: True - Initial Notifications will be
1548 sent to recipient containing the whole data. This is
1549 deprecated and removed from version 3.0 of the context broker.
1550 False - skip the initial notification
1552 Returns:
1553 None
1554 """
1555 params = {}
1556 if skip_initial_notification:
1557 version = self.get_version()["orion"]["version"]
1558 if parse_version(version) <= parse_version("3.1"):
1559 params.update({"options": "skipInitialNotification"})
1560 else:
1561 pass
1562 warnings.warn(
1563 f"Skip initial notifications is a deprecated "
1564 f"feature of older versions <3.1 of the context "
1565 f"broker. The Context Broker that you requesting has "
1566 f"version: {version}. For newer versions we "
1567 f"automatically skip this option. Consider "
1568 f"refactoring and updating your services",
1569 DeprecationWarning,
1570 )
1572 url = urljoin(
1573 self.base_url, f"{self._url_version}/subscriptions/{subscription.id}"
1574 )
1575 headers = self.headers.copy()
1576 headers.update({"Content-Type": "application/json"})
1577 try:
1578 res = self.patch(
1579 url=url,
1580 headers=headers,
1581 data=subscription.model_dump_json(exclude={"id"}, exclude_none=True),
1582 )
1583 if res.ok:
1584 self.logger.info("Subscription successfully updated!")
1585 else:
1586 res.raise_for_status()
1587 except requests.RequestException as err:
1588 msg = f"Could not update subscription {subscription.id}"
1589 self.log_error(err=err, msg=msg)
1590 raise
1592 def delete_subscription(self, subscription_id: str) -> None:
1593 """
1594 Deletes a subscription from a Context Broker
1595 Args:
1596 subscription_id: id of the subscription
1597 """
1598 url = urljoin(
1599 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
1600 )
1601 headers = self.headers.copy()
1602 try:
1603 res = self.delete(url=url, headers=headers)
1604 if res.ok:
1605 self.logger.info(
1606 f"Subscription '{subscription_id}' " f"successfully deleted!"
1607 )
1608 else:
1609 res.raise_for_status()
1610 except requests.RequestException as err:
1611 msg = f"Could not delete subscription {subscription_id}"
1612 self.log_error(err=err, msg=msg)
1613 raise
1615 # Registration API
1616 def get_registration_list(self, *, limit: PositiveInt = None) -> List[Registration]:
1617 """
1618 Lists all the context provider registrations present in the system.
1620 Args:
1621 limit: Limit the number of registrations to be retrieved
1622 Returns:
1624 """
1625 url = urljoin(self.base_url, f"{self._url_version}/registrations/")
1626 headers = self.headers.copy()
1627 params = {}
1629 # We always use the 'count' option to check weather pagination is
1630 # required
1631 params.update({"options": "count"})
1632 try:
1633 items = self.__pagination(
1634 limit=limit, url=url, params=params, headers=headers
1635 )
1636 adapter = TypeAdapter(List[Registration])
1637 return adapter.validate_python(items)
1638 except requests.RequestException as err:
1639 msg = "Could not load registrations!"
1640 self.log_error(err=err, msg=msg)
1641 raise
1643 def post_registration(self, registration: Registration):
1644 """
1645 Creates a new context provider registration. This is typically used
1646 for binding context sources as providers of certain data. The
1647 registration is represented by cb.models.Registration
1649 Args:
1650 registration (Registration):
1652 Returns:
1654 """
1655 url = urljoin(self.base_url, f"{self._url_version}/registrations")
1656 headers = self.headers.copy()
1657 headers.update({"Content-Type": "application/json"})
1658 try:
1659 res = self.post(
1660 url=url,
1661 headers=headers,
1662 data=registration.model_dump_json(exclude={"id"}, exclude_none=True),
1663 )
1664 if res.ok:
1665 self.logger.info("Registration successfully created!")
1666 return res.headers["Location"].split("/")[-1]
1667 res.raise_for_status()
1668 except requests.RequestException as err:
1669 msg = f"Could not send registration {registration.id}!"
1670 self.log_error(err=err, msg=msg)
1671 raise
1673 def get_registration(self, registration_id: str) -> Registration:
1674 """
1675 Retrieves a registration from context broker by id
1677 Args:
1678 registration_id: id of the registration
1680 Returns:
1681 Registration
1682 """
1683 url = urljoin(
1684 self.base_url, f"{self._url_version}/registrations/{registration_id}"
1685 )
1686 headers = self.headers.copy()
1687 try:
1688 res = self.get(url=url, headers=headers)
1689 if res.ok:
1690 self.logger.debug("Received: %s", res.json())
1691 return Registration(**res.json())
1692 res.raise_for_status()
1693 except requests.RequestException as err:
1694 msg = f"Could not load registration {registration_id} !"
1695 self.log_error(err=err, msg=msg)
1696 raise
1698 def add_valid_relationships(
1699 self, entities: List[ContextEntity]
1700 ) -> List[ContextEntity]:
1701 """
1702 Validate all attributes in the given entities. If the attribute value points to
1703 an existing entity, it is assumed that this attribute is a relationship, and it
1704 will be assigned with the attribute type "relationship"
1706 Args:
1707 entities: list of entities that need to be validated.
1709 Returns:
1710 updated entities
1711 """
1712 updated_entities = []
1713 for entity in entities[:]:
1714 for attr_name, attr_value in entity.model_dump(
1715 exclude={"id", "type"}
1716 ).items():
1717 if isinstance(attr_value, dict):
1718 if self.validate_relationship(attr_value):
1719 entity.update_attribute(
1720 {
1721 attr_name: ContextAttribute(
1722 **{
1723 "type": DataType.RELATIONSHIP,
1724 "value": attr_value.get("value"),
1725 }
1726 )
1727 }
1728 )
1729 updated_entities.append(entity)
1730 return updated_entities
1732 def remove_invalid_relationships(
1733 self, entities: List[ContextEntity], hard_remove: bool = True
1734 ) -> List[ContextEntity]:
1735 """
1736 Removes invalid relationships from the entities. An invalid relationship
1737 is a relationship that has no destination entity.
1739 Args:
1740 entities: list of entities that need to be validated.
1741 hard_remove: If True, invalid relationships will be deleted.
1742 If False, invalid relationships will be changed to Text
1743 attributes.
1745 Returns:
1746 updated entities
1747 """
1748 updated_entities = []
1749 for entity in entities[:]:
1750 for relationship in entity.get_relationships():
1751 if not self.validate_relationship(relationship):
1752 if hard_remove:
1753 entity.delete_attributes(attrs=[relationship])
1754 else:
1755 # change the attribute type to "Text"
1756 entity.update_attribute(
1757 attrs=[
1758 NamedContextAttribute(
1759 name=relationship.name,
1760 type=DataType.TEXT,
1761 value=relationship.value,
1762 )
1763 ]
1764 )
1765 updated_entities.append(entity)
1766 return updated_entities
1768 def validate_relationship(
1769 self, relationship: Union[NamedContextAttribute, ContextAttribute, Dict]
1770 ) -> bool:
1771 """
1772 Validates a relationship. A relationship is valid if it points to an existing
1773 entity. Otherwise, it is considered invalid
1775 Args:
1776 relationship: relationship to validate
1777 Returns
1778 True if the relationship is valid, False otherwise
1779 """
1780 if isinstance(relationship, NamedContextAttribute) or isinstance(
1781 relationship, ContextAttribute
1782 ):
1783 destination_id = relationship.value
1784 elif isinstance(relationship, dict):
1785 destination_id = relationship.get("value")
1786 if destination_id is None:
1787 raise ValueError(
1788 "Invalid relationship dictionary format\n"
1789 "Expected format: {"
1790 f'"type": "{DataType.RELATIONSHIP.value}", '
1791 '"value" "entity_id"}'
1792 )
1793 else:
1794 raise ValueError("Invalid relationship type.")
1795 try:
1796 destination_entity = self.get_entity(entity_id=destination_id)
1797 return destination_entity.id == destination_id
1798 except requests.RequestException as err:
1799 if err.response.status_code == 404:
1800 return False
1802 def update_registration(self, registration: Registration):
1803 """
1804 Only the fields included in the request are updated in the registration.
1806 Args:
1807 registration: Registration to update
1808 Returns:
1810 """
1811 url = urljoin(
1812 self.base_url, f"{self._url_version}/registrations/{registration.id}"
1813 )
1814 headers = self.headers.copy()
1815 headers.update({"Content-Type": "application/json"})
1816 try:
1817 res = self.patch(
1818 url=url,
1819 headers=headers,
1820 data=registration.model_dump_json(exclude={"id"}, exclude_none=True),
1821 )
1822 if res.ok:
1823 self.logger.info("Registration successfully updated!")
1824 else:
1825 res.raise_for_status()
1826 except requests.RequestException as err:
1827 msg = f"Could not update registration {registration.id} !"
1828 self.log_error(err=err, msg=msg)
1829 raise
1831 def delete_registration(self, registration_id: str) -> None:
1832 """
1833 Deletes a subscription from a Context Broker
1834 Args:
1835 registration_id: id of the subscription
1836 """
1837 url = urljoin(
1838 self.base_url, f"{self._url_version}/registrations/{registration_id}"
1839 )
1840 headers = self.headers.copy()
1841 try:
1842 res = self.delete(url=url, headers=headers)
1843 if res.ok:
1844 self.logger.info(
1845 "Registration '%s' " "successfully deleted!", registration_id
1846 )
1847 res.raise_for_status()
1848 except requests.RequestException as err:
1849 msg = f"Could not delete registration {registration_id} !"
1850 self.log_error(err=err, msg=msg)
1851 raise
1853 # Batch operation API
1854 def update(
1855 self,
1856 *,
1857 entities: List[Union[ContextEntity, ContextEntityKeyValues]],
1858 action_type: Union[ActionType, str],
1859 update_format: str = None,
1860 forcedUpdate: bool = False,
1861 override_metadata: bool = False,
1862 ) -> None:
1863 """
1864 This operation allows to create, update and/or delete several entities
1865 in a single batch operation.
1867 This operation is split in as many individual operations as entities
1868 in the entities vector, so the actionType is executed for each one of
1869 them. Depending on the actionType, a mapping with regular non-batch
1870 operations can be done:
1872 append: maps to POST /v2/entities (if the entity does not already exist)
1873 or POST /v2/entities/<id>/attrs (if the entity already exists).
1875 appendStrict: maps to POST /v2/entities (if the entity does not
1876 already exist) or POST /v2/entities/<id>/attrs?options=append (if the
1877 entity already exists).
1879 update: maps to PATCH /v2/entities/<id>/attrs.
1881 delete: maps to DELETE /v2/entities/<id>/attrs/<attrName> on every
1882 attribute included in the entity or to DELETE /v2/entities/<id> if
1883 no attribute were included in the entity.
1885 replace: maps to PUT /v2/entities/<id>/attrs.
1887 Args:
1888 entities: "an array of entities, each entity specified using the "
1889 "JSON entity representation format "
1890 action_type (Update): "actionType, to specify the kind of update
1891 action to do: either append, appendStrict, update, delete,
1892 or replace. "
1893 update_format (str): Optional 'keyValues'
1894 forcedUpdate: Update operation have to trigger any matching
1895 subscription, no matter if there is an actual attribute
1896 update or no instead of the default behavior, which is to
1897 updated only if attribute is effectively updated.
1898 override_metadata:
1899 Bool, replace the existing metadata with the one provided in
1900 the request
1901 Returns:
1903 """
1905 url = urljoin(self.base_url, f"{self._url_version}/op/update")
1906 headers = self.headers.copy()
1907 headers.update({"Content-Type": "application/json"})
1908 params = {}
1909 options = []
1910 if override_metadata:
1911 options.append("overrideMetadata")
1912 if forcedUpdate:
1913 options.append("forcedUpdate")
1914 if update_format:
1915 assert (
1916 update_format == AttrsFormat.KEY_VALUES.value
1917 ), "Only 'keyValues' is allowed as update format"
1918 options.append("keyValues")
1919 if options:
1920 params.update({"options": ",".join(options)})
1921 update = Update(actionType=action_type, entities=entities)
1922 try:
1923 res = self.post(
1924 url=url,
1925 headers=headers,
1926 params=params,
1927 json=update.model_dump(by_alias=True),
1928 )
1929 if res.ok:
1930 self.logger.info("Update operation '%s' succeeded!", action_type)
1931 else:
1932 res.raise_for_status()
1933 except requests.RequestException as err:
1934 msg = f"Update operation '{action_type}' failed!"
1935 self.log_error(err=err, msg=msg)
1936 raise
1938 def query(
1939 self,
1940 *,
1941 query: Query,
1942 limit: PositiveInt = None,
1943 order_by: str = None,
1944 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
1945 ) -> List[Any]:
1946 """
1947 Generate api query
1948 Args:
1949 query (Query):
1950 limit (PositiveInt):
1951 order_by (str):
1952 response_format (AttrsFormat, str):
1953 Returns:
1954 The response payload is an Array containing one object per matching
1955 entity, or an empty array [] if no entities are found. The entities
1956 follow the JSON entity representation format (described in the
1957 section "JSON Entity Representation").
1958 """
1959 url = urljoin(self.base_url, f"{self._url_version}/op/query")
1960 headers = self.headers.copy()
1961 headers.update({"Content-Type": "application/json"})
1962 params = {"options": "count"}
1964 if response_format:
1965 if response_format not in list(AttrsFormat):
1966 raise ValueError(f"Value must be in {list(AttrsFormat)}")
1967 params["options"] = ",".join([response_format, "count"])
1968 try:
1969 items = self.__pagination(
1970 method=PaginationMethod.POST,
1971 url=url,
1972 headers=headers,
1973 params=params,
1974 data=query.model_dump_json(exclude_none=True),
1975 limit=limit,
1976 )
1977 if response_format == AttrsFormat.NORMALIZED:
1978 adapter = TypeAdapter(List[ContextEntity])
1979 return adapter.validate_python(items)
1980 if response_format == AttrsFormat.KEY_VALUES:
1981 adapter = TypeAdapter(List[ContextEntityKeyValues])
1982 return adapter.validate_python(items)
1983 return items
1984 except requests.RequestException as err:
1985 msg = "Query operation failed!"
1986 self.log_error(err=err, msg=msg)
1987 raise
1989 def notify(self, message: Message) -> None:
1990 """
1991 This operation is intended to consume a notification payload so that
1992 all the entity data included by such notification is persisted,
1993 overwriting if necessary. This operation is useful when one NGSIv2
1994 endpoint is subscribed to another NGSIv2 endpoint (federation
1995 scenarios). The request payload must be an NGSIv2 notification
1996 payload. The behaviour must be exactly the same as 'update'
1997 with 'action_type' equal to append.
1999 Args:
2000 message: Notification message
2002 Returns:
2003 None
2004 """
2005 url = urljoin(self.base_url, "v2/op/notify")
2006 headers = self.headers.copy()
2007 headers.update({"Content-Type": "application/json"})
2008 params = {}
2009 try:
2010 res = self.post(
2011 url=url,
2012 headers=headers,
2013 params=params,
2014 data=message.model_dump_json(by_alias=True),
2015 )
2016 if res.ok:
2017 self.logger.info("Notification message sent!")
2018 else:
2019 res.raise_for_status()
2020 except requests.RequestException as err:
2021 msg = (
2022 f"Sending notifcation message failed! \n "
2023 f"{message.model_dump_json(indent=2)}"
2024 )
2025 self.log_error(err=err, msg=msg)
2026 raise
2028 def post_command(
2029 self,
2030 *,
2031 entity_id: str,
2032 command: Union[Command, NamedCommand, Dict],
2033 entity_type: str = None,
2034 command_name: str = None,
2035 ) -> None:
2036 """
2037 Post a command to a context entity this corresponds to 'PATCH' of the
2038 specified command attribute.
2040 Args:
2041 entity_id: Entity identifier
2042 command: Command
2043 entity_type: Entity type
2044 command_name: Name of the command in the entity
2046 Returns:
2047 None
2048 """
2049 if command_name:
2050 assert isinstance(command, (Command, dict))
2051 if isinstance(command, dict):
2052 command = Command(**command)
2053 command = {command_name: command.model_dump()}
2054 else:
2055 assert isinstance(command, (NamedCommand, dict))
2056 if isinstance(command, dict):
2057 command = NamedCommand(**command)
2059 self.update_existing_entity_attributes(
2060 entity_id=entity_id, entity_type=entity_type, attrs=[command]
2061 )
2063 def does_entity_exist(self, entity_id: str, entity_type: str) -> bool:
2064 """
2065 Test if an entity with given id and type is present in the CB
2067 Args:
2068 entity_id: Entity id
2069 entity_type: Entity type
2071 Returns:
2072 bool; True if entity exists
2074 Raises:
2075 RequestException, if any error occurs (e.g: No Connection),
2076 except that the entity is not found
2077 """
2078 url = urljoin(self.base_url, f"v2/entities/{entity_id}")
2079 headers = self.headers.copy()
2080 params = {"type": entity_type}
2082 try:
2083 res = self.get(url=url, params=params, headers=headers)
2084 if res.ok:
2085 return True
2086 res.raise_for_status()
2087 except requests.RequestException as err:
2088 if err.response is None or not err.response.status_code == 404:
2089 self.log_error(err=err, msg="Checking entity existence failed!")
2090 raise
2091 return False
2093 def patch_entity(
2094 self,
2095 entity: ContextEntity,
2096 old_entity: Optional[ContextEntity] = None,
2097 override_attr_metadata: bool = True,
2098 ) -> None:
2099 """
2100 Takes a given entity and updates the state in the CB to match it.
2101 It is an extended equivalent to the HTTP method PATCH, which applies
2102 partial modifications to a resource.
2104 Args:
2105 entity: Entity to update
2106 old_entity: OPTIONAL, if given only the differences between the
2107 old_entity and entity are updated in the CB.
2108 Other changes made to the entity in CB, can be kept.
2109 If type or id was changed, the old_entity will be
2110 deleted.
2111 override_attr_metadata:
2112 Whether to override or append the attributes metadata.
2113 `True` for overwrite or `False` for update/append
2115 Returns:
2116 None
2117 """
2119 new_entity = entity
2121 if old_entity is None:
2122 # If no old entity_was provided we use the current state to compare
2123 # the entity to
2124 if self.does_entity_exist(
2125 entity_id=new_entity.id, entity_type=new_entity.type
2126 ):
2127 old_entity = self.get_entity(
2128 entity_id=new_entity.id, entity_type=new_entity.type
2129 )
2130 else:
2131 # the entity is new, post and finish
2132 self.post_entity(new_entity, update=False)
2133 return
2135 else:
2136 # An old_entity was provided
2137 # check if the old_entity (still) exists else recall methode
2138 # and discard old_entity
2139 if not self.does_entity_exist(
2140 entity_id=old_entity.id, entity_type=old_entity.type
2141 ):
2142 self.patch_entity(
2143 new_entity, override_attr_metadata=override_attr_metadata
2144 )
2145 return
2147 # if type or id was changed, the old_entity needs to be deleted
2148 # and the new_entity created
2149 # In this case we will lose the current state of the entity
2150 if old_entity.id != new_entity.id or old_entity.type != new_entity.type:
2151 self.delete_entity(entity_id=old_entity.id, entity_type=old_entity.type)
2153 if not self.does_entity_exist(
2154 entity_id=new_entity.id, entity_type=new_entity.type
2155 ):
2156 self.post_entity(entity=new_entity, update=False)
2157 return
2159 # At this point we know that we need to patch only the attributes of
2160 # the entity
2161 # Check the differences between the attributes of old and new entity
2162 # Delete the removed attributes, create the new ones,
2163 # and update the existing if necessary
2164 old_attributes = old_entity.get_attributes()
2165 new_attributes = new_entity.get_attributes()
2167 # Manage attributes that existed before
2168 for old_attr in old_attributes:
2169 # commands do not exist in the ContextEntity and are only
2170 # registrations to the corresponding device. Operations as
2171 # delete will fail as it does not technically exist
2172 corresponding_new_attr = None
2173 for new_attr in new_attributes:
2174 if new_attr.name == old_attr.name:
2175 corresponding_new_attr = new_attr
2177 if corresponding_new_attr is None:
2178 # Attribute no longer exists, delete it
2179 try:
2180 self.delete_entity_attribute(
2181 entity_id=new_entity.id,
2182 entity_type=new_entity.type,
2183 attr_name=old_attr.name,
2184 )
2185 except requests.RequestException as err:
2186 msg = (
2187 f"Failed to delete attribute {old_attr.name} of "
2188 f"entity {new_entity.id}."
2189 )
2190 if err.response is not None and err.response.status_code == 404:
2191 # if the attribute is provided by a registration the
2192 # deletion will fail
2193 msg += (
2194 f" The attribute is probably provided "
2195 f"by a registration."
2196 )
2197 self.log_error(err=err, msg=msg)
2198 else:
2199 self.log_error(err=err, msg=msg)
2200 raise
2201 else:
2202 # Check if attributed changed in any way, if yes update
2203 # else do nothing and keep current state
2204 if old_attr != corresponding_new_attr:
2205 try:
2206 self.update_entity_attribute(
2207 entity_id=new_entity.id,
2208 entity_type=new_entity.type,
2209 attr=corresponding_new_attr,
2210 override_metadata=override_attr_metadata,
2211 )
2212 except requests.RequestException as err:
2213 msg = (
2214 f"Failed to update attribute {old_attr.name} of "
2215 f"entity {new_entity.id}."
2216 )
2217 if err.response is not None and err.response.status_code == 404:
2218 # if the attribute is provided by a registration the
2219 # update will fail
2220 msg += (
2221 f" The attribute is probably provided "
2222 f"by a registration."
2223 )
2224 self.log_error(err=err, msg=msg)
2225 raise
2227 # Create new attributes
2228 update_entity = ContextEntity(id=entity.id, type=entity.type)
2229 update_needed = False
2230 for new_attr in new_attributes:
2231 # commands do not exist in the ContextEntity and are only
2232 # registrations to the corresponding device. Operations as
2233 # delete will fail as it does not technically exists
2234 attr_existed = False
2235 for old_attr in old_attributes:
2236 if new_attr.name == old_attr.name:
2237 attr_existed = True
2239 if not attr_existed:
2240 update_needed = True
2241 update_entity.add_attributes([new_attr])
2243 if update_needed:
2244 self.update_entity(update_entity)
2246 def _subscription_dicts_are_equal(self, first: dict, second: dict):
2247 """
2248 Check if two dictionaries and all sub-dictionaries are equal.
2249 Logs a warning if the keys are not equal, but ignores the
2250 comparison of such keys.
2252 Args:
2253 first dict: Dictionary of first subscription
2254 second dict: Dictionary of second subscription
2256 Returns:
2257 True if equal, else False
2258 """
2260 def _value_is_not_none(value):
2261 """
2262 Recursive function to check if a value equals none.
2263 If the value is a dict and any value of the dict is not none,
2264 the value is not none.
2265 If the value is a list and any item is not none, the value is not none.
2266 If it's neither dict nore list, bool is used.
2267 """
2268 if isinstance(value, dict):
2269 return any([_value_is_not_none(value=_v) for _v in value.values()])
2270 if isinstance(value, list):
2271 return any([_value_is_not_none(value=_v) for _v in value])
2272 else:
2273 return bool(value)
2275 if first.keys() != second.keys():
2276 warnings.warn(
2277 "Subscriptions contain a different set of fields. "
2278 "Only comparing to new fields of the new one."
2279 )
2280 for k, v in first.items():
2281 ex_value = second.get(k, None)
2282 if isinstance(v, dict) and isinstance(ex_value, dict):
2283 equal = self._subscription_dicts_are_equal(v, ex_value)
2284 if equal:
2285 continue
2286 else:
2287 return False
2288 if v != ex_value:
2289 self.logger.debug(f"Not equal fields for key {k}: ({v}, {ex_value})")
2290 if (
2291 not _value_is_not_none(v)
2292 and not _value_is_not_none(ex_value)
2293 or k == "timesSent"
2294 ):
2295 continue
2296 return False
2297 return True
2300#
2301#
2302# def check_duplicate_subscription(self, subscription_body, limit: int = 20):
2303# """
2304# Function compares the subject of the subscription body, on whether a subscription
2305# already exists for a device / entity.
2306# :param subscription_body: the body of the new subscripton
2307# :param limit: pagination parameter, to set the number of
2308# subscriptions bodies the get request should grab
2309# :return: exists, boolean -> True, if such a subscription allready
2310# exists
2311# """
2312# exists = False
2313# subscription_subject = json.loads(subscription_body)["subject"]
2314# # Exact keys depend on subscription body
2315# try:
2316# subscription_url = json.loads(subscription_body)[
2317# "notification"]["httpCustom"]["url"]
2318# except KeyError:
2319# subscription_url = json.loads(subscription_body)[
2320# "notification"]["http"]["url"]
2321#
2322# # If the number of subscriptions is larger then the limit,
2323# paginations methods have to be used
2324# url = self.url + '/v2/subscriptions?limit=' + str(limit) +
2325# '&options=count'
2326# response = self.session.get(url, headers=self.get_header())
2327#
2328# sub_count = float(response.headers["Fiware-Total-Count"])
2329# response = json.loads(response.text)
2330# if sub_count >= limit:
2331# response = self.get_pagination(url=url, headers=self.get_header(),
2332# limit=limit, count=sub_count)
2333# response = json.loads(response)
2334#
2335# for existing_subscription in response:
2336# # check whether the exact same subscriptions already exists
2337# if existing_subscription["subject"] == subscription_subject:
2338# exists = True
2339# break
2340# try:
2341# existing_url = existing_subscription["notification"][
2342# "http"]["url"]
2343# except KeyError:
2344# existing_url = existing_subscription["notification"][
2345# "httpCustom"]["url"]
2346# # check whether both subscriptions notify to the same path
2347# if existing_url != subscription_url:
2348# continue
2349# else:
2350# # iterate over all entities included in the subscription object
2351# for entity in subscription_subject["entities"]:
2352# if 'type' in entity.keys():
2353# subscription_type = entity['type']
2354# else:
2355# subscription_type = entity['typePattern']
2356# if 'id' in entity.keys():
2357# subscription_id = entity['id']
2358# else:
2359# subscription_id = entity["idPattern"]
2360# # iterate over all entities included in the exisiting
2361# subscriptions
2362# for existing_entity in existing_subscription["subject"][
2363# "entities"]:
2364# if "type" in entity.keys():
2365# type_existing = entity["type"]
2366# else:
2367# type_existing = entity["typePattern"]
2368# if "id" in entity.keys():
2369# id_existing = entity["id"]
2370# else:
2371# id_existing = entity["idPattern"]
2372# # as the ID field is non optional, it has to match
2373# # check whether the type match
2374# # if the type field is empty, they match all types
2375# if (type_existing == subscription_type) or\
2376# ('*' in subscription_type) or \
2377# ('*' in type_existing)\
2378# or (type_existing == "") or (
2379# subscription_type == ""):
2380# # check if on of the subscriptions is a pattern,
2381# or if they both refer to the same id
2382# # Get the attrs first, to avoid code duplication
2383# # last thing to compare is the attributes
2384# # Assumption -> position is the same as the
2385# entities _list
2386# # i == j
2387# i = subscription_subject["entities"].index(entity)
2388# j = existing_subscription["subject"][
2389# "entities"].index(existing_entity)
2390# try:
2391# subscription_attrs = subscription_subject[
2392# "condition"]["attrs"][i]
2393# except (KeyError, IndexError):
2394# subscription_attrs = []
2395# try:
2396# existing_attrs = existing_subscription[
2397# "subject"]["condition"]["attrs"][j]
2398# except (KeyError, IndexError):
2399# existing_attrs = []
2400#
2401# if (".*" in subscription_id) or ('.*' in
2402# id_existing) or (subscription_id == id_existing):
2403# # Attributes have to match, or the have to
2404# be an empty array
2405# if (subscription_attrs == existing_attrs) or
2406# (subscription_attrs == []) or (existing_attrs == []):
2407# exists = True
2408# # if they do not match completely or subscribe
2409# to all ids they have to match up to a certain position
2410# elif ("*" in subscription_id) or ('*' in
2411# id_existing):
2412# regex_existing = id_existing.find('*')
2413# regex_subscription =
2414# subscription_id.find('*')
2415# # slice the strings to compare
2416# if (id_existing[:regex_existing] in
2417# subscription_id) or (subscription_id[:regex_subscription] in id_existing) or \
2418# (id_existing[regex_existing:] in
2419# subscription_id) or (subscription_id[regex_subscription:] in id_existing):
2420# if (subscription_attrs ==
2421# existing_attrs) or (subscription_attrs == []) or (existing_attrs == []):
2422# exists = True
2423# else:
2424# continue
2425# else:
2426# continue
2427# else:
2428# continue
2429# else:
2430# continue
2431# else:
2432# continue
2433# return exists
2434#