"""
Context Broker Module for API Client
"""
from __future__ import annotations
import copy
from copy import deepcopy
from enum import Enum
from math import inf
from pkg_resources import parse_version
from pydantic import PositiveInt, PositiveFloat, AnyHttpUrl, ValidationError
from pydantic.type_adapter import TypeAdapter
from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union
import re
import requests
from urllib.parse import urljoin
import warnings
from filip.clients.base_http_client import BaseHttpClient, NgsiURLVersion
from filip.config import settings
from filip.models.base import FiwareHeader, PaginationMethod, DataType
from filip.utils.simple_ql import QueryString
from filip.models.ngsi_v2.context import (
ActionType,
Command,
ContextEntity,
ContextEntityKeyValues,
ContextAttribute,
NamedCommand,
NamedContextAttribute,
Query,
Update,
PropertyFormat,
ContextEntityList,
ContextEntityKeyValuesList,
)
from filip.models.ngsi_v2.base import AttrsFormat
from filip.models.ngsi_v2.subscriptions import Subscription, Message
from filip.models.ngsi_v2.registrations import Registration
if TYPE_CHECKING:
from filip.clients.ngsi_v2.iota import IoTAClient
[docs]class ContextBrokerClient(BaseHttpClient):
"""
Implementation of NGSI Context Broker functionalities, such as creating
entities and subscriptions; retrieving, updating and deleting data.
Further documentation:
https://fiware-orion.readthedocs.io/en/master/
Api specifications for v2 are located here:
https://telefonicaid.github.io/fiware-orion/api/v2/stable/
Note:
We use the reference implementation for development. Therefore, some
other brokers may show slightly different behavior!
"""
def __init__(
self,
url: str = None,
*,
session: requests.Session = None,
fiware_header: FiwareHeader = None,
**kwargs,
):
"""
Args:
url: Url of context broker server
session (requests.Session):
fiware_header (FiwareHeader): fiware service and fiware service path
**kwargs (Optional): Optional arguments that ``request`` takes.
"""
# set service url
url = url or settings.CB_URL
self._url_version = NgsiURLVersion.v2_url.value
super().__init__(
url=url, session=session, fiware_header=fiware_header, **kwargs
)
def __pagination(
self,
*,
method: PaginationMethod = PaginationMethod.GET,
url: str,
headers: Dict,
limit: Union[PositiveInt, PositiveFloat] = None,
params: Dict = None,
data: str = None,
) -> List[Dict]:
"""
NGSIv2 implements a pagination mechanism in order to help clients to
retrieve large sets of resources. This mechanism works for all listing
operations in the API (e.g. GET /v2/entities, GET /v2/subscriptions,
POST /v2/op/query, etc.). This function helps getting datasets that are
larger than the limit for the different GET operations.
https://fiware-orion.readthedocs.io/en/master/user/pagination/index.html
Args:
url: Information about the url, obtained from the original function
headers: The headers from the original function
params:
limit:
Returns:
object:
"""
if limit is None:
limit = inf
if limit > 1000:
params["limit"] = 1000 # maximum items per request
else:
params["limit"] = limit
if self.session:
session = self.session
else:
session = requests.Session()
with session:
res = session.request(
method=method, url=url, params=params, headers=headers, data=data
)
if res.ok:
items = res.json()
# do pagination
count = int(res.headers["Fiware-Total-Count"])
while len(items) < limit and len(items) < count:
# Establishing the offset from where entities are retrieved
params["offset"] = len(items)
params["limit"] = min(1000, (limit - len(items)))
res = session.request(
method=method,
url=url,
params=params,
headers=headers,
data=data,
)
if res.ok:
items.extend(res.json())
else:
res.raise_for_status()
self.logger.debug("Received: %s", items)
return items
res.raise_for_status()
# MANAGEMENT API
[docs] def get_version(self) -> Dict:
"""
Gets version of IoT Agent
Returns:
Dictionary with response
"""
url = urljoin(self.base_url, "version")
try:
res = self.get(url=url, headers=self.headers)
if res.ok:
return res.json()
res.raise_for_status()
except requests.RequestException as err:
self.logger.error(err)
raise
[docs] def get_resources(self) -> Dict:
"""
Gets reo
Returns:
Dict
"""
url = urljoin(self.base_url, self._url_version)
try:
res = self.get(url=url, headers=self.headers)
if res.ok:
return res.json()
res.raise_for_status()
except requests.RequestException as err:
self.logger.error(err)
raise
# STATISTICS API
[docs] def get_statistics(self) -> Dict:
"""
Gets statistics of context broker
Returns:
Dictionary with response
"""
url = urljoin(self.base_url, "statistics")
try:
res = self.get(url=url, headers=self.headers)
if res.ok:
return res.json()
res.raise_for_status()
except requests.RequestException as err:
self.logger.error(err)
raise
# CONTEXT MANAGEMENT API ENDPOINTS
# Entity Operations
[docs] def post_entity(
self,
entity: Union[ContextEntity, ContextEntityKeyValues],
update: bool = False,
patch: bool = False,
override_attr_metadata: bool = True,
key_values: bool = False,
):
"""
Function registers an Object with the NGSI Context Broker,
if it already exists it can be automatically updated (overwritten)
if the update bool is True.
First a post request with the entity is tried, if the response code
is 422 the entity is uncrossable, as it already exists there are two
options, either overwrite it, if the attribute have changed
(e.g. at least one new/new values) (update = True) or leave
it the way it is (update=False)
If you only want to manipulate the entities values, you need to set
patch argument.
Args:
entity (ContextEntity/ContextEntityKeyValues):
Context Entity Object
update (bool):
If the response.status_code is 422, whether the override and
existing entity
patch (bool):
If the response.status_code is 422, whether to manipulate the
existing entity. Omitted if update `True`.
override_attr_metadata:
Only applies for patch equal to `True`.
Whether to override or append the attribute's metadata.
`True` for overwrite or `False` for update/append
key_values(bool):
By default False. If set to True, "options=keyValues" will
be included in params of post request. The payload uses
the keyValues simplified entity representation, i.e.
ContextEntityKeyValues.
"""
url = urljoin(self.base_url, f"{self._url_version}/entities")
headers = self.headers.copy()
params = {}
options = []
if key_values:
assert isinstance(entity, ContextEntityKeyValues)
options.append("keyValues")
else:
assert isinstance(entity, ContextEntity)
if options:
params.update({"options": ",".join(options)})
try:
res = self.post(
url=url,
headers=headers,
json=entity.model_dump(exclude_none=True),
params=params,
)
if res.ok:
self.logger.info("Entity successfully posted!")
return res.headers.get("Location")
res.raise_for_status()
except requests.RequestException as err:
if err.response is not None:
if update and err.response.status_code == 422:
return self.override_entity(entity=entity, key_values=key_values)
if patch and err.response.status_code == 422:
if not key_values:
return self.patch_entity(
entity=entity, override_attr_metadata=override_attr_metadata
)
else:
return self._patch_entity_key_values(entity=entity)
msg = f"Could not post entity {entity.id}"
self.log_error(err=err, msg=msg)
raise
[docs] def get_entity_list(
self,
*,
entity_ids: List[str] = None,
entity_types: List[str] = None,
id_pattern: str = None,
type_pattern: str = None,
q: Union[str, QueryString] = None,
mq: Union[str, QueryString] = None,
georel: str = None,
geometry: str = None,
coords: str = None,
limit: PositiveInt = inf,
attrs: List[str] = None,
metadata: str = None,
order_by: str = None,
response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
) -> List[Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]]:
r"""
Retrieves a list of context entities that match different criteria by
id, type, pattern matching (either id or type) and/or those which
match a query or geographical query (see Simple Query Language and
Geographical Queries). A given entity has to match all the criteria
to be retrieved (i.e., the criteria is combined in a logical AND
way). Note that pattern matching query parameters are incompatible
(i.e. mutually exclusive) with their corresponding exact matching
parameters, i.e. idPattern with id and typePattern with type.
Args:
entity_ids: A comma-separated list of elements. Retrieve entities
whose ID matches one of the elements in the list.
Incompatible with idPattern,e.g. Boe_Idarium
entity_types: comma-separated list of elements. Retrieve entities
whose type matches one of the elements in the list.
Incompatible with typePattern. Example: Room.
id_pattern: A correctly formatted regular expression. Retrieve
entities whose ID matches the regular expression. Incompatible
with id, e.g. ngsi-ld.* or sensor.*
type_pattern: A correctly formatted regular expression. Retrieve
entities whose type matches the regular expression.
Incompatible with type, e.g. room.*
q (SimpleQuery): A query expression, composed of a list of
statements separated by ;, i.e.,
q=statement1;statement2;statement3. See Simple Query
Language specification. Example: temperature>40.
mq (SimpleQuery): A query expression for attribute metadata,
composed of a list of statements separated by ;, i.e.,
mq=statement1;statement2;statement3. See Simple Query
Language specification. Example: temperature.accuracy<0.9.
georel: Spatial relationship between matching entities and a
reference shape. See Geographical Queries. Example: 'near'.
geometry: Geographical area to which the query is restricted.
See Geographical Queries. Example: point.
coords: List of latitude-longitude pairs of coordinates separated
by ';'. See Geographical Queries. Example: 41.390205,
2.154007;48.8566,2.3522.
limit: Limits the number of entities to be retrieved Example: 20
attrs: Comma-separated list of attribute names whose data are to
be included in the response. The attributes are retrieved in
the order specified by this parameter. If this parameter is
not included, the attributes are retrieved in arbitrary
order. See "Filtering out attributes and metadata" section
for more detail. Example: seatNumber.
metadata: A list of metadata names to include in the response.
See "Filtering out attributes and metadata" section for more
detail. Example: accuracy.
order_by: Criteria for ordering results. See "Ordering Results"
section for details. Example: temperature,!speed.
response_format (AttrsFormat, str): Response Format. Note: That if
'keyValues' or 'values' are used the response model will
change to List[ContextEntityKeyValues] and to List[Dict[str,
Any]], respectively.
Returns:
"""
url = urljoin(self.base_url, f"{self._url_version}/entities/")
headers = self.headers.copy()
params = {}
if entity_ids and id_pattern:
raise ValueError
if entity_types and type_pattern:
raise ValueError
if entity_ids:
if not isinstance(entity_ids, list):
entity_ids = [entity_ids]
params.update({"id": ",".join(entity_ids)})
if id_pattern:
try:
re.compile(id_pattern)
except re.error as err:
raise ValueError(f"Invalid Pattern: {err}") from err
params.update({"idPattern": id_pattern})
if entity_types:
if not isinstance(entity_types, list):
entity_types = [entity_types]
params.update({"type": ",".join(entity_types)})
if type_pattern:
try:
re.compile(type_pattern)
except re.error as err:
raise ValueError(f"Invalid Pattern: {err.msg}") from err
params.update({"typePattern": type_pattern})
if attrs:
params.update({"attrs": ",".join(attrs)})
if metadata:
params.update({"metadata": ",".join(metadata)})
if q:
if isinstance(q, str):
q = QueryString.parse_str(q)
params.update({"q": str(q)})
if mq:
params.update({"mq": str(mq)})
if geometry:
params.update({"geometry": geometry})
if georel:
params.update({"georel": georel})
if coords:
params.update({"coords": coords})
if order_by:
params.update({"orderBy": order_by})
if response_format not in list(AttrsFormat):
raise ValueError(f"Value must be in {list(AttrsFormat)}")
response_format = ",".join(["count", response_format])
params.update({"options": response_format})
try:
items = self.__pagination(
method=PaginationMethod.GET,
limit=limit,
url=url,
params=params,
headers=headers,
)
if AttrsFormat.NORMALIZED in response_format:
return ContextEntityList.model_validate({"entities": items}).entities
elif AttrsFormat.KEY_VALUES in response_format:
return ContextEntityKeyValuesList.model_validate(
{"entities": items}
).entities
return items # in case of VALUES as response_format
except requests.RequestException as err:
msg = "Could not load entities"
self.log_error(err=err, msg=msg)
raise
[docs] def get_entity(
self,
entity_id: str,
entity_type: str = None,
attrs: List[str] = None,
metadata: List[str] = None,
response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
) -> Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]:
"""
This operation must return one entity element only, but there may be
more than one entity with the same ID (e.g. entities with same ID but
different types). In such case, an error message is returned, with
the HTTP status code set to 409 Conflict.
Args:
entity_id (String): Id of the entity to be retrieved
entity_type (String): Entity type, to avoid ambiguity in case
there are several entities with the same entity id.
attrs (List of Strings): List of attribute names whose data must be
included in the response. The attributes are retrieved in the
order specified by this parameter.
See "Filtering out attributes and metadata" section for more
detail. If this parameter is not included, the attributes are
retrieved in arbitrary order, and all the attributes of the
entity are included in the response.
Example: temperature, humidity.
metadata (List of Strings): A list of metadata names to include in
the response. See "Filtering out attributes and metadata"
section for more detail. Example: accuracy.
response_format (AttrsFormat, str): Representation format of
response
Returns:
ContextEntity
"""
url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
headers = self.headers.copy()
params = {}
if entity_type:
params.update({"type": entity_type})
if attrs:
params.update({"attrs": ",".join(attrs)})
if metadata:
params.update({"metadata": ",".join(metadata)})
if response_format not in list(AttrsFormat):
raise ValueError(f"Value must be in {list(AttrsFormat)}")
params.update({"options": response_format})
try:
res = self.get(url=url, params=params, headers=headers)
if res.ok:
self.logger.info("Entity successfully retrieved!")
self.logger.debug("Received: %s", res.json())
if response_format == AttrsFormat.NORMALIZED:
return ContextEntity(**res.json())
if response_format == AttrsFormat.KEY_VALUES:
return ContextEntityKeyValues(**res.json())
return res.json()
res.raise_for_status()
except requests.RequestException as err:
msg = f"Could not load entity {entity_id}"
self.log_error(err=err, msg=msg)
raise
[docs] def get_entity_attributes(
self,
entity_id: str,
entity_type: str = None,
attrs: List[str] = None,
metadata: List[str] = None,
response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
) -> Dict[str, ContextAttribute]:
"""
This request is similar to retrieving the whole entity, however this
one omits the id and type fields. Just like the general request of
getting an entire entity, this operation must return only one entity
element. If more than one entity with the same ID is found (e.g.
entities with same ID but different type), an error message is
returned, with the HTTP status code set to 409 Conflict.
Args:
entity_id (String): Id of the entity to be retrieved
entity_type (String): Entity type, to avoid ambiguity in case
there are several entities with the same entity id.
attrs (List of Strings): List of attribute names whose data must be
included in the response. The attributes are retrieved in the
order specified by this parameter.
See "Filtering out attributes and metadata" section for more
detail. If this parameter is not included, the attributes are
retrieved in arbitrary order, and all the attributes of the
entity are included in the response. Example: temperature,
humidity.
metadata (List of Strings): A list of metadata names to include in
the response. See "Filtering out attributes and metadata"
section for more detail. Example: accuracy.
response_format (AttrsFormat, str): Representation format of
response
Returns:
Dict
"""
url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
headers = self.headers.copy()
params = {}
if entity_type:
params.update({"type": entity_type})
if attrs:
params.update({"attrs": ",".join(attrs)})
if metadata:
params.update({"metadata": ",".join(metadata)})
if response_format not in list(AttrsFormat):
raise ValueError(f"Value must be in {list(AttrsFormat)}")
params.update({"options": response_format})
try:
res = self.get(url=url, params=params, headers=headers)
if res.ok:
if response_format == AttrsFormat.NORMALIZED:
return {
key: ContextAttribute(**values)
for key, values in res.json().items()
}
return res.json()
res.raise_for_status()
except requests.RequestException as err:
msg = f"Could not load attributes from entity {entity_id} !"
self.log_error(err=err, msg=msg)
raise
[docs] def update_entity(
self,
entity: Union[ContextEntity, ContextEntityKeyValues, dict],
append_strict: bool = False,
key_values: bool = False,
):
"""
The request payload is an object representing the attributes to
append or update.
Note:
Update means overwriting the existing entity. If you want to
manipulate you should rather use patch_entity.
Args:
entity (ContextEntity):
append_strict: If `False` the entity attributes are updated (if they
previously exist) or appended (if they don't previously exist)
with the ones in the payload.
If `True` all the attributes in the payload not
previously existing in the entity are appended. In addition
to that, in case some of the attributes in the payload
already exist in the entity, an error is returned.
More precisely this means a strict append procedure.
key_values: By default False. If set to True, the payload uses
the keyValues simplified entity representation, i.e.
ContextEntityKeyValues.
Returns:
None
"""
if key_values:
if isinstance(entity, dict):
entity = copy.deepcopy(entity)
_id = entity.pop("id")
_type = entity.pop("type")
attrs = entity
entity = ContextEntityKeyValues(id=_id, type=_type)
else:
attrs = entity.model_dump(exclude={"id", "type"})
else:
attrs = entity.get_attributes()
self.update_or_append_entity_attributes(
entity_id=entity.id,
entity_type=entity.type,
attrs=attrs,
append_strict=append_strict,
key_values=key_values,
)
[docs] def update_entity_properties(
self, entity: ContextEntity, append_strict: bool = False
):
"""
The request payload is an object representing the attributes, of any type
but Relationship, to append or update.
Note:
Update means overwriting the existing entity. If you want to
manipulate you should rather use patch_entity.
Args:
entity (ContextEntity):
append_strict: If `False` the entity attributes are updated (if they
previously exist) or appended (if they don't previously exist)
with the ones in the payload.
If `True` all the attributes in the payload not
previously existing in the entity are appended. In addition
to that, in case some of the attributes in the payload
already exist in the entity, an error is returned.
More precisely this means a strict append procedure.
Returns:
None
"""
self.update_or_append_entity_attributes(
entity_id=entity.id,
entity_type=entity.type,
attrs=entity.get_properties(),
append_strict=append_strict,
)
[docs] def update_entity_relationships(
self, entity: ContextEntity, append_strict: bool = False
):
"""
The request payload is an object representing only the attributes, of type
Relationship, to append or update.
Note:
Update means overwriting the existing entity. If you want to
manipulate you should rather use patch_entity.
Args:
entity (ContextEntity):
append_strict: If `False` the entity attributes are updated (if they
previously exist) or appended (if they don't previously exist)
with the ones in the payload.
If `True` all the attributes in the payload not
previously existing in the entity are appended. In addition
to that, in case some of the attributes in the payload
already exist in the entity, an error is returned.
More precisely this means a strict append procedure.
Returns:
None
"""
self.update_or_append_entity_attributes(
entity_id=entity.id,
entity_type=entity.type,
attrs=entity.get_relationships(),
append_strict=append_strict,
)
[docs] def delete_entity(
self,
entity_id: str,
entity_type: str = None,
delete_devices: bool = False,
iota_client: IoTAClient = None,
iota_url: AnyHttpUrl = settings.IOTA_URL,
) -> None:
"""
Remove a entity from the context broker. No payload is required
or received.
Args:
entity_id:
Id of the entity to be deleted
entity_type:
Entity type, to avoid ambiguity in case there are several
entities with the same entity id.
delete_devices:
If True, also delete all devices that reference this
entity (entity_id as entity_name)
iota_client:
Corresponding IoTA-Client used to access IoTA-Agent
iota_url:
URL of the corresponding IoT-Agent. This will autogenerate
an IoTA-Client, mirroring the information of the
ContextBrokerClient, e.g. FiwareHeader, and other headers
Returns:
None
"""
url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
headers = self.headers.copy()
if entity_type:
params = {"type": entity_type}
else:
params = None
try:
res = self.delete(url=url, params=params, headers=headers)
if res.ok:
self.logger.info("Entity '%s' successfully deleted!", entity_id)
else:
res.raise_for_status()
except requests.RequestException as err:
msg = f"Could not delete entity {entity_id} !"
self.log_error(err=err, msg=msg)
raise
if delete_devices:
from filip.clients.ngsi_v2 import IoTAClient
if iota_client:
iota_client_local = deepcopy(iota_client)
else:
warnings.warn(
"No IoTA-Client object provided! "
"Will try to generate one. "
"This usage is not recommended."
)
iota_client_local = IoTAClient(
url=iota_url,
fiware_header=self.fiware_headers,
headers=self.headers,
)
for device in iota_client_local.get_device_list(entity_names=[entity_id]):
if entity_type:
if device.entity_type == entity_type:
iota_client_local.delete_device(device_id=device.device_id)
else:
iota_client_local.delete_device(device_id=device.device_id)
iota_client_local.close()
[docs] def delete_entities(self, entities: List[ContextEntity]) -> None:
"""
Remove a list of entities from the context broker. This methode is
more efficient than to call delete_entity() for each entity
Args:
entities: List[ContextEntity]: List of entities to be deleted
Raises:
Exception, if one of the entities is not in the ContextBroker
Returns:
None
"""
# update() delete, deletes all entities without attributes completely,
# and removes the attributes for the other
# The entities are sorted based on the fact if they have
# attributes.
entities_with_attributes: List[ContextEntity] = []
for entity in entities:
attribute_names = [
key
for key in entity.model_dump()
if key not in ContextEntity.model_fields
]
if len(attribute_names) > 0:
entities_with_attributes.append(
ContextEntity(id=entity.id, type=entity.type)
)
# Post update_delete for those without attribute only once,
# for the other post update_delete again but for the changed entity
# in the ContextBroker (only id and type left)
if len(entities) > 0:
self.update(entities=entities, action_type="delete")
if len(entities_with_attributes) > 0:
self.update(entities=entities_with_attributes, action_type="delete")
[docs] def update_or_append_entity_attributes(
self,
entity_id: str,
attrs: Union[
List[NamedContextAttribute], Dict[str, ContextAttribute], Dict[str, Any]
],
entity_type: str = None,
append_strict: bool = False,
forcedUpdate: bool = False,
key_values: bool = False,
):
"""
The request payload is an object representing the attributes to
append or update. This corresponds to a 'POST' request if append is
set to 'False'
Note:
Be careful not to update attributes that are
provided via context registration, e.g. commands. Commands are
removed before sending the request. To avoid breaking things.
Args:
entity_id: Entity id to be updated
entity_type: Entity type, to avoid ambiguity in case there are
several entities with the same entity id.
attrs: List of attributes to update or to append
append_strict: If `False` the entity attributes are updated (if they
previously exist) or appended (if they don't previously exist)
with the ones in the payload.
If `True` all the attributes in the payload not
previously existing in the entity are appended. In addition
to that, in case some of the attributes in the payload
already exist in the entity, an error is returned.
More precisely this means a strict append procedure.
forcedUpdate: Update operation have to trigger any matching
subscription, no matter if there is an actual attribute
update or no instead of the default behavior, which is to
updated only if attribute is effectively updated.
key_values: By default False. If set to True, the payload uses
the keyValues simplified entity representation, i.e.
ContextEntityKeyValues.
Returns:
None
"""
url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
headers = self.headers.copy()
params = {}
if entity_type:
params.update({"type": entity_type})
else:
entity_type = "dummy"
options = []
if append_strict:
options.append("append")
if forcedUpdate:
options.append("forcedUpdate")
if key_values:
assert isinstance(attrs, dict), "for keyValues attrs has to be a dict"
options.append("keyValues")
if options:
params.update({"options": ",".join(options)})
if key_values:
entity = ContextEntityKeyValues(id=entity_id, type=entity_type, **attrs)
else:
entity = ContextEntity(id=entity_id, type=entity_type)
entity.add_attributes(attrs)
# exclude commands from the send data,
# as they live in the IoTA-agent
excluded_keys = {"id", "type"}
# excluded_keys.update(
# entity.get_commands(response_format=PropertyFormat.DICT).keys()
# )
try:
res = self.post(
url=url,
headers=headers,
json=entity.model_dump(exclude=excluded_keys, exclude_none=True),
params=params,
)
if res.ok:
self.logger.info("Entity '%s' successfully " "updated!", entity.id)
else:
res.raise_for_status()
except requests.RequestException as err:
msg = f"Could not update or append attributes of entity" f" {entity.id} !"
self.log_error(err=err, msg=msg)
raise
def _patch_entity_key_values(
self,
entity: Union[ContextEntityKeyValues, dict],
):
"""
The entity are updated with a ContextEntityKeyValues object or a
dictionary contain the simplified entity data. This corresponds to a
'PATCH' request.
Only existing attribute can be updated!
Args:
entity: A ContextEntityKeyValues object or a dictionary contain
the simplified entity data
"""
if isinstance(entity, dict):
entity = ContextEntityKeyValues(**entity)
url = urljoin(self.base_url, f"v2/entities/{entity.id}/attrs")
headers = self.headers.copy()
params = {"type": entity.type, "options": AttrsFormat.KEY_VALUES.value}
try:
res = self.patch(
url=url,
headers=headers,
json=entity.model_dump(exclude={"id", "type"}, exclude_unset=True),
params=params,
)
if res.ok:
self.logger.info("Entity '%s' successfully " "updated!", entity.id)
else:
res.raise_for_status()
except requests.RequestException as err:
msg = f"Could not update attributes of entity" f" {entity.id} !"
self.log_error(err=err, msg=msg)
raise
[docs] def update_existing_entity_attributes(
self,
entity_id: str,
attrs: Union[
List[NamedContextAttribute], Dict[str, ContextAttribute], Dict[str, Any]
],
entity_type: str = None,
forcedUpdate: bool = False,
override_metadata: bool = False,
key_values: bool = False,
):
"""
The entity attributes are updated with the ones in the payload.
In addition to that, if one or more attributes in the payload doesn't
exist in the entity, an error is returned. This corresponds to a
'PATCH' request.
Args:
entity_id: Entity id to be updated
entity_type: Entity type, to avoid ambiguity in case there are
several entities with the same entity id.
attrs: List of attributes to update or to append
forcedUpdate: Update operation have to trigger any matching
subscription, no matter if there is an actual attribute
update or no instead of the default behavior, which is to
updated only if attribute is effectively updated.
override_metadata:
Bool,replace the existing metadata with the one provided in
the request
key_values: By default False. If set to True, the payload uses
the keyValues simplified entity representation, i.e.
ContextEntityKeyValues.
Returns:
None
"""
url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
headers = self.headers.copy()
if entity_type:
params = {"type": entity_type}
else:
params = None
entity_type = "dummy"
options = []
if override_metadata:
options.append("overrideMetadata")
if forcedUpdate:
options.append("forcedUpdate")
if key_values:
assert isinstance(attrs, dict), "for keyValues the attrs must be dict"
payload = attrs
options.append("keyValues")
else:
entity = ContextEntity(id=entity_id, type=entity_type)
entity.add_attributes(attrs)
payload = entity.model_dump(exclude={"id", "type"}, exclude_none=True)
if options:
params.update({"options": ",".join(options)})
try:
res = self.patch(
url=url,
headers=headers,
json=payload,
params=params,
)
if res.ok:
self.logger.info("Entity '%s' successfully " "updated!", entity_id)
else:
res.raise_for_status()
except requests.RequestException as err:
msg = f"Could not update attributes of entity" f" {entity_id} !"
self.log_error(err=err, msg=msg)
raise
[docs] def override_entity(
self, entity: Union[ContextEntity, ContextEntityKeyValues], **kwargs
):
"""
The request payload is an object representing the attributes to
override the existing entity.
Note:
If you want to manipulate you should rather use patch_entity.
Args:
entity (ContextEntity or ContextEntityKeyValues):
Returns:
None
"""
return self.replace_entity_attributes(
entity_id=entity.id,
entity_type=entity.type,
attrs=entity.get_attributes(),
**kwargs,
)
[docs] def replace_entity_attributes(
self,
entity_id: str,
attrs: Union[
List[Union[NamedContextAttribute, Dict[str, ContextAttribute]]], Dict
],
entity_type: str = None,
forcedUpdate: bool = False,
key_values: bool = False,
):
"""
The attributes previously existing in the entity are removed and
replaced by the ones in the request. This corresponds to a 'PUT'
request.
Args:
entity_id: Entity id to be updated
entity_type: Entity type, to avoid ambiguity in case there are
several entities with the same entity id.
attrs: List of attributes to add to the entity or dict of
attributes in case of key_values=True.
forcedUpdate: Update operation have to trigger any matching
subscription, no matter if there is an actual attribute
update or no instead of the default behavior, which is to
updated only if attribute is effectively updated.
key_values(bool):
By default False. If set to True, "options=keyValues" will
be included in params of the request. The payload uses
the keyValues simplified entity representation, i.e.
ContextEntityKeyValues.
Returns:
None
"""
url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
headers = self.headers.copy()
params = {}
options = []
if entity_type:
params.update({"type": entity_type})
else:
entity_type = "dummy"
if forcedUpdate:
options.append("forcedUpdate")
if key_values:
options.append("keyValues")
assert isinstance(attrs, dict)
else:
entity = ContextEntity(id=entity_id, type=entity_type)
entity.add_attributes(attrs)
attrs = entity.model_dump(exclude={"id", "type"}, exclude_none=True)
if options:
params.update({"options": ",".join(options)})
try:
res = self.put(
url=url,
headers=headers,
json=attrs,
params=params,
)
if res.ok:
self.logger.info("Entity '%s' successfully " "updated!", entity_id)
else:
res.raise_for_status()
except requests.RequestException as err:
msg = f"Could not replace attribute of entity {entity_id} !"
self.log_error(err=err, msg=msg)
raise
# Attribute operations
[docs] def get_attribute(
self,
entity_id: str,
attr_name: str,
entity_type: str = None,
metadata: str = None,
response_format="",
) -> ContextAttribute:
"""
Retrieves a specified attribute from an entity.
Args:
entity_id: Id of the entity. Example: Bcn_Welt
attr_name: Name of the attribute to be retrieved.
entity_type (Optional): Type of the entity to retrieve
metadata (Optional): A list of metadata names to include in the
response. See "Filtering out attributes and metadata" section
for more detail.
Returns:
The content of the retrieved attribute as ContextAttribute
Raises:
Error
"""
url = urljoin(
self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
)
headers = self.headers.copy()
params = {}
if entity_type:
params.update({"type": entity_type})
if metadata:
params.update({"metadata": ",".join(metadata)})
try:
res = self.get(url=url, params=params, headers=headers)
if res.ok:
self.logger.debug("Received: %s", res.json())
return ContextAttribute(**res.json())
res.raise_for_status()
except requests.RequestException as err:
msg = (
f"Could not load attribute '{attr_name}' from entity" f"'{entity_id}' "
)
self.log_error(err=err, msg=msg)
raise
[docs] def update_entity_attribute(
self,
entity_id: str,
attr: Union[ContextAttribute, NamedContextAttribute],
*,
entity_type: str = None,
attr_name: str = None,
override_metadata: bool = True,
forcedUpdate: bool = False,
):
"""
Updates a specified attribute from an entity.
Args:
attr:
context attribute to update
entity_id:
Id of the entity. Example: Bcn_Welt
entity_type:
Entity type, to avoid ambiguity in case there are
several entities with the same entity id.
forcedUpdate: Update operation have to trigger any matching
subscription, no matter if there is an actual attribute
update or no instead of the default behavior, which is to
updated only if attribute is effectively updated.
attr_name:
Name of the attribute to be updated.
override_metadata:
Bool, if set to `True` (default) the metadata will be
overwritten. This is for backwards compatibility reasons.
If `False` the metadata values will be either updated if
already existing or append if not.
See also:
https://fiware-orion.readthedocs.io/en/master/user/metadata.html
"""
headers = self.headers.copy()
if not isinstance(attr, NamedContextAttribute):
assert attr_name is not None, (
"Missing name for attribute. "
"attr_name must be present if"
"attr is of type ContextAttribute"
)
else:
assert attr_name is None, (
"Invalid argument attr_name. Do not set "
"attr_name if attr is of type "
"NamedContextAttribute"
)
attr_name = attr.name
url = urljoin(
self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
)
params = {}
if entity_type:
params.update({"type": entity_type})
# set overrideMetadata option (we assure backwards compatibility here)
options = []
if override_metadata:
options.append("overrideMetadata")
if forcedUpdate:
options.append("forcedUpdate")
if options:
params.update({"options": ",".join(options)})
try:
res = self.put(
url=url,
headers=headers,
params=params,
json=attr.model_dump(exclude={"name"}, exclude_none=True),
)
if res.ok:
self.logger.info(
"Attribute '%s' of '%s' " "successfully updated!",
attr_name,
entity_id,
)
else:
res.raise_for_status()
except requests.RequestException as err:
msg = (
f"Could not update attribute '{attr_name}' of entity" f"'{entity_id}' "
)
self.log_error(err=err, msg=msg)
raise
[docs] def delete_entity_attribute(
self, entity_id: str, attr_name: str, entity_type: str = None
) -> None:
"""
Removes a specified attribute from an entity.
Args:
entity_id: Id of the entity.
attr_name: Name of the attribute to be retrieved.
entity_type: Entity type, to avoid ambiguity in case there are
several entities with the same entity id.
Raises:
Error
"""
url = urljoin(
self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
)
headers = self.headers.copy()
params = {}
if entity_type:
params.update({"type": entity_type})
try:
res = self.delete(url=url, headers=headers)
if res.ok:
self.logger.info(
"Attribute '%s' of '%s' " "successfully deleted!",
attr_name,
entity_id,
)
else:
res.raise_for_status()
except requests.RequestException as err:
msg = f"Could not delete attribute '{attr_name}' of entity '{entity_id}'"
self.log_error(err=err, msg=msg)
raise
# Attribute value operations
[docs] def get_attribute_value(
self, entity_id: str, attr_name: str, entity_type: str = None
) -> Any:
"""
This operation returns the value property with the value of the
attribute.
Args:
entity_id: Id of the entity. Example: Bcn_Welt
attr_name: Name of the attribute to be retrieved.
Example: temperature.
entity_type: Entity type, to avoid ambiguity in case there are
several entities with the same entity id.
Returns:
"""
url = urljoin(
self.base_url,
f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}/value",
)
headers = self.headers.copy()
params = {}
if entity_type:
params.update({"type": entity_type})
try:
res = self.get(url=url, params=params, headers=headers)
if res.ok:
self.logger.debug("Received: %s", res.json())
return res.json()
res.raise_for_status()
except requests.RequestException as err:
msg = (
f"Could not load value of attribute '{attr_name}' from "
f"entity'{entity_id}' "
)
self.log_error(err=err, msg=msg)
raise
[docs] def update_attribute_value(
self,
*,
entity_id: str,
attr_name: str,
value: Any,
entity_type: str = None,
forcedUpdate: bool = False,
):
"""
Updates the value of a specified attribute of an entity
Args:
value: update value
entity_id: Id of the entity. Example: Bcn_Welt
attr_name: Name of the attribute to be retrieved.
Example: temperature.
entity_type: Entity type, to avoid ambiguity in case there are
several entities with the same entity id.
forcedUpdate: Update operation have to trigger any matching
subscription, no matter if there is an actual attribute
update or no instead of the default behavior, which is to
updated only if attribute is effectively updated.
Returns:
"""
url = urljoin(
self.base_url,
f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}/value",
)
headers = self.headers.copy()
params = {}
if entity_type:
params.update({"type": entity_type})
options = []
if forcedUpdate:
options.append("forcedUpdate")
if options:
params.update({"options": ",".join(options)})
try:
if not isinstance(value, (dict, list)):
headers.update({"Content-Type": "text/plain"})
if isinstance(value, str):
value = f"{value}"
res = self.put(url=url, headers=headers, json=value, params=params)
else:
res = self.put(url=url, headers=headers, json=value, params=params)
if res.ok:
self.logger.info(
"Attribute '%s' of '%s' " "successfully updated!",
attr_name,
entity_id,
)
else:
res.raise_for_status()
except requests.RequestException as err:
msg = (
f"Could not update value of attribute '{attr_name}' from "
f"entity '{entity_id}' "
)
self.log_error(err=err, msg=msg)
raise
# Types Operations
[docs] def get_entity_types(
self, *, limit: int = None, offset: int = None, options: str = None
) -> List[Dict[str, Any]]:
"""
Args:
limit: Limit the number of types to be retrieved.
offset: Skip a number of records.
options: Options dictionary. Allowed: count, values
Returns:
"""
url = urljoin(self.base_url, f"{self._url_version}/types")
headers = self.headers.copy()
params = {}
if limit:
params.update({"limit": limit})
if offset:
params.update({"offset": offset})
if options:
params.update({"options": options})
try:
res = self.get(url=url, params=params, headers=headers)
if res.ok:
self.logger.debug("Received: %s", res.json())
return res.json()
res.raise_for_status()
except requests.RequestException as err:
msg = "Could not load entity types!"
self.log_error(err=err, msg=msg)
raise
[docs] def get_entity_type(self, entity_type: str) -> Dict[str, Any]:
"""
Args:
entity_type: Entity Type. Example: Room
Returns:
"""
url = urljoin(self.base_url, f"{self._url_version}/types/{entity_type}")
headers = self.headers.copy()
params = {}
try:
res = self.get(url=url, params=params, headers=headers)
if res.ok:
self.logger.debug("Received: %s", res.json())
return res.json()
res.raise_for_status()
except requests.RequestException as err:
msg = f"Could not load entities of type" f"'{entity_type}' "
self.log_error(err=err, msg=msg)
raise
# SUBSCRIPTION API ENDPOINTS
[docs] def get_subscription_list(self, limit: PositiveInt = inf) -> List[Subscription]:
"""
Returns a list of all the subscriptions present in the system.
Args:
limit: Limit the number of subscriptions to be retrieved
Returns:
list of subscriptions
"""
url = urljoin(self.base_url, f"{self._url_version}/subscriptions/")
headers = self.headers.copy()
params = {}
# We always use the 'count' option to check weather pagination is
# required
params.update({"options": "count"})
try:
items = self.__pagination(
limit=limit, url=url, params=params, headers=headers
)
adapter = TypeAdapter(List[Subscription])
return adapter.validate_python(items)
except requests.RequestException as err:
msg = "Could not load subscriptions!"
self.log_error(err=err, msg=msg)
raise
[docs] def post_subscription(
self,
subscription: Subscription,
update: bool = False,
skip_initial_notification: bool = False,
) -> str:
"""
Creates a new subscription. The subscription is represented by a
Subscription object defined in filip.cb.models.
If the subscription already exists, the adding is prevented and the id
of the existing subscription is returned.
A subscription is deemed as already existing if there exists a
subscription with the exact same subject and notification fields. All
optional fields are not considered.
Args:
subscription: Subscription
update: True - If the subscription already exists, update it
False- If the subscription already exists, throw warning
skip_initial_notification: True - Initial Notifications will be
sent to recipient containing the whole data. This is
deprecated and removed from version 3.0 of the context broker.
False - skip the initial notification
Returns:
str: Id of the (created) subscription
"""
existing_subscriptions = self.get_subscription_list()
sub_dict = subscription.model_dump(include={"subject", "notification"})
for ex_sub in existing_subscriptions:
if self._subscription_dicts_are_equal(
sub_dict, ex_sub.model_dump(include={"subject", "notification"})
):
self.logger.info("Subscription already exists")
if update:
self.logger.info("Updated subscription")
subscription.id = ex_sub.id
self.update_subscription(subscription)
else:
warnings.warn(
f"Subscription existed already with the id" f" {ex_sub.id}"
)
return ex_sub.id
params = {}
if skip_initial_notification:
version = self.get_version()["orion"]["version"]
if parse_version(version) <= parse_version("3.1"):
params.update({"options": "skipInitialNotification"})
else:
pass
warnings.warn(
f"Skip initial notifications is a deprecated "
f"feature of older versions <=3.1 of the context "
f"broker. The Context Broker that you requesting has "
f"version: {version}. For newer versions we "
f"automatically skip this option. Consider "
f"refactoring and updating your services",
DeprecationWarning,
)
url = urljoin(self.base_url, "v2/subscriptions")
headers = self.headers.copy()
headers.update({"Content-Type": "application/json"})
try:
res = self.post(
url=url,
headers=headers,
data=subscription.model_dump_json(exclude={"id"}, exclude_none=True),
params=params,
)
if res.ok:
self.logger.info("Subscription successfully created!")
return res.headers["Location"].split("/")[-1]
res.raise_for_status()
except requests.RequestException as err:
msg = "Could not send subscription!"
self.log_error(err=err, msg=msg)
raise
[docs] def get_subscription(self, subscription_id: str) -> Subscription:
"""
Retrieves a subscription from
Args:
subscription_id: id of the subscription
Returns:
"""
url = urljoin(
self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
)
headers = self.headers.copy()
try:
res = self.get(url=url, headers=headers)
if res.ok:
self.logger.debug("Received: %s", res.json())
return Subscription(**res.json())
res.raise_for_status()
except requests.RequestException as err:
msg = f"Could not load subscription {subscription_id}"
self.log_error(err=err, msg=msg)
raise
[docs] def update_subscription(
self, subscription: Subscription, skip_initial_notification: bool = False
):
"""
Only the fields included in the request are updated in the subscription.
Args:
subscription: Subscription to update
skip_initial_notification: True - Initial Notifications will be
sent to recipient containing the whole data. This is
deprecated and removed from version 3.0 of the context broker.
False - skip the initial notification
Returns:
None
"""
params = {}
if skip_initial_notification:
version = self.get_version()["orion"]["version"]
if parse_version(version) <= parse_version("3.1"):
params.update({"options": "skipInitialNotification"})
else:
pass
warnings.warn(
f"Skip initial notifications is a deprecated "
f"feature of older versions <3.1 of the context "
f"broker. The Context Broker that you requesting has "
f"version: {version}. For newer versions we "
f"automatically skip this option. Consider "
f"refactoring and updating your services",
DeprecationWarning,
)
url = urljoin(
self.base_url, f"{self._url_version}/subscriptions/{subscription.id}"
)
headers = self.headers.copy()
headers.update({"Content-Type": "application/json"})
try:
res = self.patch(
url=url,
headers=headers,
data=subscription.model_dump_json(exclude={"id"}, exclude_none=True),
)
if res.ok:
self.logger.info("Subscription successfully updated!")
else:
res.raise_for_status()
except requests.RequestException as err:
msg = f"Could not update subscription {subscription.id}"
self.log_error(err=err, msg=msg)
raise
[docs] def delete_subscription(self, subscription_id: str) -> None:
"""
Deletes a subscription from a Context Broker
Args:
subscription_id: id of the subscription
"""
url = urljoin(
self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
)
headers = self.headers.copy()
try:
res = self.delete(url=url, headers=headers)
if res.ok:
self.logger.info(
f"Subscription '{subscription_id}' " f"successfully deleted!"
)
else:
res.raise_for_status()
except requests.RequestException as err:
msg = f"Could not delete subscription {subscription_id}"
self.log_error(err=err, msg=msg)
raise
# Registration API
[docs] def get_registration_list(self, *, limit: PositiveInt = None) -> List[Registration]:
"""
Lists all the context provider registrations present in the system.
Args:
limit: Limit the number of registrations to be retrieved
Returns:
"""
url = urljoin(self.base_url, f"{self._url_version}/registrations/")
headers = self.headers.copy()
params = {}
# We always use the 'count' option to check weather pagination is
# required
params.update({"options": "count"})
try:
items = self.__pagination(
limit=limit, url=url, params=params, headers=headers
)
adapter = TypeAdapter(List[Registration])
return adapter.validate_python(items)
except requests.RequestException as err:
msg = "Could not load registrations!"
self.log_error(err=err, msg=msg)
raise
[docs] def post_registration(self, registration: Registration):
"""
Creates a new context provider registration. This is typically used
for binding context sources as providers of certain data. The
registration is represented by cb.models.Registration
Args:
registration (Registration):
Returns:
"""
url = urljoin(self.base_url, f"{self._url_version}/registrations")
headers = self.headers.copy()
headers.update({"Content-Type": "application/json"})
try:
res = self.post(
url=url,
headers=headers,
data=registration.model_dump_json(exclude={"id"}, exclude_none=True),
)
if res.ok:
self.logger.info("Registration successfully created!")
return res.headers["Location"].split("/")[-1]
res.raise_for_status()
except requests.RequestException as err:
msg = f"Could not send registration {registration.id}!"
self.log_error(err=err, msg=msg)
raise
[docs] def get_registration(self, registration_id: str) -> Registration:
"""
Retrieves a registration from context broker by id
Args:
registration_id: id of the registration
Returns:
Registration
"""
url = urljoin(
self.base_url, f"{self._url_version}/registrations/{registration_id}"
)
headers = self.headers.copy()
try:
res = self.get(url=url, headers=headers)
if res.ok:
self.logger.debug("Received: %s", res.json())
return Registration(**res.json())
res.raise_for_status()
except requests.RequestException as err:
msg = f"Could not load registration {registration_id} !"
self.log_error(err=err, msg=msg)
raise
[docs] def add_valid_relationships(
self, entities: List[ContextEntity]
) -> List[ContextEntity]:
"""
Validate all attributes in the given entities. If the attribute value points to
an existing entity, it is assumed that this attribute is a relationship, and it
will be assigned with the attribute type "relationship"
Args:
entities: list of entities that need to be validated.
Returns:
updated entities
"""
updated_entities = []
for entity in entities[:]:
for attr_name, attr_value in entity.model_dump(
exclude={"id", "type"}
).items():
if isinstance(attr_value, dict):
if self.validate_relationship(attr_value):
entity.update_attribute(
{
attr_name: ContextAttribute(
**{
"type": DataType.RELATIONSHIP,
"value": attr_value.get("value"),
}
)
}
)
updated_entities.append(entity)
return updated_entities
[docs] def remove_invalid_relationships(
self, entities: List[ContextEntity], hard_remove: bool = True
) -> List[ContextEntity]:
"""
Removes invalid relationships from the entities. An invalid relationship
is a relationship that has no destination entity.
Args:
entities: list of entities that need to be validated.
hard_remove: If True, invalid relationships will be deleted.
If False, invalid relationships will be changed to Text
attributes.
Returns:
updated entities
"""
updated_entities = []
for entity in entities[:]:
for relationship in entity.get_relationships():
if not self.validate_relationship(relationship):
if hard_remove:
entity.delete_attributes(attrs=[relationship])
else:
# change the attribute type to "Text"
entity.update_attribute(
attrs=[
NamedContextAttribute(
name=relationship.name,
type=DataType.TEXT,
value=relationship.value,
)
]
)
updated_entities.append(entity)
return updated_entities
[docs] def validate_relationship(
self, relationship: Union[NamedContextAttribute, ContextAttribute, Dict]
) -> bool:
"""
Validates a relationship. A relationship is valid if it points to an existing
entity. Otherwise, it is considered invalid
Args:
relationship: relationship to validate
Returns
True if the relationship is valid, False otherwise
"""
if isinstance(relationship, NamedContextAttribute) or isinstance(
relationship, ContextAttribute
):
destination_id = relationship.value
elif isinstance(relationship, dict):
destination_id = relationship.get("value")
if destination_id is None:
raise ValueError(
"Invalid relationship dictionary format\n"
"Expected format: {"
f'"type": "{DataType.RELATIONSHIP.value}", '
'"value" "entity_id"}'
)
else:
raise ValueError("Invalid relationship type.")
try:
destination_entity = self.get_entity(entity_id=destination_id)
return destination_entity.id == destination_id
except requests.RequestException as err:
if err.response.status_code == 404:
return False
[docs] def update_registration(self, registration: Registration):
"""
Only the fields included in the request are updated in the registration.
Args:
registration: Registration to update
Returns:
"""
url = urljoin(
self.base_url, f"{self._url_version}/registrations/{registration.id}"
)
headers = self.headers.copy()
headers.update({"Content-Type": "application/json"})
try:
res = self.patch(
url=url,
headers=headers,
data=registration.model_dump_json(exclude={"id"}, exclude_none=True),
)
if res.ok:
self.logger.info("Registration successfully updated!")
else:
res.raise_for_status()
except requests.RequestException as err:
msg = f"Could not update registration {registration.id} !"
self.log_error(err=err, msg=msg)
raise
[docs] def delete_registration(self, registration_id: str) -> None:
"""
Deletes a subscription from a Context Broker
Args:
registration_id: id of the subscription
"""
url = urljoin(
self.base_url, f"{self._url_version}/registrations/{registration_id}"
)
headers = self.headers.copy()
try:
res = self.delete(url=url, headers=headers)
if res.ok:
self.logger.info(
"Registration '%s' " "successfully deleted!", registration_id
)
res.raise_for_status()
except requests.RequestException as err:
msg = f"Could not delete registration {registration_id} !"
self.log_error(err=err, msg=msg)
raise
# Batch operation API
[docs] def update(
self,
*,
entities: List[Union[ContextEntity, ContextEntityKeyValues]],
action_type: Union[ActionType, str],
update_format: str = None,
forcedUpdate: bool = False,
override_metadata: bool = False,
) -> None:
"""
This operation allows to create, update and/or delete several entities
in a single batch operation.
This operation is split in as many individual operations as entities
in the entities vector, so the actionType is executed for each one of
them. Depending on the actionType, a mapping with regular non-batch
operations can be done:
append: maps to POST /v2/entities (if the entity does not already exist)
or POST /v2/entities/<id>/attrs (if the entity already exists).
appendStrict: maps to POST /v2/entities (if the entity does not
already exist) or POST /v2/entities/<id>/attrs?options=append (if the
entity already exists).
update: maps to PATCH /v2/entities/<id>/attrs.
delete: maps to DELETE /v2/entities/<id>/attrs/<attrName> on every
attribute included in the entity or to DELETE /v2/entities/<id> if
no attribute were included in the entity.
replace: maps to PUT /v2/entities/<id>/attrs.
Args:
entities: "an array of entities, each entity specified using the "
"JSON entity representation format "
action_type (Update): "actionType, to specify the kind of update
action to do: either append, appendStrict, update, delete,
or replace. "
update_format (str): Optional 'keyValues'
forcedUpdate: Update operation have to trigger any matching
subscription, no matter if there is an actual attribute
update or no instead of the default behavior, which is to
updated only if attribute is effectively updated.
override_metadata:
Bool, replace the existing metadata with the one provided in
the request
Returns:
"""
url = urljoin(self.base_url, f"{self._url_version}/op/update")
headers = self.headers.copy()
headers.update({"Content-Type": "application/json"})
params = {}
options = []
if override_metadata:
options.append("overrideMetadata")
if forcedUpdate:
options.append("forcedUpdate")
if update_format:
assert (
update_format == AttrsFormat.KEY_VALUES.value
), "Only 'keyValues' is allowed as update format"
options.append("keyValues")
if options:
params.update({"options": ",".join(options)})
update = Update(actionType=action_type, entities=entities)
try:
res = self.post(
url=url,
headers=headers,
params=params,
json=update.model_dump(by_alias=True),
)
if res.ok:
self.logger.info("Update operation '%s' succeeded!", action_type)
else:
res.raise_for_status()
except requests.RequestException as err:
msg = f"Update operation '{action_type}' failed!"
self.log_error(err=err, msg=msg)
raise
[docs] def query(
self,
*,
query: Query,
limit: PositiveInt = None,
order_by: str = None,
response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
) -> List[Any]:
"""
Generate api query
Args:
query (Query):
limit (PositiveInt):
order_by (str):
response_format (AttrsFormat, str):
Returns:
The response payload is an Array containing one object per matching
entity, or an empty array [] if no entities are found. The entities
follow the JSON entity representation format (described in the
section "JSON Entity Representation").
"""
url = urljoin(self.base_url, f"{self._url_version}/op/query")
headers = self.headers.copy()
headers.update({"Content-Type": "application/json"})
params = {"options": "count"}
if response_format:
if response_format not in list(AttrsFormat):
raise ValueError(f"Value must be in {list(AttrsFormat)}")
params["options"] = ",".join([response_format, "count"])
try:
items = self.__pagination(
method=PaginationMethod.POST,
url=url,
headers=headers,
params=params,
data=query.model_dump_json(exclude_none=True),
limit=limit,
)
if response_format == AttrsFormat.NORMALIZED:
adapter = TypeAdapter(List[ContextEntity])
return adapter.validate_python(items)
if response_format == AttrsFormat.KEY_VALUES:
adapter = TypeAdapter(List[ContextEntityKeyValues])
return adapter.validate_python(items)
return items
except requests.RequestException as err:
msg = "Query operation failed!"
self.log_error(err=err, msg=msg)
raise
[docs] def notify(self, message: Message) -> None:
"""
This operation is intended to consume a notification payload so that
all the entity data included by such notification is persisted,
overwriting if necessary. This operation is useful when one NGSIv2
endpoint is subscribed to another NGSIv2 endpoint (federation
scenarios). The request payload must be an NGSIv2 notification
payload. The behaviour must be exactly the same as 'update'
with 'action_type' equal to append.
Args:
message: Notification message
Returns:
None
"""
url = urljoin(self.base_url, "v2/op/notify")
headers = self.headers.copy()
headers.update({"Content-Type": "application/json"})
params = {}
try:
res = self.post(
url=url,
headers=headers,
params=params,
data=message.model_dump_json(by_alias=True),
)
if res.ok:
self.logger.info("Notification message sent!")
else:
res.raise_for_status()
except requests.RequestException as err:
msg = (
f"Sending notifcation message failed! \n "
f"{message.model_dump_json(indent=2)}"
)
self.log_error(err=err, msg=msg)
raise
[docs] def post_command(
self,
*,
entity_id: str,
command: Union[Command, NamedCommand, Dict],
entity_type: str = None,
command_name: str = None,
) -> None:
"""
Post a command to a context entity this corresponds to 'PATCH' of the
specified command attribute.
Args:
entity_id: Entity identifier
command: Command
entity_type: Entity type
command_name: Name of the command in the entity
Returns:
None
"""
if command_name:
assert isinstance(command, (Command, dict))
if isinstance(command, dict):
command = Command(**command)
command = {command_name: command.model_dump()}
else:
assert isinstance(command, (NamedCommand, dict))
if isinstance(command, dict):
command = NamedCommand(**command)
self.update_existing_entity_attributes(
entity_id=entity_id, entity_type=entity_type, attrs=[command]
)
[docs] def does_entity_exist(self, entity_id: str, entity_type: str) -> bool:
"""
Test if an entity with given id and type is present in the CB
Args:
entity_id: Entity id
entity_type: Entity type
Returns:
bool; True if entity exists
Raises:
RequestException, if any error occurs (e.g: No Connection),
except that the entity is not found
"""
url = urljoin(self.base_url, f"v2/entities/{entity_id}")
headers = self.headers.copy()
params = {"type": entity_type}
try:
res = self.get(url=url, params=params, headers=headers)
if res.ok:
return True
res.raise_for_status()
except requests.RequestException as err:
if err.response is None or not err.response.status_code == 404:
self.log_error(err=err, msg="Checking entity existence failed!")
raise
return False
[docs] def patch_entity(
self,
entity: ContextEntity,
old_entity: Optional[ContextEntity] = None,
override_attr_metadata: bool = True,
) -> None:
"""
Takes a given entity and updates the state in the CB to match it.
It is an extended equivalent to the HTTP method PATCH, which applies
partial modifications to a resource.
Args:
entity: Entity to update
old_entity: OPTIONAL, if given only the differences between the
old_entity and entity are updated in the CB.
Other changes made to the entity in CB, can be kept.
If type or id was changed, the old_entity will be
deleted.
override_attr_metadata:
Whether to override or append the attributes metadata.
`True` for overwrite or `False` for update/append
Returns:
None
"""
new_entity = entity
if old_entity is None:
# If no old entity_was provided we use the current state to compare
# the entity to
if self.does_entity_exist(
entity_id=new_entity.id, entity_type=new_entity.type
):
old_entity = self.get_entity(
entity_id=new_entity.id, entity_type=new_entity.type
)
else:
# the entity is new, post and finish
self.post_entity(new_entity, update=False)
return
else:
# An old_entity was provided
# check if the old_entity (still) exists else recall methode
# and discard old_entity
if not self.does_entity_exist(
entity_id=old_entity.id, entity_type=old_entity.type
):
self.patch_entity(
new_entity, override_attr_metadata=override_attr_metadata
)
return
# if type or id was changed, the old_entity needs to be deleted
# and the new_entity created
# In this case we will lose the current state of the entity
if old_entity.id != new_entity.id or old_entity.type != new_entity.type:
self.delete_entity(entity_id=old_entity.id, entity_type=old_entity.type)
if not self.does_entity_exist(
entity_id=new_entity.id, entity_type=new_entity.type
):
self.post_entity(entity=new_entity, update=False)
return
# At this point we know that we need to patch only the attributes of
# the entity
# Check the differences between the attributes of old and new entity
# Delete the removed attributes, create the new ones,
# and update the existing if necessary
old_attributes = old_entity.get_attributes()
new_attributes = new_entity.get_attributes()
# Manage attributes that existed before
for old_attr in old_attributes:
# commands do not exist in the ContextEntity and are only
# registrations to the corresponding device. Operations as
# delete will fail as it does not technically exist
corresponding_new_attr = None
for new_attr in new_attributes:
if new_attr.name == old_attr.name:
corresponding_new_attr = new_attr
if corresponding_new_attr is None:
# Attribute no longer exists, delete it
try:
self.delete_entity_attribute(
entity_id=new_entity.id,
entity_type=new_entity.type,
attr_name=old_attr.name,
)
except requests.RequestException as err:
msg = (
f"Failed to delete attribute {old_attr.name} of "
f"entity {new_entity.id}."
)
if err.response is not None and err.response.status_code == 404:
# if the attribute is provided by a registration the
# deletion will fail
msg += (
f" The attribute is probably provided "
f"by a registration."
)
self.log_error(err=err, msg=msg)
else:
self.log_error(err=err, msg=msg)
raise
else:
# Check if attributed changed in any way, if yes update
# else do nothing and keep current state
if old_attr != corresponding_new_attr:
try:
self.update_entity_attribute(
entity_id=new_entity.id,
entity_type=new_entity.type,
attr=corresponding_new_attr,
override_metadata=override_attr_metadata,
)
except requests.RequestException as err:
msg = (
f"Failed to update attribute {old_attr.name} of "
f"entity {new_entity.id}."
)
if err.response is not None and err.response.status_code == 404:
# if the attribute is provided by a registration the
# update will fail
msg += (
f" The attribute is probably provided "
f"by a registration."
)
self.log_error(err=err, msg=msg)
raise
# Create new attributes
update_entity = ContextEntity(id=entity.id, type=entity.type)
update_needed = False
for new_attr in new_attributes:
# commands do not exist in the ContextEntity and are only
# registrations to the corresponding device. Operations as
# delete will fail as it does not technically exists
attr_existed = False
for old_attr in old_attributes:
if new_attr.name == old_attr.name:
attr_existed = True
if not attr_existed:
update_needed = True
update_entity.add_attributes([new_attr])
if update_needed:
self.update_entity(update_entity)
def _subscription_dicts_are_equal(self, first: dict, second: dict):
"""
Check if two dictionaries and all sub-dictionaries are equal.
Logs a warning if the keys are not equal, but ignores the
comparison of such keys.
Args:
first dict: Dictionary of first subscription
second dict: Dictionary of second subscription
Returns:
True if equal, else False
"""
def _value_is_not_none(value):
"""
Recursive function to check if a value equals none.
If the value is a dict and any value of the dict is not none,
the value is not none.
If the value is a list and any item is not none, the value is not none.
If it's neither dict nore list, bool is used.
"""
if isinstance(value, dict):
return any([_value_is_not_none(value=_v) for _v in value.values()])
if isinstance(value, list):
return any([_value_is_not_none(value=_v) for _v in value])
else:
return bool(value)
if first.keys() != second.keys():
warnings.warn(
"Subscriptions contain a different set of fields. "
"Only comparing to new fields of the new one."
)
for k, v in first.items():
ex_value = second.get(k, None)
if isinstance(v, dict) and isinstance(ex_value, dict):
equal = self._subscription_dicts_are_equal(v, ex_value)
if equal:
continue
else:
return False
if v != ex_value:
self.logger.debug(f"Not equal fields for key {k}: ({v}, {ex_value})")
if (
not _value_is_not_none(v)
and not _value_is_not_none(ex_value)
or k == "timesSent"
):
continue
return False
return True
#
#
# def check_duplicate_subscription(self, subscription_body, limit: int = 20):
# """
# Function compares the subject of the subscription body, on whether a subscription
# already exists for a device / entity.
# :param subscription_body: the body of the new subscripton
# :param limit: pagination parameter, to set the number of
# subscriptions bodies the get request should grab
# :return: exists, boolean -> True, if such a subscription allready
# exists
# """
# exists = False
# subscription_subject = json.loads(subscription_body)["subject"]
# # Exact keys depend on subscription body
# try:
# subscription_url = json.loads(subscription_body)[
# "notification"]["httpCustom"]["url"]
# except KeyError:
# subscription_url = json.loads(subscription_body)[
# "notification"]["http"]["url"]
#
# # If the number of subscriptions is larger then the limit,
# paginations methods have to be used
# url = self.url + '/v2/subscriptions?limit=' + str(limit) +
# '&options=count'
# response = self.session.get(url, headers=self.get_header())
#
# sub_count = float(response.headers["Fiware-Total-Count"])
# response = json.loads(response.text)
# if sub_count >= limit:
# response = self.get_pagination(url=url, headers=self.get_header(),
# limit=limit, count=sub_count)
# response = json.loads(response)
#
# for existing_subscription in response:
# # check whether the exact same subscriptions already exists
# if existing_subscription["subject"] == subscription_subject:
# exists = True
# break
# try:
# existing_url = existing_subscription["notification"][
# "http"]["url"]
# except KeyError:
# existing_url = existing_subscription["notification"][
# "httpCustom"]["url"]
# # check whether both subscriptions notify to the same path
# if existing_url != subscription_url:
# continue
# else:
# # iterate over all entities included in the subscription object
# for entity in subscription_subject["entities"]:
# if 'type' in entity.keys():
# subscription_type = entity['type']
# else:
# subscription_type = entity['typePattern']
# if 'id' in entity.keys():
# subscription_id = entity['id']
# else:
# subscription_id = entity["idPattern"]
# # iterate over all entities included in the exisiting
# subscriptions
# for existing_entity in existing_subscription["subject"][
# "entities"]:
# if "type" in entity.keys():
# type_existing = entity["type"]
# else:
# type_existing = entity["typePattern"]
# if "id" in entity.keys():
# id_existing = entity["id"]
# else:
# id_existing = entity["idPattern"]
# # as the ID field is non optional, it has to match
# # check whether the type match
# # if the type field is empty, they match all types
# if (type_existing == subscription_type) or\
# ('*' in subscription_type) or \
# ('*' in type_existing)\
# or (type_existing == "") or (
# subscription_type == ""):
# # check if on of the subscriptions is a pattern,
# or if they both refer to the same id
# # Get the attrs first, to avoid code duplication
# # last thing to compare is the attributes
# # Assumption -> position is the same as the
# entities _list
# # i == j
# i = subscription_subject["entities"].index(entity)
# j = existing_subscription["subject"][
# "entities"].index(existing_entity)
# try:
# subscription_attrs = subscription_subject[
# "condition"]["attrs"][i]
# except (KeyError, IndexError):
# subscription_attrs = []
# try:
# existing_attrs = existing_subscription[
# "subject"]["condition"]["attrs"][j]
# except (KeyError, IndexError):
# existing_attrs = []
#
# if (".*" in subscription_id) or ('.*' in
# id_existing) or (subscription_id == id_existing):
# # Attributes have to match, or the have to
# be an empty array
# if (subscription_attrs == existing_attrs) or
# (subscription_attrs == []) or (existing_attrs == []):
# exists = True
# # if they do not match completely or subscribe
# to all ids they have to match up to a certain position
# elif ("*" in subscription_id) or ('*' in
# id_existing):
# regex_existing = id_existing.find('*')
# regex_subscription =
# subscription_id.find('*')
# # slice the strings to compare
# if (id_existing[:regex_existing] in
# subscription_id) or (subscription_id[:regex_subscription] in id_existing) or \
# (id_existing[regex_existing:] in
# subscription_id) or (subscription_id[regex_subscription:] in id_existing):
# if (subscription_attrs ==
# existing_attrs) or (subscription_attrs == []) or (existing_attrs == []):
# exists = True
# else:
# continue
# else:
# continue
# else:
# continue
# else:
# continue
# else:
# continue
# return exists
#