Coverage for filip/clients/ngsi_v2/cb.py: 81%
673 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-20 16:54 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-20 16:54 +0000
1"""
2Context Broker Module for API Client
3"""
4from __future__ import annotations
6import copy
7from copy import deepcopy
8from math import inf
9from pkg_resources import parse_version
10from pydantic import PositiveInt, PositiveFloat, AnyHttpUrl
11from pydantic.type_adapter import TypeAdapter
12from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union
13import re
14import requests
15from urllib.parse import urljoin
16import warnings
17from filip.clients.base_http_client import BaseHttpClient
18from filip.config import settings
19from filip.models.base import FiwareHeader, PaginationMethod
20from filip.utils.simple_ql import QueryString
21from filip.models.ngsi_v2.context import (
22 ActionType,
23 Command,
24 ContextEntity,
25 ContextEntityKeyValues,
26 ContextAttribute,
27 NamedCommand,
28 NamedContextAttribute,
29 Query,
30 Update,
31 PropertyFormat,
32)
33from filip.models.ngsi_v2.base import AttrsFormat
34from filip.models.ngsi_v2.subscriptions import Subscription, Message
35from filip.models.ngsi_v2.registrations import Registration
37if TYPE_CHECKING:
38 from filip.clients.ngsi_v2.iota import IoTAClient
41class ContextBrokerClient(BaseHttpClient):
42 """
43 Implementation of NGSI Context Broker functionalities, such as creating
44 entities and subscriptions; retrieving, updating and deleting data.
45 Further documentation:
46 https://fiware-orion.readthedocs.io/en/master/
48 Api specifications for v2 are located here:
49 https://telefonicaid.github.io/fiware-orion/api/v2/stable/
51 Note:
52 We use the reference implementation for development. Therefore, some
53 other brokers may show slightly different behavior!
54 """
def __init__(
    self,
    url: str = None,
    *,
    session: requests.Session = None,
    fiware_header: FiwareHeader = None,
    **kwargs,
):
    """
    Set up a client for a context broker.

    Args:
        url: Url of context broker server. A falsy value (``None``, empty
            string) selects ``settings.CB_URL`` instead.
        session (requests.Session): Passed through unchanged to the base
            client.
        fiware_header (FiwareHeader): fiware service and fiware service path
        **kwargs (Optional): Optional arguments that ``request`` takes;
            passed through unchanged to the base client.
    """
    # Fall back to the configured broker url when the caller gave none.
    if not url:
        url = settings.CB_URL
    super().__init__(
        url=url,
        session=session,
        fiware_header=fiware_header,
        **kwargs,
    )
def __pagination(
    self,
    *,
    method: PaginationMethod = PaginationMethod.GET,
    url: str,
    headers: Dict,
    limit: Union[PositiveInt, PositiveFloat] = None,
    params: Dict = None,
    data: str = None,
) -> List[Dict]:
    """
    NGSIv2 implements a pagination mechanism in order to help clients to
    retrieve large sets of resources. This mechanism works for all listing
    operations in the API (e.g. GET /v2/entities, GET /v2/subscriptions,
    POST /v2/op/query, etc.). This function repeats the request with an
    increasing offset until either ``limit`` items were collected or the
    total announced in the ``Fiware-Total-Count`` response header is
    reached.

    https://fiware-orion.readthedocs.io/en/master/user/pagination/index.html

    Args:
        method: HTTP method used for every page request
        url: Information about the url, obtained from the original function
        headers: The headers from the original function
        limit: Maximum number of items to collect. ``None`` means no
            upper bound.
        params: Query parameters of the original request. The mapping is
            copied, the caller's object is never modified. ``None`` is
            treated as an empty mapping.
        data: Request body forwarded with every page request

    Returns:
        List of the decoded JSON items of all pages.

    Raises:
        requests.RequestException: via ``raise_for_status`` if a page
            request fails.
        KeyError: if the first response carries no ``Fiware-Total-Count``
            header.
    """
    max_page_size = 1000  # maximum items per request
    if limit is None:
        limit = inf

    # Work on a private copy: `params` may be None (the default) and the
    # paging keys written below must not leak into the caller's dict.
    params = dict(params) if params else {}
    params["limit"] = max_page_size if limit > max_page_size else limit

    if self.session:
        session = self.session
    else:
        session = requests.Session()
    with session:
        res = session.request(
            method=method, url=url, params=params, headers=headers, data=data
        )
        if res.ok:
            items = res.json()
            # do pagination
            count = int(res.headers["Fiware-Total-Count"])

            while len(items) < limit and len(items) < count:
                # Establishing the offset from where entities are retrieved
                params["offset"] = len(items)
                params["limit"] = min(max_page_size, (limit - len(items)))
                res = session.request(
                    method=method,
                    url=url,
                    params=params,
                    headers=headers,
                    data=data,
                )
                if res.ok:
                    page = res.json()
                    if not page:
                        # The announced total can no longer be reached
                        # (e.g. items vanished in between); without this
                        # guard the loop would never terminate.
                        break
                    items.extend(page)
                else:
                    res.raise_for_status()
            self.logger.debug("Received: %s", items)
            return items
        res.raise_for_status()
146 # MANAGEMENT API
def get_version(self) -> Dict:
    """
    Gets version information from the ``version`` endpoint of the
    context broker

    Returns:
        Dictionary with response

    Raises:
        requests.RequestException: logged and re-raised if the request fails
    """
    endpoint = urljoin(self.base_url, "version")
    try:
        response = self.get(url=endpoint, headers=self.headers)
        if not response.ok:
            response.raise_for_status()
        else:
            return response.json()
    except requests.RequestException as exc:
        self.logger.error(exc)
        raise
def get_resources(self) -> Dict:
    """
    Gets the response of the ``v2`` entry point of the context broker

    Returns:
        Dict

    Raises:
        requests.RequestException: logged and re-raised if the request fails
    """
    endpoint = urljoin(self.base_url, "v2")
    try:
        response = self.get(url=endpoint, headers=self.headers)
        if not response.ok:
            response.raise_for_status()
        else:
            return response.json()
    except requests.RequestException as exc:
        self.logger.error(exc)
        raise
180 # STATISTICS API
def get_statistics(self) -> Dict:
    """
    Gets statistics of context broker

    Returns:
        Dictionary with response

    Raises:
        requests.RequestException: logged and re-raised if the request fails
    """
    endpoint = urljoin(self.base_url, "statistics")
    try:
        response = self.get(url=endpoint, headers=self.headers)
        if not response.ok:
            response.raise_for_status()
        else:
            return response.json()
    except requests.RequestException as exc:
        self.logger.error(exc)
        raise
197 # CONTEXT MANAGEMENT API ENDPOINTS
198 # Entity Operations
def post_entity(
    self,
    entity: Union[ContextEntity, ContextEntityKeyValues],
    update: bool = False,
    patch: bool = False,
    override_attr_metadata: bool = True,
    key_values: bool = False,
):
    """
    Function registers an Object with the NGSI Context Broker,
    if it already exists it can be automatically updated (overwritten)
    if the update bool is True.
    First a post request with the entity is tried. If the response code
    is 422 the entity is unprocessable because it already exists. Then
    there are two options: either overwrite it (update=True) or
    manipulate only its values (patch=True). If neither is set, the
    error is logged and re-raised.

    Args:
        entity (ContextEntity/ContextEntityKeyValues):
            Context Entity Object
        update (bool):
            If the response.status_code is 422, whether to override the
            existing entity
        patch (bool):
            If the response.status_code is 422, whether to manipulate the
            existing entity. Omitted if update `True`.
        override_attr_metadata:
            Only applies for patch equal to `True`.
            Whether to override or append the attribute's metadata.
            `True` for overwrite or `False` for update/append
        key_values(bool):
            By default False. If set to True, "options=keyValues" will
            be included in params of post request. The payload uses
            the keyValues simplified entity representation, i.e.
            ContextEntityKeyValues.

    Returns:
        The ``Location`` header of the response if the entity was
        created, otherwise whatever the override/patch call returns.

    Raises:
        requests.RequestException: logged and re-raised if the request
            fails and no fallback applies.
    """
    url = urljoin(self.base_url, "v2/entities")
    headers = self.headers.copy()
    params = {}
    options = []
    if key_values:
        assert isinstance(entity, ContextEntityKeyValues)
        options.append("keyValues")
    else:
        assert isinstance(entity, ContextEntity)
    if options:
        params.update({"options": ",".join(options)})
    try:
        res = self.post(
            url=url,
            headers=headers,
            json=entity.model_dump(exclude_none=True),
            params=params,
        )
        if res.ok:
            self.logger.info("Entity successfully posted!")
            return res.headers.get("Location")
        res.raise_for_status()
    except requests.RequestException as err:
        # `err.response` is None when no HTTP response was received
        # (connection error, timeout, ...). Reading `.status_code`
        # directly would then hide the real error behind an
        # AttributeError.
        status_code = getattr(err.response, "status_code", None)
        if status_code == 422:
            if update:
                return self.override_entity(entity=entity, key_values=key_values)
            if patch:
                if not key_values:
                    return self.patch_entity(
                        entity=entity,
                        override_attr_metadata=override_attr_metadata,
                    )
                return self._patch_entity_key_values(entity=entity)
        msg = f"Could not post entity {entity.id}"
        self.log_error(err=err, msg=msg)
        raise
def get_entity_list(
    self,
    *,
    entity_ids: List[str] = None,
    entity_types: List[str] = None,
    id_pattern: str = None,
    type_pattern: str = None,
    q: Union[str, QueryString] = None,
    mq: Union[str, QueryString] = None,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    limit: PositiveInt = inf,
    attrs: List[str] = None,
    metadata: str = None,
    order_by: str = None,
    response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
) -> List[Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]]:
    r"""
    Retrieves a list of context entities that match different criteria by
    id, type, pattern matching (either id or type) and/or those which
    match a query or geographical query (see Simple Query Language and
    Geographical Queries). A given entity has to match all the criteria
    to be retrieved (i.e., the criteria is combined in a logical AND
    way). Note that pattern matching query parameters are incompatible
    (i.e. mutually exclusive) with their corresponding exact matching
    parameters, i.e. idPattern with id and typePattern with type.

    Args:
        entity_ids: List of ids (a single id string is accepted as well).
            Retrieve entities whose ID matches one of the elements in
            the list. Incompatible with id_pattern, e.g. Boe_Idarium
        entity_types: List of types (a single type string is accepted as
            well). Retrieve entities whose type matches one of the
            elements in the list. Incompatible with type_pattern.
            Example: Room.
        id_pattern: A correctly formatted regular expression. Retrieve
            entities whose ID matches the regular expression. Incompatible
            with entity_ids, e.g. ngsi-ld.* or sensor.*
        type_pattern: A correctly formatted regular expression. Retrieve
            entities whose type matches the regular expression.
            Incompatible with entity_types, e.g. room.*
        q (SimpleQuery): A query expression, composed of a list of
            statements separated by ;, i.e.,
            q=statement1;statement2;statement3. See Simple Query
            Language specification. Example: temperature>40.
        mq (SimpleQuery): A query expression for attribute metadata,
            composed of a list of statements separated by ;, i.e.,
            mq=statement1;statement2;statement3. See Simple Query
            Language specification. Example: temperature.accuracy<0.9.
        georel: Spatial relationship between matching entities and a
            reference shape. See Geographical Queries. Example: 'near'.
        geometry: Geographical area to which the query is restricted.
            See Geographical Queries. Example: point.
        coords: List of latitude-longitude pairs of coordinates separated
            by ';'. See Geographical Queries. Example: 41.390205,
            2.154007;48.8566,2.3522.
        limit: Limits the number of entities to be retrieved Example: 20
        attrs: List of attribute names whose data are to be included in
            the response (an already comma-separated string is sent
            unchanged). The attributes are retrieved in the order
            specified by this parameter. Example: seatNumber.
        metadata: Metadata names to include in the response, either as
            an already comma-separated string or as a list of names.
            Example: accuracy.
        order_by: Criteria for ordering results. See "Ordering Results"
            section for details. Example: temperature,!speed.
        response_format (AttrsFormat, str): Response Format. Note: That if
            'keyValues' or 'values' are used the response model will
            change to List[ContextEntityKeyValues] and to List[Dict[str,
            Any]], respectively.

    Returns:
        List of entities in the requested representation.

    Raises:
        ValueError: if exact and pattern filters are combined, a pattern
            is not a valid regular expression, or response_format is
            unknown.
        requests.RequestException: logged and re-raised if the request
            fails.
    """
    url = urljoin(self.base_url, "v2/entities/")
    headers = self.headers.copy()
    params = {}

    if entity_ids and id_pattern:
        raise ValueError("entity_ids and id_pattern are mutually exclusive")
    if entity_types and type_pattern:
        raise ValueError("entity_types and type_pattern are mutually exclusive")
    if entity_ids:
        if not isinstance(entity_ids, list):
            entity_ids = [entity_ids]
        params.update({"id": ",".join(entity_ids)})
    if id_pattern:
        try:
            re.compile(id_pattern)
        except re.error as err:
            raise ValueError(f"Invalid Pattern: {err}") from err
        params.update({"idPattern": id_pattern})
    if entity_types:
        if not isinstance(entity_types, list):
            entity_types = [entity_types]
        params.update({"type": ",".join(entity_types)})
    if type_pattern:
        try:
            re.compile(type_pattern)
        except re.error as err:
            raise ValueError(f"Invalid Pattern: {err.msg}") from err
        params.update({"typePattern": type_pattern})
    if attrs:
        # ",".join on a plain string would split it into single
        # characters, so a string is forwarded as given.
        params.update(
            {"attrs": attrs if isinstance(attrs, str) else ",".join(attrs)}
        )
    if metadata:
        # Same guard as for attrs: the signature advertises `str`.
        params.update(
            {
                "metadata": metadata
                if isinstance(metadata, str)
                else ",".join(metadata)
            }
        )
    if q:
        if isinstance(q, str):
            q = QueryString.parse_str(q)
        params.update({"q": str(q)})
    if mq:
        params.update({"mq": str(mq)})
    if geometry:
        params.update({"geometry": geometry})
    if georel:
        params.update({"georel": georel})
    if coords:
        params.update({"coords": coords})
    if order_by:
        params.update({"orderBy": order_by})
    if response_format not in list(AttrsFormat):
        raise ValueError(f"Value must be in {list(AttrsFormat)}")
    # "count" is always requested; the pagination helper reads the total
    # from the Fiware-Total-Count response header.
    response_format = ",".join(["count", response_format])
    params.update({"options": response_format})
    try:
        items = self.__pagination(
            method=PaginationMethod.GET,
            limit=limit,
            url=url,
            params=params,
            headers=headers,
        )
        # response_format is now the joined options string, hence the
        # substring checks.
        if AttrsFormat.NORMALIZED in response_format:
            adapter = TypeAdapter(List[ContextEntity])
            return adapter.validate_python(items)
        if AttrsFormat.KEY_VALUES in response_format:
            adapter = TypeAdapter(List[ContextEntityKeyValues])
            return adapter.validate_python(items)
        return items

    except requests.RequestException as err:
        msg = "Could not load entities"
        self.log_error(err=err, msg=msg)
        raise
def get_entity(
    self,
    entity_id: str,
    entity_type: str = None,
    attrs: List[str] = None,
    metadata: List[str] = None,
    response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
) -> Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]:
    """
    Retrieve a single entity by its id. The operation must return one
    entity element only, but there may be more than one entity with the
    same ID (e.g. entities with same ID but different types). In such
    case, an error message is returned, with the HTTP status code set to
    409 Conflict.

    Args:
        entity_id (String): Id of the entity to be retrieved
        entity_type (String): Entity type, to avoid ambiguity in case
            there are several entities with the same entity id.
        attrs (List of Strings): List of attribute names whose data must be
            included in the response. The attributes are retrieved in the
            order specified by this parameter. If this parameter is not
            included, the attributes are retrieved in arbitrary order,
            and all the attributes of the entity are included in the
            response. Example: temperature, humidity.
        metadata (List of Strings): A list of metadata names to include in
            the response. Example: accuracy.
        response_format (AttrsFormat, str): Representation format of
            response

    Returns:
        ContextEntity for the normalized format, ContextEntityKeyValues
        for the keyValues format, otherwise the decoded JSON body.

    Raises:
        ValueError: if response_format is not a member of AttrsFormat
        requests.RequestException: logged and re-raised if the request
            fails
    """
    url = urljoin(self.base_url, f"v2/entities/{entity_id}")
    headers = self.headers.copy()

    # Only filters that were actually given end up in the query string.
    params = {}
    if entity_type:
        params["type"] = entity_type
    if attrs:
        params["attrs"] = ",".join(attrs)
    if metadata:
        params["metadata"] = ",".join(metadata)
    if response_format not in list(AttrsFormat):
        raise ValueError(f"Value must be in {list(AttrsFormat)}")
    params["options"] = response_format

    try:
        res = self.get(url=url, params=params, headers=headers)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info("Entity successfully retrieved!")
            payload = res.json()
            self.logger.debug("Received: %s", payload)
            if response_format == AttrsFormat.NORMALIZED:
                return ContextEntity(**payload)
            if response_format == AttrsFormat.KEY_VALUES:
                return ContextEntityKeyValues(**payload)
            return payload
    except requests.RequestException as err:
        msg = f"Could not load entity {entity_id}"
        self.log_error(err=err, msg=msg)
        raise
def get_entity_attributes(
    self,
    entity_id: str,
    entity_type: str = None,
    attrs: List[str] = None,
    metadata: List[str] = None,
    response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
) -> Dict[str, ContextAttribute]:
    """
    Retrieve the attributes of one entity. The request is similar to
    retrieving the whole entity, however this one omits the id and type
    fields. It must return only one entity element. If more than one
    entity with the same ID is found (e.g. entities with same ID but
    different type), an error message is returned, with the HTTP status
    code set to 409 Conflict.

    Args:
        entity_id (String): Id of the entity to be retrieved
        entity_type (String): Entity type, to avoid ambiguity in case
            there are several entities with the same entity id.
        attrs (List of Strings): List of attribute names whose data must be
            included in the response. The attributes are retrieved in the
            order specified by this parameter. If this parameter is not
            included, the attributes are retrieved in arbitrary order,
            and all the attributes of the entity are included in the
            response. Example: temperature, humidity.
        metadata (List of Strings): A list of metadata names to include in
            the response. Example: accuracy.
        response_format (AttrsFormat, str): Representation format of
            response

    Returns:
        Dict mapping attribute name to ContextAttribute for the
        normalized format, otherwise the decoded JSON body.

    Raises:
        ValueError: if response_format is not a member of AttrsFormat
        requests.RequestException: logged and re-raised if the request
            fails
    """
    url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs")
    headers = self.headers.copy()

    params = {}
    if entity_type:
        params["type"] = entity_type
    if attrs:
        params["attrs"] = ",".join(attrs)
    if metadata:
        params["metadata"] = ",".join(metadata)
    if response_format not in list(AttrsFormat):
        raise ValueError(f"Value must be in {list(AttrsFormat)}")
    params["options"] = response_format

    try:
        res = self.get(url=url, params=params, headers=headers)
        if not res.ok:
            res.raise_for_status()
        else:
            if response_format != AttrsFormat.NORMALIZED:
                return res.json()
            # Normalized attributes are wrapped into their model class.
            parsed = {}
            for name, body in res.json().items():
                parsed[name] = ContextAttribute(**body)
            return parsed
    except requests.RequestException as err:
        msg = f"Could not load attributes from entity {entity_id} !"
        self.log_error(err=err, msg=msg)
        raise
def update_entity(
    self,
    entity: Union[ContextEntity, ContextEntityKeyValues, dict],
    append_strict: bool = False,
    key_values: bool = False,
):
    """
    The request payload is an object representing the attributes to
    append or update. The work is delegated to
    ``update_or_append_entity_attributes``.

    Note:
        Update means overwriting the existing entity. If you want to
        manipulate you should rather use patch_entity.

    Args:
        entity (ContextEntity, ContextEntityKeyValues or dict): Entity
            whose attributes are sent. A dict must contain the keys
            ``id`` and ``type``; it is never modified.
        append_strict: If `False` the entity attributes are updated (if they
            previously exist) or appended (if they don't previously exist)
            with the ones in the payload.
            If `True` all the attributes in the payload not
            previously existing in the entity are appended. In addition
            to that, in case some of the attributes in the payload
            already exist in the entity, an error is returned.
            More precisely this means a strict append procedure.
        key_values: By default False. If set to True, the payload uses
            the keyValues simplified entity representation, i.e.
            ContextEntityKeyValues.

    Returns:
        None
    """
    if key_values:
        if isinstance(entity, dict):
            # Copy first so that popping id/type leaves the caller's
            # dict untouched; what remains are the attributes.
            entity = copy.deepcopy(entity)
            _id = entity.pop("id")
            _type = entity.pop("type")
            attrs = entity
            entity = ContextEntityKeyValues(id=_id, type=_type)
        else:
            attrs = entity.model_dump(exclude={"id", "type"})
    else:
        if isinstance(entity, dict):
            # The signature allows a dict, but a dict has no
            # get_attributes(); build the model from it first.
            entity = ContextEntity(**entity)
        attrs = entity.get_attributes()
    self.update_or_append_entity_attributes(
        entity_id=entity.id,
        entity_type=entity.type,
        attrs=attrs,
        append_strict=append_strict,
        key_values=key_values,
    )
def update_entity_properties(
    self, entity: ContextEntity, append_strict: bool = False
):
    """
    Append or update only those attributes of the entity that
    ``entity.get_properties()`` returns, i.e. attributes of any type but
    Relationship.

    Note:
        Update means overwriting the existing entity. If you want to
        manipulate you should rather use patch_entity.

    Args:
        entity (ContextEntity): Entity providing id, type and properties
        append_strict: If `False` the entity attributes are updated (if they
            previously exist) or appended (if they don't previously exist)
            with the ones in the payload.
            If `True` all the attributes in the payload not
            previously existing in the entity are appended. In addition
            to that, in case some of the attributes in the payload
            already exist in the entity, an error is returned.
            More precisely this means a strict append procedure.

    Returns:
        None
    """
    request_kwargs = {
        "entity_id": entity.id,
        "entity_type": entity.type,
        "attrs": entity.get_properties(),
        "append_strict": append_strict,
    }
    self.update_or_append_entity_attributes(**request_kwargs)
def update_entity_relationships(
    self, entity: ContextEntity, append_strict: bool = False
):
    """
    Append or update only those attributes of the entity that
    ``entity.get_relationships()`` returns, i.e. attributes of type
    Relationship.

    Note:
        Update means overwriting the existing entity. If you want to
        manipulate you should rather use patch_entity.

    Args:
        entity (ContextEntity): Entity providing id, type and relationships
        append_strict: If `False` the entity attributes are updated (if they
            previously exist) or appended (if they don't previously exist)
            with the ones in the payload.
            If `True` all the attributes in the payload not
            previously existing in the entity are appended. In addition
            to that, in case some of the attributes in the payload
            already exist in the entity, an error is returned.
            More precisely this means a strict append procedure.

    Returns:
        None
    """
    request_kwargs = {
        "entity_id": entity.id,
        "entity_type": entity.type,
        "attrs": entity.get_relationships(),
        "append_strict": append_strict,
    }
    self.update_or_append_entity_attributes(**request_kwargs)
def delete_entity(
    self,
    entity_id: str,
    entity_type: str = None,
    delete_devices: bool = False,
    iota_client: IoTAClient = None,
    iota_url: AnyHttpUrl = settings.IOTA_URL,
) -> None:
    """
    Remove an entity from the context broker. No payload is required
    or received.

    Args:
        entity_id:
            Id of the entity to be deleted
        entity_type:
            Entity type, to avoid ambiguity in case there are several
            entities with the same entity id.
        delete_devices:
            If True, also delete all devices that reference this
            entity (entity_id as entity_name)
        iota_client:
            Corresponding IoTA-Client used to access IoTA-Agent. A deep
            copy is used, so the passed client itself is not closed.
        iota_url:
            URL of the corresponding IoT-Agent. Only used when no
            iota_client is given: an IoTA-Client is then generated,
            mirroring the information of the ContextBrokerClient,
            e.g. FiwareHeader, and other headers

    Returns:
        None

    Raises:
        requests.RequestException: logged and re-raised if deleting the
            entity fails.
    """
    url = urljoin(self.base_url, f"v2/entities/{entity_id}")
    headers = self.headers.copy()
    if entity_type:
        params = {"type": entity_type}
    else:
        params = None
    try:
        res = self.delete(url=url, params=params, headers=headers)
        if res.ok:
            self.logger.info("Entity '%s' successfully deleted!", entity_id)
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not delete entity {entity_id} !"
        self.log_error(err=err, msg=msg)
        raise

    if not delete_devices:
        return

    from filip.clients.ngsi_v2 import IoTAClient

    if iota_client:
        iota_client_local = deepcopy(iota_client)
    else:
        warnings.warn(
            "No IoTA-Client object provided! "
            "Will try to generate one. "
            "This usage is not recommended."
        )

        iota_client_local = IoTAClient(
            url=iota_url,
            fiware_header=self.fiware_headers,
            headers=self.headers,
        )

    # Close the local client even if listing or deleting devices fails.
    try:
        for device in iota_client_local.get_device_list(entity_names=[entity_id]):
            # With an entity type given, only devices of that type are
            # removed; without one, every matching device is removed.
            if entity_type and device.entity_type != entity_type:
                continue
            iota_client_local.delete_device(device_id=device.device_id)
    finally:
        iota_client_local.close()
def delete_entities(self, entities: List[ContextEntity]) -> None:
    """
    Remove a list of entities from the context broker in batch requests
    instead of one delete_entity() call per entity.

    Args:
        entities: List[ContextEntity]: List of entities to be deleted

    Raises:
        Exception, if one of the entities is not in the ContextBroker

    Returns:
        None
    """
    # A batch "delete" removes entities without attributes completely,
    # but only strips the attributes from the others. Entities carrying
    # attributes are therefore remembered in an id/type-only form for a
    # second pass.
    stripped_entities: List[ContextEntity] = [
        ContextEntity(id=candidate.id, type=candidate.type)
        for candidate in entities
        if any(
            field not in ContextEntity.model_fields
            for field in candidate.model_dump()
        )
    ]

    # First pass: everything as given. Second pass: what is left of the
    # entities that had attributes (only id and type).
    if len(entities) > 0:
        self.update(entities=entities, action_type="delete")
    if len(stripped_entities) > 0:
        self.update(entities=stripped_entities, action_type="delete")
def update_or_append_entity_attributes(
    self,
    entity_id: str,
    attrs: Union[
        List[NamedContextAttribute],
        Dict[str, ContextAttribute],
        Dict[str, Any],
    ],
    entity_type: str = None,
    append_strict: bool = False,
    forcedUpdate: bool = False,
    key_values: bool = False,
):
    """
    Send the given attributes with a 'POST' request to the ``attrs``
    resource of the entity, to append or update them.

    Note:
        Be careful not to update attributes that are provided via
        context registration, e.g. commands. Only ``id`` and ``type``
        are excluded from the payload; all attributes passed in are sent.

    Args:
        entity_id: Entity id to be updated
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.
        attrs: List of attributes to update or to append
        append_strict: If `False` the entity attributes are updated (if they
            previously exist) or appended (if they don't previously exist)
            with the ones in the payload.
            If `True` all the attributes in the payload not
            previously existing in the entity are appended. In addition
            to that, in case some of the attributes in the payload
            already exist in the entity, an error is returned.
            More precisely this means a strict append procedure.
        forcedUpdate: Update operation have to trigger any matching
            subscription, no matter if there is an actual attribute
            update or no instead of the default behavior, which is to
            updated only if attribute is effectively updated.
        key_values: By default False. If set to True, the payload uses
            the keyValues simplified entity representation, i.e.
            ContextEntityKeyValues.

    Returns:
        None

    Raises:
        requests.RequestException: logged and re-raised if the request
            fails.
    """
    url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs")
    headers = self.headers.copy()

    params = {}
    if entity_type:
        params["type"] = entity_type
    else:
        # Placeholder only needed to build the local model below; it is
        # excluded from the payload and not sent as a query parameter.
        entity_type = "dummy"

    if key_values:
        assert isinstance(attrs, dict), "for keyValues attrs has to be a dict"
    option_switches = (
        ("append", append_strict),
        ("forcedUpdate", forcedUpdate),
        ("keyValues", key_values),
    )
    options = [name for name, enabled in option_switches if enabled]
    if options:
        params["options"] = ",".join(options)

    if key_values:
        entity = ContextEntityKeyValues(id=entity_id, type=entity_type, **attrs)
    else:
        entity = ContextEntity(id=entity_id, type=entity_type)
        entity.add_attributes(attrs)

    try:
        res = self.post(
            url=url,
            headers=headers,
            json=entity.model_dump(exclude={"id", "type"}, exclude_none=True),
            params=params,
        )
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info("Entity '%s' successfully updated!", entity.id)
    except requests.RequestException as err:
        msg = f"Could not update or append attributes of entity {entity.id} !"
        self.log_error(err=err, msg=msg)
        raise
def _patch_entity_key_values(
    self,
    entity: Union[ContextEntityKeyValues, dict],
):
    """
    Update an entity from its simplified (keyValues) representation with
    a 'PATCH' request to the ``attrs`` resource of the entity.
    Only existing attribute can be updated!

    Args:
        entity: A ContextEntityKeyValues object or a dictionary contain
            the simplified entity data

    Raises:
        requests.RequestException: logged and re-raised if the request
            fails.
    """
    if isinstance(entity, dict):
        entity = ContextEntityKeyValues(**entity)

    url = urljoin(self.base_url, f"v2/entities/{entity.id}/attrs")
    headers = self.headers.copy()
    params = {
        "type": entity.type,
        "options": AttrsFormat.KEY_VALUES.value,
    }
    # id and type are addressed via url/params; only explicitly set
    # fields are sent as payload.
    payload = entity.model_dump(exclude={"id", "type"}, exclude_unset=True)
    try:
        res = self.patch(url=url, headers=headers, json=payload, params=params)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info("Entity '%s' successfully updated!", entity.id)
    except requests.RequestException as err:
        msg = f"Could not update attributes of entity {entity.id} !"
        self.log_error(err=err, msg=msg)
        raise
def update_existing_entity_attributes(
    self,
    entity_id: str,
    attrs: Union[
        List[NamedContextAttribute],
        Dict[str, ContextAttribute],
        Dict[str, Any],
    ],
    entity_type: str = None,
    forcedUpdate: bool = False,
    override_metadata: bool = False,
    key_values: bool = False,
):
    """
    The entity attributes are updated with the ones in the payload.
    In addition to that, if one or more attributes in the payload doesn't
    exist in the entity, an error is returned. This corresponds to a
    'PATCH' request.

    Args:
        entity_id: Entity id to be updated
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.
        attrs: List of attributes to update or to append
        forcedUpdate: Update operation have to trigger any matching
            subscription, no matter if there is an actual attribute
            update or no instead of the default behavior, which is to
            updated only if attribute is effectively updated.
        override_metadata:
            Bool,replace the existing metadata with the one provided in
            the request
        key_values: By default False. If set to True, the payload uses
            the keyValues simplified entity representation, i.e.
            ContextEntityKeyValues. ``attrs`` must then be a dict and is
            sent as given.

    Returns:
        None

    Raises:
        requests.RequestException: logged and re-raised if the request
            fails.
    """
    url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs")
    headers = self.headers.copy()

    # Always start from a dict: options may have to be added even when
    # no entity type is given (a None here made `.update` crash).
    params = {}
    if entity_type:
        params["type"] = entity_type
    else:
        # Placeholder only needed to build the local model below; it is
        # excluded from the payload and not sent as a query parameter.
        entity_type = "dummy"

    options = []
    if override_metadata:
        options.append("overrideMetadata")
    if forcedUpdate:
        options.append("forcedUpdate")
    if key_values:
        assert isinstance(attrs, dict), "for keyValues the attrs must be dict"
        payload = attrs
        options.append("keyValues")
    else:
        entity = ContextEntity(id=entity_id, type=entity_type)
        entity.add_attributes(attrs)
        payload = entity.model_dump(exclude={"id", "type"}, exclude_none=True)
    if options:
        params["options"] = ",".join(options)

    try:
        res = self.patch(
            url=url,
            headers=headers,
            json=payload,
            # Keep sending None when there is nothing to send, as before.
            params=params or None,
        )
        if res.ok:
            self.logger.info("Entity '%s' successfully updated!", entity_id)
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not update attributes of entity {entity_id} !"
        self.log_error(err=err, msg=msg)
        raise
def override_entity(self,
                    entity: Union[ContextEntity, ContextEntityKeyValues],
                    **kwargs
                    ):
    """
    Replace all attributes of an existing entity with those of `entity`.

    This is a thin convenience wrapper that forwards the id, type and
    attributes of the given entity to `replace_entity_attributes`.

    Note:
        If you want to manipulate you should rather use patch_entity.

    Args:
        entity (ContextEntity or ContextEntityKeyValues): Entity whose
            attributes become the new state.
        **kwargs: Passed on unchanged to `replace_entity_attributes`.

    Returns:
        Whatever `replace_entity_attributes` returns.
    """
    target_id = entity.id
    target_type = entity.type
    new_attrs = entity.get_attributes()
    return self.replace_entity_attributes(
        entity_id=target_id,
        entity_type=target_type,
        attrs=new_attrs,
        **kwargs
    )
def replace_entity_attributes(
    self,
    entity_id: str,
    attrs: Union[List[Union[NamedContextAttribute,
                            Dict[str, ContextAttribute]]],
                 Dict],
    entity_type: str = None,
    forcedUpdate: bool = False,
    key_values: bool = False,
):
    """
    Remove the attributes currently stored for the entity and replace them
    with the ones given in the request ('PUT' request).

    Args:
        entity_id: Entity id to be updated
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.
        attrs: List of attributes to add to the entity or dict of
            attributes in case of key_values=True.
        forcedUpdate: Adds the "forcedUpdate" option to the request, i.e.
            matching subscriptions shall be triggered regardless of
            whether an attribute effectively changed.
        key_values(bool):
            By default False. If set to True, "options=keyValues" will
            be included in params of the request and `attrs` is sent
            as-is (keyValues simplified representation).

    Returns:
        None
    """
    url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs")
    headers = self.headers.copy()
    params = {"type": entity_type} if entity_type else {}
    options = ["forcedUpdate"] if forcedUpdate else []

    if key_values:
        options.append("keyValues")
        assert isinstance(attrs, dict)
    else:
        # Without a type a placeholder is used, only to be able to build
        # the temporary entity; id and type are excluded from the payload.
        entity = ContextEntity(id=entity_id, type=entity_type or "dummy")
        entity.add_attributes(attrs)
        attrs = entity.model_dump(
            exclude={"id", "type"},
            exclude_none=True
        )
    if options:
        params["options"] = ",".join(options)

    try:
        res = self.put(
            url=url,
            headers=headers,
            json=attrs,
            params=params,
        )
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info("Entity '%s' successfully updated!", entity_id)
    except requests.RequestException as err:
        self.log_error(
            err=err,
            msg=f"Could not replace attribute of entity {entity_id} !",
        )
        raise
1073 # Attribute operations
def get_attribute(
    self,
    entity_id: str,
    attr_name: str,
    entity_type: str = None,
    metadata: Union[str, List[str]] = None,
    response_format="",
) -> ContextAttribute:
    """
    Retrieves a specified attribute from an entity.

    Args:
        entity_id: Id of the entity. Example: Bcn_Welt
        attr_name: Name of the attribute to be retrieved.
        entity_type (Optional): Type of the entity to retrieve
        metadata (Optional): Metadata names to include in the response.
            Either a single, already comma-separated string or an
            iterable of names which is joined with ",".
        response_format: Not evaluated by this method; kept so that the
            signature stays backwards compatible.

    Returns:
        The content of the retrieved attribute as ContextAttribute

    Raises:
        requests.RequestException: Re-raised after logging if the request
            fails.
    """
    url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs/{attr_name}")
    headers = self.headers.copy()
    params = {}
    if entity_type:
        params["type"] = entity_type
    if metadata:
        # A plain string must not be passed to ",".join(), otherwise it is
        # split into single characters ("unit" -> "u,n,i,t").
        if isinstance(metadata, str):
            params["metadata"] = metadata
        else:
            params["metadata"] = ",".join(metadata)
    try:
        res = self.get(url=url, params=params, headers=headers)
        if res.ok:
            self.logger.debug("Received: %s", res.json())
            return ContextAttribute(**res.json())
        res.raise_for_status()
    except requests.RequestException as err:
        msg = (
            f"Could not load attribute '{attr_name}' from entity "
            f"'{entity_id}' "
        )
        self.log_error(err=err, msg=msg)
        raise
def update_entity_attribute(self,
                            entity_id: str,
                            attr: Union[ContextAttribute,
                                        NamedContextAttribute],
                            *,
                            entity_type: str = None,
                            attr_name: str = None,
                            override_metadata: bool = True,
                            forcedUpdate: bool = False):
    """
    Updates a specified attribute from an entity.

    Args:
        attr:
            context attribute to update
        entity_id:
            Id of the entity. Example: Bcn_Welt
        entity_type:
            Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.
        forcedUpdate: Update operation have to trigger any matching
            subscription, no matter if there is an actual attribute
            update or no instead of the default behavior, which is to
            updated only if attribute is effectively updated.
        attr_name:
            Name of the attribute to be updated. Required if `attr` is a
            plain ContextAttribute; must NOT be given if `attr` is a
            NamedContextAttribute (its own name is used).
        override_metadata:
            Bool, if set to `True` (default) the metadata will be
            overwritten. This is for backwards compatibility reasons.
            If `False` the metadata values will be either updated if
            already existing or append if not.
            See also:
            https://fiware-orion.readthedocs.io/en/master/user/metadata.html

    Raises:
        AssertionError: If `attr_name` and the type of `attr` do not match
            the rule described above.
        requests.RequestException: Re-raised after logging if the request
            fails.
    """
    headers = self.headers.copy()
    if not isinstance(attr, NamedContextAttribute):
        assert attr_name is not None, (
            "Missing name for attribute. "
            "attr_name must be present if "
            "attr is of type ContextAttribute"
        )
    else:
        assert attr_name is None, (
            "Invalid argument attr_name. Do not set "
            "attr_name if attr is of type "
            "NamedContextAttribute"
        )
        attr_name = attr.name

    url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs/{attr_name}")
    params = {}
    if entity_type:
        params["type"] = entity_type
    # set overrideMetadata option (we assure backwards compatibility here)
    options = []
    if override_metadata:
        options.append("overrideMetadata")
    if forcedUpdate:
        options.append("forcedUpdate")
    if options:
        params["options"] = ",".join(options)
    try:
        res = self.put(
            url=url,
            headers=headers,
            params=params,
            # The name is part of the URL, not of the attribute payload.
            json=attr.model_dump(
                exclude={"name"},
                exclude_none=True
            ),
        )
        if res.ok:
            self.logger.info(
                "Attribute '%s' of '%s' successfully updated!",
                attr_name,
                entity_id,
            )
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = (
            f"Could not update attribute '{attr_name}' of entity "
            f"'{entity_id}' "
        )
        self.log_error(err=err, msg=msg)
        raise
def delete_entity_attribute(
    self, entity_id: str, attr_name: str, entity_type: str = None
) -> None:
    """
    Removes a specified attribute from an entity.

    Args:
        entity_id: Id of the entity.
        attr_name: Name of the attribute to be removed.
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.

    Raises:
        requests.RequestException: Re-raised after logging if the request
            fails.
    """
    url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs/{attr_name}")
    headers = self.headers.copy()
    params = {}
    if entity_type:
        params["type"] = entity_type
    try:
        # `params` has to be sent along, otherwise the entity type given by
        # the caller is silently ignored.
        res = self.delete(url=url, headers=headers, params=params)
        if res.ok:
            self.logger.info(
                "Attribute '%s' of '%s' successfully deleted!",
                attr_name,
                entity_id,
            )
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = (
            f"Could not delete attribute '{attr_name}' of entity '{entity_id}'"
        )
        self.log_error(err=err, msg=msg)
        raise
1243 # Attribute value operations
def get_attribute_value(
    self, entity_id: str, attr_name: str, entity_type: str = None
) -> Any:
    """
    Fetch only the value of a single attribute of an entity.

    Args:
        entity_id: Id of the entity. Example: Bcn_Welt
        attr_name: Name of the attribute to be retrieved.
            Example: temperature.
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.

    Returns:
        The decoded JSON body of the response, i.e. the attribute value.
    """
    endpoint = urljoin(
        self.base_url, f"v2/entities/{entity_id}/attrs/{attr_name}/value"
    )
    request_headers = self.headers.copy()
    query = {"type": entity_type} if entity_type else {}
    try:
        response = self.get(url=endpoint, params=query, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.debug("Received: %s", response.json())
            return response.json()
    except requests.RequestException as err:
        self.log_error(
            err=err,
            msg=f"Could not load value of attribute '{attr_name}' from "
                f"entity'{entity_id}' ",
        )
        raise
def update_attribute_value(self, *,
                           entity_id: str,
                           attr_name: str,
                           value: Any,
                           entity_type: str = None,
                           forcedUpdate: bool = False
                           ):
    """
    Set a new value for one attribute of an entity ('PUT' on the
    attribute's `/value` resource).

    Args:
        value: update value. Dicts and lists are sent with the client's
            default headers; everything else is sent with
            "Content-Type: text/plain".
        entity_id: Id of the entity. Example: Bcn_Welt
        attr_name: Name of the attribute to be updated.
            Example: temperature.
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.
        forcedUpdate: Adds the "forcedUpdate" option to the request, i.e.
            matching subscriptions shall be triggered regardless of
            whether the attribute effectively changed.

    Returns:
        None
    """
    endpoint = urljoin(
        self.base_url, f"v2/entities/{entity_id}/attrs/{attr_name}/value"
    )
    request_headers = self.headers.copy()
    query = {}
    if entity_type:
        query["type"] = entity_type
    flags = ["forcedUpdate"] if forcedUpdate else []
    if flags:
        query["options"] = ",".join(flags)
    try:
        if not isinstance(value, (dict, list)):
            # Scalars go out as plain text
            request_headers["Content-Type"] = "text/plain"
            if isinstance(value, str):
                value = f"{value}"
        response = self.put(
            url=endpoint, headers=request_headers, json=value, params=query
        )
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info(
                "Attribute '%s' of '%s' successfully updated!",
                attr_name,
                entity_id,
            )
    except requests.RequestException as err:
        self.log_error(
            err=err,
            msg=f"Could not update value of attribute '{attr_name}' from "
                f"entity '{entity_id}' ",
        )
        raise
1338 # Types Operations
def get_entity_types(
    self, *, limit: int = None, offset: int = None, options: str = None
) -> List[Dict[str, Any]]:
    """
    Request the `v2/types` resource of the broker.

    Args:
        limit: Limit the number of types to be retrieved.
        offset: Skip a number of records.
        options: Value for the `options` query parameter.
            Allowed: count, values

    Returns:
        The decoded JSON body of the response.
    """
    endpoint = urljoin(self.base_url, "v2/types")
    request_headers = self.headers.copy()
    # Only truthy arguments are forwarded as query parameters
    candidates = (("limit", limit), ("offset", offset), ("options", options))
    query = {key: val for key, val in candidates if val}
    try:
        response = self.get(url=endpoint, params=query, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.debug("Received: %s", response.json())
            return response.json()
    except requests.RequestException as err:
        self.log_error(err=err, msg="Could not load entity types!")
        raise
def get_entity_type(self, entity_type: str) -> Dict[str, Any]:
    """
    Request the `v2/types/<entity_type>` resource of the broker.

    Args:
        entity_type: Entity Type. Example: Room

    Returns:
        The decoded JSON body of the response.
    """
    endpoint = urljoin(self.base_url, f"v2/types/{entity_type}")
    request_headers = self.headers.copy()
    try:
        response = self.get(url=endpoint, params={}, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.debug("Received: %s", response.json())
            return response.json()
    except requests.RequestException as err:
        self.log_error(
            err=err,
            msg=f"Could not load entities of type'{entity_type}' ",
        )
        raise
1395 # SUBSCRIPTION API ENDPOINTS
def get_subscription_list(self, limit: PositiveInt = inf) -> List[Subscription]:
    """
    Returns a list of all the subscriptions present in the system.

    Args:
        limit: Limit the number of subscriptions to be retrieved

    Returns:
        list of subscriptions
    """
    endpoint = urljoin(self.base_url, "v2/subscriptions/")
    request_headers = self.headers.copy()
    # The 'count' option is always requested so that it can be checked
    # whether pagination is required
    query = {"options": "count"}
    try:
        raw_items = self.__pagination(
            limit=limit, url=endpoint, params=query, headers=request_headers
        )
        return TypeAdapter(List[Subscription]).validate_python(raw_items)
    except requests.RequestException as err:
        self.log_error(err=err, msg="Could not load subscriptions!")
        raise
def post_subscription(
    self,
    subscription: Subscription,
    update: bool = False,
    skip_initial_notification: bool = False,
) -> str:
    """
    Creates a new subscription. The subscription is represented by a
    Subscription object defined in filip.cb.models.

    If the subscription already exists, the adding is prevented and the id
    of the existing subscription is returned.

    A subscription is deemed as already existing if there exists a
    subscription with the exact same subject and notification fields. All
    optional fields are not considered.

    Args:
        subscription: Subscription
        update: True - If the subscription already exists, update it
            False- If the subscription already exists, throw warning
        skip_initial_notification: True - the option
            "skipInitialNotification" is added to the request if the
            broker reports a version <= 3.1; for newer brokers a
            DeprecationWarning is issued instead and the option is not
            sent.
            False - the option is not sent.

    Returns:
        str: Id of the (created) subscription

    Raises:
        requests.RequestException: Re-raised after logging if the request
            fails.
    """
    # All existing subscriptions are fetched to detect duplicates
    existing_subscriptions = self.get_subscription_list()

    # Only 'subject' and 'notification' take part in the comparison
    sub_dict = subscription.model_dump(include={'subject',
                                                'notification'})
    for ex_sub in existing_subscriptions:
        if self._subscription_dicts_are_equal(
            sub_dict,
            ex_sub.model_dump(include={'subject', 'notification'})
        ):
            self.logger.info("Subscription already exists")
            if update:
                self.logger.info("Updated subscription")
                # Note: this writes the existing id into the object passed
                # in by the caller
                subscription.id = ex_sub.id
                self.update_subscription(subscription)
            else:
                warnings.warn(
                    f"Subscription existed already with the id" f" {ex_sub.id}"
                )
            # In both cases no new subscription is created
            return ex_sub.id

    params = {}
    if skip_initial_notification:
        # The option is only sent to brokers that report a version <= 3.1
        version = self.get_version()["orion"]["version"]
        if parse_version(version) <= parse_version("3.1"):
            params.update({"options": "skipInitialNotification"})
        else:
            pass
            warnings.warn(
                f"Skip initial notifications is a deprecated "
                f"feature of older versions <=3.1 of the context "
                f"broker. The Context Broker that you requesting has "
                f"version: {version}. For newer versions we "
                f"automatically skip this option. Consider "
                f"refactoring and updating your services",
                DeprecationWarning,
            )

    url = urljoin(self.base_url, "v2/subscriptions")
    headers = self.headers.copy()
    headers.update({"Content-Type": "application/json"})
    try:
        res = self.post(
            url=url,
            headers=headers,
            data=subscription.model_dump_json(exclude={"id"}, exclude_none=True),
            params=params,
        )
        if res.ok:
            self.logger.info("Subscription successfully created!")
            # The new id is the last path segment of the Location header
            return res.headers["Location"].split("/")[-1]
        res.raise_for_status()
    except requests.RequestException as err:
        msg = "Could not send subscription!"
        self.log_error(err=err, msg=msg)
        raise
def get_subscription(self, subscription_id: str) -> Subscription:
    """
    Retrieve a single subscription from the context broker by its id.

    Args:
        subscription_id: id of the subscription

    Returns:
        Subscription built from the JSON body of the response.
    """
    endpoint = urljoin(self.base_url, f"v2/subscriptions/{subscription_id}")
    request_headers = self.headers.copy()
    try:
        response = self.get(url=endpoint, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.debug("Received: %s", response.json())
            return Subscription(**response.json())
    except requests.RequestException as err:
        self.log_error(
            err=err, msg=f"Could not load subscription {subscription_id}"
        )
        raise
def update_subscription(
    self, subscription: Subscription, skip_initial_notification: bool = False
):
    """
    Only the fields included in the request are updated in the subscription.

    Args:
        subscription: Subscription to update
        skip_initial_notification: True - the option
            "skipInitialNotification" is added to the request if the
            broker reports a version <= 3.1; for newer brokers a
            DeprecationWarning is issued instead and the option is not
            sent.
            False - the option is not sent.

    Returns:
        None

    Raises:
        requests.RequestException: Re-raised after logging if the request
            fails.
    """
    params = {}
    if skip_initial_notification:
        version = self.get_version()["orion"]["version"]
        if parse_version(version) <= parse_version("3.1"):
            params["options"] = "skipInitialNotification"
        else:
            warnings.warn(
                f"Skip initial notifications is a deprecated "
                f"feature of older versions <=3.1 of the context "
                f"broker. The Context Broker that you requesting has "
                f"version: {version}. For newer versions we "
                f"automatically skip this option. Consider "
                f"refactoring and updating your services",
                DeprecationWarning,
            )

    url = urljoin(self.base_url, f"v2/subscriptions/{subscription.id}")
    headers = self.headers.copy()
    headers.update({"Content-Type": "application/json"})
    try:
        res = self.patch(
            url=url,
            headers=headers,
            data=subscription.model_dump_json(
                exclude={"id"},
                exclude_none=True
            ),
            # Previously the options built above were never sent
            params=params,
        )
        if res.ok:
            self.logger.info("Subscription successfully updated!")
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not update subscription {subscription.id}"
        self.log_error(err=err, msg=msg)
        raise
def delete_subscription(self, subscription_id: str) -> None:
    """
    Remove the subscription with the given id from the Context Broker.

    Args:
        subscription_id: id of the subscription
    """
    endpoint = urljoin(self.base_url, f"v2/subscriptions/{subscription_id}")
    request_headers = self.headers.copy()
    try:
        response = self.delete(url=endpoint, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info(
                f"Subscription '{subscription_id}' successfully deleted!"
            )
    except requests.RequestException as err:
        self.log_error(
            err=err, msg=f"Could not delete subscription {subscription_id}"
        )
        raise
1604 # Registration API
def get_registration_list(self, *, limit: PositiveInt = None) -> List[Registration]:
    """
    Lists all the context provider registrations present in the system.

    Args:
        limit: Limit the number of registrations to be retrieved

    Returns:
        list of registrations
    """
    endpoint = urljoin(self.base_url, "v2/registrations/")
    request_headers = self.headers.copy()
    # The 'count' option is always requested so that it can be checked
    # whether pagination is required
    query = {"options": "count"}
    try:
        raw_items = self.__pagination(
            limit=limit, url=endpoint, params=query, headers=request_headers
        )
        return TypeAdapter(List[Registration]).validate_python(raw_items)
    except requests.RequestException as err:
        self.log_error(err=err, msg="Could not load registrations!")
        raise
def post_registration(self, registration: Registration):
    """
    Create a new context provider registration, typically used to bind
    context sources as providers of certain data.

    Args:
        registration (Registration): Registration to create. Its `id` is
            not part of the request body.

    Returns:
        The id of the new registration, taken from the last path segment
        of the `Location` response header.
    """
    endpoint = urljoin(self.base_url, "v2/registrations")
    request_headers = self.headers.copy()
    request_headers["Content-Type"] = "application/json"
    try:
        body = registration.model_dump_json(exclude={"id"}, exclude_none=True)
        response = self.post(url=endpoint, headers=request_headers, data=body)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info("Registration successfully created!")
            return response.headers["Location"].split("/")[-1]
    except requests.RequestException as err:
        self.log_error(
            err=err, msg=f"Could not send registration {registration.id}!"
        )
        raise
def get_registration(self, registration_id: str) -> Registration:
    """
    Retrieve a single registration from the context broker by its id.

    Args:
        registration_id: id of the registration

    Returns:
        Registration built from the JSON body of the response.
    """
    endpoint = urljoin(self.base_url, f"v2/registrations/{registration_id}")
    request_headers = self.headers.copy()
    try:
        response = self.get(url=endpoint, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.debug("Received: %s", response.json())
            return Registration(**response.json())
    except requests.RequestException as err:
        self.log_error(
            err=err, msg=f"Could not load registration {registration_id} !"
        )
        raise
def update_registration(self, registration: Registration):
    """
    Send a 'PATCH' request for the given registration. Fields that are
    `None` as well as the `id` are left out of the request body.

    Args:
        registration: Registration to update; its `id` selects the
            resource.

    Returns:
        None
    """
    endpoint = urljoin(self.base_url, f"v2/registrations/{registration.id}")
    request_headers = self.headers.copy()
    request_headers["Content-Type"] = "application/json"
    try:
        body = registration.model_dump_json(exclude={"id"}, exclude_none=True)
        response = self.patch(url=endpoint, headers=request_headers, data=body)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info("Registration successfully updated!")
    except requests.RequestException as err:
        self.log_error(
            err=err, msg=f"Could not update registration {registration.id} !"
        )
        raise
def delete_registration(self, registration_id: str) -> None:
    """
    Remove the registration with the given id from the Context Broker.

    Args:
        registration_id: id of the registration
    """
    endpoint = urljoin(self.base_url, f"v2/registrations/{registration_id}")
    request_headers = self.headers.copy()
    try:
        response = self.delete(url=endpoint, headers=request_headers)
        succeeded = response.ok
        if succeeded:
            self.logger.info(
                "Registration '%s' successfully deleted!", registration_id
            )
        # Called unconditionally, as in the success case it does not raise
        response.raise_for_status()
    except requests.RequestException as err:
        self.log_error(
            err=err, msg=f"Could not delete registration {registration_id} !"
        )
        raise
1735 # Batch operation API
def update(self,
           *,
           entities: List[Union[ContextEntity, ContextEntityKeyValues]],
           action_type: Union[ActionType, str],
           update_format: str = None,
           forcedUpdate: bool = False,
           override_metadata: bool = False,
           ) -> None:
    """
    Create, update and/or delete several entities in a single batch
    operation (`POST v2/op/update`).

    The operation is split in as many individual operations as entities
    in the entities vector, so the actionType is executed for each one of
    them. Depending on the actionType, a mapping with regular non-batch
    operations can be done:

    append: maps to POST /v2/entities (if the entity does not already exist)
    or POST /v2/entities/<id>/attrs (if the entity already exists).

    appendStrict: maps to POST /v2/entities (if the entity does not
    already exist) or POST /v2/entities/<id>/attrs?options=append (if the
    entity already exists).

    update: maps to PATCH /v2/entities/<id>/attrs.

    delete: maps to DELETE /v2/entities/<id>/attrs/<attrName> on every
    attribute included in the entity or to DELETE /v2/entities/<id> if
    no attribute were included in the entity.

    replace: maps to PUT /v2/entities/<id>/attrs.

    Args:
        entities: an array of entities, each entity specified using the
            JSON entity representation format
        action_type (Update): actionType, to specify the kind of update
            action to do: either append, appendStrict, update, delete,
            or replace.
        update_format (str): Optional 'keyValues'
        forcedUpdate: Adds the "forcedUpdate" option to the request, i.e.
            matching subscriptions shall be triggered regardless of
            whether an attribute effectively changed.
        override_metadata:
            Bool, replace the existing metadata with the one provided in
            the request

    Returns:
        None
    """
    endpoint = urljoin(self.base_url, "v2/op/update")
    request_headers = self.headers.copy()
    request_headers["Content-Type"] = "application/json"

    flags = []
    if override_metadata:
        flags.append("overrideMetadata")
    if forcedUpdate:
        flags.append("forcedUpdate")
    if update_format:
        assert (
            update_format == "keyValues"
        ), "Only 'keyValues' is allowed as update format"
        flags.append("keyValues")
    query = {"options": ",".join(flags)} if flags else {}

    batch = Update(actionType=action_type, entities=entities)
    try:
        response = self.post(
            url=endpoint,
            headers=request_headers,
            params=query,
            json=batch.model_dump(by_alias=True),
        )
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info("Update operation '%s' succeeded!", action_type)
    except requests.RequestException as err:
        self.log_error(
            err=err, msg=f"Update operation '{action_type}' failed!"
        )
        raise
def query(
    self,
    *,
    query: Query,
    limit: PositiveInt = None,
    order_by: str = None,
    response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
) -> List[Any]:
    """
    Run a batch query (`POST v2/op/query`).

    Args:
        query (Query): Query that is sent as the request body.
        limit (PositiveInt): Maximum number of entities to retrieve.
        order_by (str): Criteria for ordering the results; forwarded as
            the `orderBy` query parameter if given.
        response_format (AttrsFormat, str): Representation of the returned
            entities. Must be one of the values of AttrsFormat.

    Returns:
        The response payload is an Array containing one object per matching
        entity, or an empty array [] if no entities are found. For the
        normalized and the keyValues format the items are validated into
        ContextEntity and ContextEntityKeyValues objects respectively;
        otherwise the raw items are returned.

    Raises:
        ValueError: If `response_format` is not a value of AttrsFormat.
        requests.RequestException: Re-raised after logging if the request
            fails.
    """
    url = urljoin(self.base_url, "v2/op/query")
    headers = self.headers.copy()
    headers.update({"Content-Type": "application/json"})
    # 'count' is always requested; it is combined with the format below
    params = {"options": "count"}

    if response_format:
        if response_format not in list(AttrsFormat):
            raise ValueError(f"Value must be in {list(AttrsFormat)}")
        params["options"] = ",".join([response_format, "count"])
    if order_by:
        # Previously this argument was accepted but silently ignored
        params["orderBy"] = order_by
    try:
        items = self.__pagination(
            method=PaginationMethod.POST,
            url=url,
            headers=headers,
            params=params,
            data=query.model_dump_json(exclude_none=True),
            limit=limit,
        )
        if response_format == AttrsFormat.NORMALIZED:
            adapter = TypeAdapter(List[ContextEntity])
            return adapter.validate_python(items)
        if response_format == AttrsFormat.KEY_VALUES:
            adapter = TypeAdapter(List[ContextEntityKeyValues])
            return adapter.validate_python(items)
        return items
    except requests.RequestException as err:
        msg = "Query operation failed!"
        self.log_error(err=err, msg=msg)
        raise
def notify(self, message: Message) -> None:
    """
    Send a notification payload to the `v2/op/notify` endpoint.

    This operation is intended to consume a notification payload so that
    all the entity data included by such notification is persisted,
    overwriting if necessary. This operation is useful when one NGSIv2
    endpoint is subscribed to another NGSIv2 endpoint (federation
    scenarios). The request payload must be an NGSIv2 notification
    payload. The behaviour must be exactly the same as 'update'
    with 'action_type' equal to append.

    Args:
        message: Notification message

    Returns:
        None
    """
    endpoint = urljoin(self.base_url, "v2/op/notify")
    request_headers = self.headers.copy()
    request_headers["Content-Type"] = "application/json"
    try:
        response = self.post(
            url=endpoint,
            headers=request_headers,
            params={},
            data=message.model_dump_json(by_alias=True),
        )
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info("Notification message sent!")
    except requests.RequestException as err:
        self.log_error(
            err=err,
            msg=f"Sending notifcation message failed! \n "
                f"{message.model_dump_json(indent=2)}",
        )
        raise
def post_command(
    self,
    *,
    entity_id: str,
    command: Union[Command, NamedCommand, Dict],
    entity_type: str = None,
    command_name: str = None,
) -> None:
    """
    Post a command to a context entity this corresponds to 'PATCH' of the
    specified command attribute.

    Args:
        entity_id: Entity identifier
        command: Command. With `command_name` a Command (or a dict to
            build one from) is expected, without it a NamedCommand (or a
            dict to build one from).
        entity_type: Entity type
        command_name: Name of the command in the entity

    Returns:
        None
    """
    if command_name:
        assert isinstance(command, (Command, dict))
        unnamed = Command(**command) if isinstance(command, dict) else command
        # The name becomes the key of the attribute mapping
        attribute = {command_name: unnamed.model_dump()}
    else:
        assert isinstance(command, (NamedCommand, dict))
        attribute = (
            NamedCommand(**command) if isinstance(command, dict) else command
        )

    self.update_existing_entity_attributes(
        entity_id=entity_id, entity_type=entity_type, attrs=[attribute]
    )
def does_entity_exist(self, entity_id: str, entity_type: str) -> bool:
    """
    Check whether the CB holds an entity with the given id and type.

    Args:
        entity_id: Entity id
        entity_type: Entity type

    Returns:
        bool; True if entity exists

    Raises:
        RequestException, if any error occurs (e.g: No Connection),
        except that the entity is not found
    """
    endpoint = urljoin(self.base_url, f"v2/entities/{entity_id}")
    request_headers = self.headers.copy()
    query = {"type": entity_type}

    try:
        response = self.get(url=endpoint, params=query, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
        else:
            return True
    except requests.RequestException as err:
        # Only a 404 answer is interpreted as "entity does not exist"
        not_found = err.response is not None and err.response.status_code == 404
        if not not_found:
            raise
    return False
def patch_entity(
    self,
    entity: ContextEntity,
    old_entity: Optional[ContextEntity] = None,
    override_attr_metadata: bool = True,
) -> None:
    """
    Takes a given entity and updates the state in the CB to match it.
    It is an extended equivalent to the HTTP method PATCH, which applies
    partial modifications to a resource.

    Args:
        entity: Entity to update
        old_entity: OPTIONAL, if given only the differences between the
            old_entity and entity are updated in the CB.
            Other changes made to the entity in CB, can be kept.
            If type or id was changed, the old_entity will be
            deleted.
        override_attr_metadata:
            Whether to override or append the attributes metadata.
            `True` for overwrite or `False` for update/append

    Returns:
        None

    Raises:
        RequestException, if deleting or updating an attribute fails for
        any reason other than a 404 response. Errors without a response
        (e.g. no connection) are re-raised unchanged.
    """

    def _is_not_found(err) -> bool:
        # A RequestException raised before any answer was received
        # (connection error, timeout) carries no response object.
        response = err.response
        return response is not None and response.status_code == 404

    new_entity = entity

    if old_entity is None:
        # No old entity was provided: compare against the current state
        # in the CB, or simply create the entity if it is unknown there.
        if not self.does_entity_exist(
            entity_id=new_entity.id, entity_type=new_entity.type
        ):
            self.post_entity(new_entity, update=False)
            return
        old_entity = self.get_entity(
            entity_id=new_entity.id, entity_type=new_entity.type
        )
    else:
        # An old_entity was provided. If it does not (any longer) exist in
        # the CB, discard it and start over without it.
        if not self.does_entity_exist(
            entity_id=old_entity.id, entity_type=old_entity.type
        ):
            self.patch_entity(
                new_entity, override_attr_metadata=override_attr_metadata
            )
            return

        # If type or id was changed, the old_entity needs to be deleted
        # and the new_entity created. In this case the current state of
        # the old entity is lost.
        if old_entity.id != new_entity.id or old_entity.type != new_entity.type:
            self.delete_entity(entity_id=old_entity.id, entity_type=old_entity.type)

            if not self.does_entity_exist(
                entity_id=new_entity.id, entity_type=new_entity.type
            ):
                self.post_entity(entity=new_entity, update=False)
                return

    # From here on only attributes need to be patched: delete the removed
    # ones, update the changed ones and create the new ones.
    old_attributes = old_entity.get_attributes()
    new_attributes = new_entity.get_attributes()

    # Lookup tables by attribute name; for duplicate names the last
    # attribute wins.
    new_by_name = {attr.name: attr for attr in new_attributes}
    old_names = {attr.name for attr in old_attributes}

    # Manage attributes that existed before.
    # Commands do not exist in the ContextEntity and are only
    # registrations to the corresponding device. Operations on them will
    # fail with 404, which is therefore tolerated below.
    for old_attr in old_attributes:
        corresponding_new_attr = new_by_name.get(old_attr.name)

        if corresponding_new_attr is None:
            # Attribute no longer exists, delete it
            try:
                self.delete_entity_attribute(
                    entity_id=new_entity.id,
                    entity_type=new_entity.type,
                    attr_name=old_attr.name,
                )
            except requests.RequestException as err:
                # if the attribute is provided by a registration the
                # deletion will fail
                if not _is_not_found(err):
                    raise
        elif old_attr != corresponding_new_attr:
            # Attribute changed in some way, update it; unchanged
            # attributes keep their current state in the CB
            try:
                self.update_entity_attribute(
                    entity_id=new_entity.id,
                    entity_type=new_entity.type,
                    attr=corresponding_new_attr,
                    override_metadata=override_attr_metadata,
                )
            except requests.RequestException as err:
                # if the attribute is provided by a registration the
                # update will fail
                if not _is_not_found(err):
                    raise

    # Create attributes that did not exist before, all in one request
    added_attributes = [
        attr for attr in new_attributes if attr.name not in old_names
    ]
    if added_attributes:
        update_entity = ContextEntity(id=entity.id, type=entity.type)
        for attr in added_attributes:
            update_entity.add_attributes([attr])
        self.update_entity(update_entity)
def _subscription_dicts_are_equal(self, first: dict, second: dict):
    """
    Compare two subscription dictionaries, descending into nested
    dictionaries.

    Only the keys of ``first`` are inspected. A warning is issued if both
    dictionaries do not share the same set of keys. Differing values are
    tolerated if both sides carry no data (e.g. ``None`` vs. an empty
    list) or if the key is ``timesSent``.

    Args:
        first dict: Dictionary of first subscription
        second dict: Dictionary of second subscription

    Returns:
        True if equal, else False
    """

    def _carries_data(item):
        """
        Recursively decide whether ``item`` holds any truthy value.
        Dictionaries and lists carry data if any of their values/items
        does; everything else is evaluated with ``bool``.
        """
        if isinstance(item, dict):
            return any(_carries_data(item=sub) for sub in item.values())
        if isinstance(item, list):
            return any(_carries_data(item=sub) for sub in item)
        return bool(item)

    if first.keys() != second.keys():
        warnings.warn(
            "Subscriptions contain a different set of fields. "
            "Only comparing to new fields of the new one."
        )

    for key in first:
        left = first[key]
        right = second.get(key, None)

        # two dictionaries are compared recursively
        if isinstance(left, dict) and isinstance(right, dict):
            if not self._subscription_dicts_are_equal(left, right):
                return False
            continue

        if left != right:
            self.logger.debug(f"Not equal fields for key {key}: ({left}, {right})")
            if key == "timesSent":
                # differences under this key never count as a mismatch
                continue
            if _carries_data(left) or _carries_data(right):
                return False
            # both sides are "empty" in different flavours, treat as equal

    return True
2154#
2155#
2156# def check_duplicate_subscription(self, subscription_body, limit: int = 20):
2157# """
2158# Function compares the subject of the subscription body, on whether a subscription
2159# already exists for a device / entity.
2160# :param subscription_body: the body of the new subscription
2161# :param limit: pagination parameter, to set the number of
2162# subscriptions bodies the get request should grab
2163# :return: exists, boolean -> True, if such a subscription already
2164# exists
2165# """
2166# exists = False
2167# subscription_subject = json.loads(subscription_body)["subject"]
2168# # Exact keys depend on subscription body
2169# try:
2170# subscription_url = json.loads(subscription_body)[
2171# "notification"]["httpCustom"]["url"]
2172# except KeyError:
2173# subscription_url = json.loads(subscription_body)[
2174# "notification"]["http"]["url"]
2175#
2176# # If the number of subscriptions is larger then the limit,
2177# paginations methods have to be used
2178# url = self.url + '/v2/subscriptions?limit=' + str(limit) +
2179# '&options=count'
2180# response = self.session.get(url, headers=self.get_header())
2181#
2182# sub_count = float(response.headers["Fiware-Total-Count"])
2183# response = json.loads(response.text)
2184# if sub_count >= limit:
2185# response = self.get_pagination(url=url, headers=self.get_header(),
2186# limit=limit, count=sub_count)
2187# response = json.loads(response)
2188#
2189# for existing_subscription in response:
2190# # check whether the exact same subscriptions already exists
2191# if existing_subscription["subject"] == subscription_subject:
2192# exists = True
2193# break
2194# try:
2195# existing_url = existing_subscription["notification"][
2196# "http"]["url"]
2197# except KeyError:
2198# existing_url = existing_subscription["notification"][
2199# "httpCustom"]["url"]
2200# # check whether both subscriptions notify to the same path
2201# if existing_url != subscription_url:
2202# continue
2203# else:
2204# # iterate over all entities included in the subscription object
2205# for entity in subscription_subject["entities"]:
2206# if 'type' in entity.keys():
2207# subscription_type = entity['type']
2208# else:
2209# subscription_type = entity['typePattern']
2210# if 'id' in entity.keys():
2211# subscription_id = entity['id']
2212# else:
2213# subscription_id = entity["idPattern"]
2214# # iterate over all entities included in the exisiting
2215# subscriptions
2216# for existing_entity in existing_subscription["subject"][
2217# "entities"]:
2218# if "type" in entity.keys():
2219# type_existing = entity["type"]
2220# else:
2221# type_existing = entity["typePattern"]
2222# if "id" in entity.keys():
2223# id_existing = entity["id"]
2224# else:
2225# id_existing = entity["idPattern"]
2226# # as the ID field is non optional, it has to match
2227# # check whether the type match
2228# # if the type field is empty, they match all types
2229# if (type_existing == subscription_type) or\
2230# ('*' in subscription_type) or \
2231# ('*' in type_existing)\
2232# or (type_existing == "") or (
2233# subscription_type == ""):
2234# # check if on of the subscriptions is a pattern,
2235# or if they both refer to the same id
2236# # Get the attrs first, to avoid code duplication
2237# # last thing to compare is the attributes
2238# # Assumption -> position is the same as the
2239# entities _list
2240# # i == j
2241# i = subscription_subject["entities"].index(entity)
2242# j = existing_subscription["subject"][
2243# "entities"].index(existing_entity)
2244# try:
2245# subscription_attrs = subscription_subject[
2246# "condition"]["attrs"][i]
2247# except (KeyError, IndexError):
2248# subscription_attrs = []
2249# try:
2250# existing_attrs = existing_subscription[
2251# "subject"]["condition"]["attrs"][j]
2252# except (KeyError, IndexError):
2253# existing_attrs = []
2254#
2255# if (".*" in subscription_id) or ('.*' in
2256# id_existing) or (subscription_id == id_existing):
2257# # Attributes have to match, or the have to
2258# be an empty array
2259# if (subscription_attrs == existing_attrs) or
2260# (subscription_attrs == []) or (existing_attrs == []):
2261# exists = True
2262# # if they do not match completely or subscribe
2263# to all ids they have to match up to a certain position
2264# elif ("*" in subscription_id) or ('*' in
2265# id_existing):
2266# regex_existing = id_existing.find('*')
2267# regex_subscription =
2268# subscription_id.find('*')
2269# # slice the strings to compare
2270# if (id_existing[:regex_existing] in
2271# subscription_id) or (subscription_id[:regex_subscription] in id_existing) or \
2272# (id_existing[regex_existing:] in
2273# subscription_id) or (subscription_id[regex_subscription:] in id_existing):
2274# if (subscription_attrs ==
2275# existing_attrs) or (subscription_attrs == []) or (existing_attrs == []):
2276# exists = True
2277# else:
2278# continue
2279# else:
2280# continue
2281# else:
2282# continue
2283# else:
2284# continue
2285# else:
2286# continue
2287# return exists
2288#