Source code for filip.clients.ngsi_ld.cb

"""
Context Broker Module for API Client
"""

import re
import json
import os
from math import inf
from typing import Any, Dict, List, Union, Optional, Literal
from urllib.parse import urljoin
import requests
from pydantic import TypeAdapter, PositiveInt, PositiveFloat
from filip.clients.base_http_client import BaseHttpClient, NgsiURLVersion
from filip.config import settings
from filip.models.base import FiwareLDHeader, PaginationMethod, core_context
from filip.models.ngsi_v2.base import AttrsFormat
from filip.models.ngsi_ld.subscriptions import SubscriptionLD
from filip.models.ngsi_ld.context import (
    ContextLDEntity,
    ContextLDEntityKeyValues,
    ContextProperty,
    ContextRelationship,
    NamedContextProperty,
    NamedContextRelationship,
    ActionTypeLD,
    UpdateLD,
)
from filip.models.ngsi_v2.context import Query


[docs]class ContextBrokerLDClient(BaseHttpClient): """ Implementation of NGSI-LD Context Broker functionalities, such as creating entities and subscriptions; retrieving, updating and deleting data. Further documentation: https://fiware-orion.readthedocs.io/en/master/ Api specifications for LD are located here: https://www.etsi.org/deliver/etsi_gs/CIM/001_099/009/01.04.01_60/gs_cim009v010401p.pdf """ def __init__( self, url: str = None, *, session: requests.Session = None, fiware_header: FiwareLDHeader = None, **kwargs, ): """ Args: url: Url of context broker server session (requests.Session): fiware_header (FiwareHeader): fiware service and fiware service path **kwargs (Optional): Optional arguments that ``request`` takes. """ # set service url url = url or settings.LD_CB_URL # base_http_client overwrites empty header with FiwareHeader instead of FiwareLD init_header = fiware_header if fiware_header else FiwareLDHeader() if init_header.link_header is None: init_header.set_context(core_context) super().__init__(url=url, session=session, fiware_header=init_header, **kwargs) # set the version specific url-pattern self._url_version = NgsiURLVersion.ld_url.value # For uplink requests, the Content-Type header is essential, # Accept will be ignored # For downlink requests, the Accept header is essential, # Content-Type will be ignored # default uplink content JSON self.headers.update({"Content-Type": "application/json"}) # default downlink content JSON-LD self.headers.update({"Accept": "application/ld+json"}) if init_header.ngsild_tenant is not None: self.__make_tenant() def __pagination( self, *, method: PaginationMethod = PaginationMethod.GET, url: str, headers: Dict, limit: Union[PositiveInt, PositiveFloat] = None, params: Dict = None, data: str = None, ) -> List[Dict]: """ NGSIv2 implements a pagination mechanism in order to help clients to retrieve large sets of resources. This mechanism works for all listing operations in the API (e.g. GET /v2/entities, GET /v2/subscriptions, POST /v2/op/query, etc.). This function helps getting datasets that are larger than the limit for the different GET operations. https://fiware-orion.readthedocs.io/en/master/user/pagination/index.html Args: url: Information about the url, obtained from the original function headers: The headers from the original function params: limit: Returns: object: """ if limit is None: limit = inf if limit > 1000: params["limit"] = 1000 # maximum items per request else: params["limit"] = limit if self.session: session = self.session else: session = requests.Session() with session: res = session.request( method=method, url=url, params=params, headers=headers, data=data ) if res.ok: items = res.json() # do pagination if self._url_version == NgsiURLVersion.v2_url.value: count = int(res.headers["Fiware-Total-Count"]) elif self._url_version == NgsiURLVersion.ld_url.value: count = int(res.headers["NGSILD-Results-Count"]) else: count = 0 while len(items) < limit and len(items) < count: # Establishing the offset from where entities are retrieved params["offset"] = len(items) params["limit"] = min(1000, (limit - len(items))) res = session.request( method=method, url=url, params=params, headers=headers, data=data, ) if res.ok: items.extend(res.json()) else: res.raise_for_status() self.logger.debug("Received: %s", items) return items res.raise_for_status()
[docs] def get_version(self) -> Dict: """ Gets version of Orion-LD context broker Returns: Dictionary with response """ url = urljoin(self.base_url, "/version") try: res = self.get(url=url) if res.ok: return res.json() res.raise_for_status() except requests.RequestException as err: self.logger.error(err) raise
def __make_tenant(self): """ Create tenant if tenant is given in headers """ idhex = f"urn:ngsi-ld:{os.urandom(6).hex()}" e = ContextLDEntity(id=idhex, type=f"urn:ngsi-ld:{os.urandom(6).hex()}") try: self.post_entity(entity=e) self.delete_entity_by_id(idhex) except Exception as err: self.log_error(err=err, msg="Error while creating tenant") raise
[docs] def get_statistics(self) -> Dict: """ Gets statistics of context broker Returns: Dictionary with response """ url = urljoin(self.base_url, "statistics") try: res = self.get(url=url) if res.ok: return res.json() res.raise_for_status() except requests.RequestException as err: self.logger.error(err) raise
[docs] def post_entity( self, entity: ContextLDEntity, append: bool = False, update: bool = False ): """ Function registers an Object with the NGSI-LD Context Broker, if it already exists it can be automatically updated if the update flag bool is True. First a post request with the entity is tried, if the response code is 422 the entity is uncrossable, as it already exists there are two options, either overwrite it, if the attribute have changed (e.g. at least one new/new values) (update = True) or leave it the way it is (update=False) """ url = urljoin(self.base_url, f"{self._url_version}/entities") headers = self.headers.copy() if entity.model_dump().get("@context", None) is not None: headers.update({"Content-Type": "application/ld+json"}) headers.update({"Link": None}) try: res = self.post( url=url, headers=headers, json=entity.model_dump( exclude_unset=True, exclude_defaults=True, exclude_none=True ), ) if res.ok: self.logger.info("Entity successfully posted!") return res.headers.get("Location") res.raise_for_status() except requests.RequestException as err: if err.response is not None and err.response.status_code == 409: if append: # 409 entity already exists return self.append_entity_attributes(entity=entity) elif update: return self.override_entities(entities=[entity]) msg = f"Could not post entity {entity.id}" self.log_error(err=err, msg=msg) raise
[docs] def override_entities(self, entities: List[ContextLDEntity]): """ Function to create or override existing entites with the NGSI-LD Context Broker. The batch operation with Upsert will be used. """ return self.entity_batch_operation( entities=entities, action_type=ActionTypeLD.UPSERT, options="replace" )
[docs] def get_entity( self, entity_id: str, entity_type: str = None, attrs: List[str] = None, options: Optional[str] = None, geometryProperty: Optional[str] = None, ) -> Union[ContextLDEntity, ContextLDEntityKeyValues, Dict[str, Any]]: """ This operation must return one entity element only, but there may be more than one entity with the same ID (e.g. entities with same ID but different types). In such case, an error message is returned, with the HTTP status code set to 409 Conflict. Args: entity_id (String): Id of the entity to be retrieved entity_type (String): Entity type, to avoid ambiguity in case there are several entities with the same entity id. attrs (List of Strings): List of attribute names whose data must be included in the response. The attributes are retrieved in the order specified by this parameter. See "Filtering out attributes and metadata" section for more detail. If this parameter is not included, the attributes are retrieved in arbitrary order, and all the attributes of the entity are included in the response. Example: temperature, humidity. options (String): keyValues (simplified representation of entity) or sysAttrs (include generated attrs createdAt and modifiedAt) geometryProperty (String): Name of a GeoProperty. In the case of GeoJSON Entity representation, this parameter indicates which GeoProperty to use for the "geometry" element. By default, it shall be 'location'. Returns: ContextEntity """ url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}") headers = self.headers.copy() params = {} if entity_type: params.update({"type": entity_type}) if attrs: params.update({"attrs": ",".join(attrs)}) if geometryProperty: params.update({"geometryProperty": geometryProperty}) if options: if options != "keyValues" and options != "sysAttrs": raise ValueError( f"Only available options are 'keyValues' and 'sysAttrs'" ) params.update({"options": options}) try: res = self.get(url=url, params=params, headers=headers) if res.ok: self.logger.info("Entity successfully retrieved!") self.logger.debug("Received: %s", res.json()) if options == "keyValues": return ContextLDEntityKeyValues(**res.json()) else: return ContextLDEntity(**res.json()) res.raise_for_status() except requests.RequestException as err: msg = f"Could not load entity {entity_id}" self.log_error(err=err, msg=msg) raise
GeometryShape = Literal[ "Point", "MultiPoint", "LineString", "MultiLineString", "Polygon", "MultiPolygon", ]
[docs] def get_entity_list( self, entity_id: Optional[str] = None, id_pattern: Optional[str] = ".*", entity_type: Optional[str] = None, attrs: Optional[List[str]] = None, q: Optional[str] = None, georel: Optional[str] = None, geometry: Optional[GeometryShape] = None, coordinates: Optional[str] = None, geoproperty: Optional[str] = None, # csf: Optional[str] = None, # Context Source Filter limit: Optional[PositiveInt] = None, options: Optional[str] = None, ) -> List[Union[ContextLDEntity, ContextLDEntityKeyValues]]: """ This operation retrieves a list of entities based on different query options. By default, the operation retrieves all the entities in the context broker. Args: entity_id: Id of the entity to be retrieved id_pattern: Regular expression to match the entity id entity_type: Entity type, to avoid ambiguity in case there are several entities with the same entity id. attrs: List of attribute names whose data must be included in the response. q: Query expression, composed of attribute names, operators and values. georel: Geospatial relationship between the query geometry and the entities. geometry: Type of geometry for the query. The possible values are Point, MultiPoint, LineString, MultiLineString, Polygon, MultiPolygon. coordinates: Coordinates of the query geometry. The coordinates must be expressed as a string of comma-separated values. geoproperty: Name of a GeoProperty. In the case of GeoJSON Entity representation, this parameter indicates which GeoProperty to use for the "geometry" element. limit: Maximum number of entities to retrieve. options: Further options for the query. The available options are: - keyValues (simplified representation of entity) - sysAttrs (including createdAt and modifiedAt, etc.) - count (include number of all matched entities in response header) """ url = urljoin(self.base_url, f"{self._url_version}/entities/") headers = self.headers.copy() params = {} if entity_id: params.update({"id": entity_id}) if id_pattern: params.update({"idPattern": id_pattern}) if entity_type: params.update({"type": entity_type}) if attrs: params.update({"attrs": ",".join(attrs)}) if q: x = re.search(r"[=!<>~]{1}\'.*\'", q.replace(" ", "")) if x is not None: raise ValueError( f"String/Date/etc. value in {x.group()} must be " f"in double quote" ) params.update({"q": q}) if georel: params.update({"georel": georel}) if geometry: params.update({"geometry": geometry}) if coordinates: params.update({"coordinates": coordinates}) if geoproperty: params.update({"geoproperty": geoproperty}) # if csf: # ContextSourceRegistration not supported yet # params.update({'csf': csf}) if limit: if limit > 1000: raise ValueError("limit must be an integer value <= 1000") params.update({"limit": limit}) if options: if options != "keyValues" and options != "sysAttrs": raise ValueError( f"Only available options are 'keyValues' and 'sysAttrs'" ) params.update({"options": options}) # params.update({'local': 'true'}) try: res = self.get(url=url, params=params, headers=headers) if res.ok: self.logger.info("Entity successfully retrieved!") entity_list: List[Union[ContextLDEntity, ContextLDEntityKeyValues]] = [] if options == "keyValues": entity_list = [ ContextLDEntityKeyValues(**item) for item in res.json() ] return entity_list else: entity_list = [ContextLDEntity(**item) for item in res.json()] return entity_list res.raise_for_status() except requests.RequestException as err: msg = f"Could not load entity matching{params}" self.log_error(err=err, msg=msg) raise
[docs] def replace_existing_attributes_of_entity( self, entity: ContextLDEntity, append: bool = False ): """ The attributes previously existing in the entity are removed and replaced by the ones in the request. Args: entity (ContextEntity): append (bool): options: Returns: """ url = urljoin(self.base_url, f"{self._url_version}/entities/{entity.id}/attrs") headers = self.headers.copy() if entity.model_dump().get("@context", None) is not None: headers.update({"Content-Type": "application/ld+json"}) headers.update({"Link": None}) try: res = self.patch( url=url, headers=headers, json=entity.model_dump( exclude={"id", "type"}, exclude_unset=True, exclude_none=True ), ) if res.ok: self.logger.info(f"Entity {entity.id} successfully " "updated!") else: res.raise_for_status() except requests.RequestException as err: if err.response is not None and append and err.response.status_code == 207: return self.append_entity_attributes(entity=entity) msg = f"Could not replace attribute of entity {entity.id} !" self.log_error(err=err, msg=msg) raise
[docs] def update_entity_attribute( self, entity_id: str, attr: Union[ ContextProperty, ContextRelationship, NamedContextProperty, NamedContextRelationship, ], attr_name: str = None, ): """ Updates a specified attribute from an entity. Args: attr: context attribute to update entity_id: Id of the entity. Example: Bcn_Welt entity_type: Entity type, to avoid ambiguity in case there are several entities with the same entity id. """ headers = self.headers.copy() if not isinstance(attr, NamedContextProperty) or not isinstance( attr, NamedContextRelationship ): assert attr_name is not None, ( "Missing name for attribute. " "attr_name must be present if" "attr is of type ContextAttribute" ) else: assert attr_name is None, ( "Invalid argument attr_name. Do not set " "attr_name if attr is of type " "NamedContextAttribute or NamedContextRelationship" ) url = urljoin( self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}" ) jsonnn = {} if isinstance(attr, list) or isinstance(attr, NamedContextProperty): jsonnn = attr.model_dump( exclude={"name"}, exclude_unset=True, exclude_none=True ) else: prop = attr.model_dump() for key, value in prop.items(): if value and value != "Property": jsonnn[key] = value try: res = self.patch(url=url, headers=headers, json=jsonnn) if res.ok: self.logger.info( f"Attribute {attr_name} of {entity_id} successfully updated!" ) else: res.raise_for_status() except requests.RequestException as err: msg = f"Could not update attribute '{attr_name}' of entity {entity_id}" self.log_error(err=err, msg=msg) raise
[docs] def append_entity_attributes( self, entity: ContextLDEntity, options: Optional[str] = None ): """ Append new Entity attributes to an existing Entity within an NGSI-LD system Args: entity (ContextLDEntity): Entity to append attributes to. options (str): Options for the request. The only available value is 'noOverwrite'. If set, it will raise 400, if all attributes exist already. """ url = urljoin(self.base_url, f"{self._url_version}/entities/{entity.id}/attrs") headers = self.headers.copy() if entity.model_dump().get("@context", None) is not None: headers.update({"Content-Type": "application/ld+json"}) headers.update({"Link": None}) params = {} if options: if options != "noOverwrite": raise ValueError(f"The only available value is 'noOverwrite'") params.update({"options": options}) try: res = self.post( url=url, headers=headers, params=params, json=entity.model_dump( exclude={"id", "type"}, exclude_unset=True, exclude_none=True ), ) if res.ok: self.logger.info(f"Entity {entity.id} successfully updated!") else: res.raise_for_status() except requests.RequestException as err: msg = f"Could not update entity {entity.id}!" self.log_error(err=err, msg=msg) raise
# def update_existing_attribute_by_name(self, entity: ContextLDEntity # ): # pass
[docs] def delete_entity_by_id(self, entity_id: str, entity_type: Optional[str] = None): """ Deletes an entity by its id. For deleting mulitple entities at once, entity_batch_operation() is more efficient. Args: entity_id: ID of entity to delete. entity_type: Type of entity to delete. """ url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}") headers = self.headers.copy() params = {} if entity_type: params.update({"type": entity_type}) try: res = self.delete(url=url, headers=headers, params=params) if res.ok: self.logger.info(f"Entity {entity_id} successfully deleted") else: res.raise_for_status() except requests.RequestException as err: msg = f"Could not delete entity {entity_id}" self.log_error(err=err, msg=msg) raise
[docs] def delete_attribute(self, entity_id: str, attribute_id: str): """ Deletes an attribute from an entity. Args: entity_id: ID of the entity. attribute_id: Name of the attribute to delete. Returns: """ url = urljoin( self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attribute_id}", ) headers = self.headers.copy() try: res = self.delete(url=url, headers=headers) if res.ok: self.logger.info( f"Attribute {attribute_id} of Entity {entity_id} successfully deleted" ) else: res.raise_for_status() except requests.RequestException as err: msg = f"Could not delete attribute {attribute_id} of entity {entity_id}" self.log_error(err=err, msg=msg) raise
# SUBSCRIPTION API ENDPOINTS
[docs] def get_subscription_list(self, limit: PositiveInt = inf) -> List[SubscriptionLD]: """ Returns a list of all the subscriptions present in the system. Args: limit: Limit the number of subscriptions to be retrieved Returns: list of subscriptions """ url = urljoin(self.base_url, f"{self._url_version}/subscriptions/") headers = self.headers.copy() params = {} # We always use the 'count' option to check weather pagination is # required params.update({"options": "count"}) try: items = self.__pagination( limit=limit, url=url, params=params, headers=headers ) adapter = TypeAdapter(List[SubscriptionLD]) return adapter.validate_python(items) except requests.RequestException as err: msg = "Could not load subscriptions!" self.log_error(err=err, msg=msg) raise
[docs] def post_subscription( self, subscription: SubscriptionLD, update: bool = False ) -> str: """ Creates a new subscription. The subscription is represented by a Subscription object defined in filip.cb.models. If the subscription already exists, the adding is prevented and the id of the existing subscription is returned. A subscription is deemed as already existing if there exists a subscription with the exact same subject and notification fields. All optional fields are not considered. Args: subscription: Subscription update: True - If the subscription already exists, update it False- If the subscription already exists, throw warning Returns: str: Id of the (created) subscription """ existing_subscriptions = self.get_subscription_list() sub_hash = subscription.model_dump_json( include={"subject", "notification", "type"} ) for ex_sub in existing_subscriptions: if sub_hash == ex_sub.model_dump_json( include={"subject", "notification", "type"} ): self.logger.info("Subscription already exists") if update: self.logger.info("Updated subscription") subscription.id = ex_sub.id self.update_subscription(subscription) else: self.logger.warning( f"Subscription existed already with the id" f" {ex_sub.id}" ) return ex_sub.id url = urljoin(self.base_url, f"{self._url_version}/subscriptions") headers = self.headers.copy() if subscription.model_dump().get("@context", None) is not None: headers.update({"Content-Type": "application/ld+json"}) headers.update({"Link": None}) try: res = self.post( url=url, headers=headers, data=subscription.model_dump_json( exclude_unset=False, exclude_defaults=False, exclude_none=True ), ) if res.ok: self.logger.info("Subscription successfully created!") return res.headers["Location"].split("/")[-1] res.raise_for_status() except requests.RequestException as err: msg = "Could not send subscription!" self.log_error(err=err, msg=msg) raise
[docs] def get_subscription(self, subscription_id: str) -> SubscriptionLD: """ Retrieves a subscription from the context broker. Args: subscription_id: id of the subscription Returns: """ url = urljoin( self.base_url, f"{self._url_version}/subscriptions/{subscription_id}" ) headers = self.headers.copy() try: res = self.get(url=url, headers=headers) if res.ok: self.logger.debug("Received: %s", res.json()) return SubscriptionLD(**res.json()) res.raise_for_status() except requests.RequestException as err: msg = f"Could not load subscription {subscription_id}" self.log_error(err=err, msg=msg) raise
[docs] def update_subscription(self, subscription: SubscriptionLD) -> None: """ Only the fields included in the request are updated in the subscription. Args: subscription: Subscription to update Returns: """ url = urljoin( self.base_url, f"{self._url_version}/subscriptions/{subscription.id}" ) headers = self.headers.copy() if subscription.model_dump().get("@context", None) is not None: headers.update({"Content-Type": "application/ld+json"}) headers.update({"Link": None}) try: res = self.patch( url=url, headers=headers, data=subscription.model_dump_json( exclude={"id"}, exclude_unset=True, exclude_defaults=True, exclude_none=True, ), ) if res.ok: self.logger.info("Subscription successfully updated!") else: res.raise_for_status() except requests.RequestException as err: msg = f"Could not update subscription {subscription.id}" self.log_error(err=err, msg=msg) raise
[docs] def delete_subscription(self, subscription_id: str) -> None: """ Deletes a subscription from a Context Broker Args: subscription_id: id of the subscription """ url = urljoin( self.base_url, f"{self._url_version}/subscriptions/{subscription_id}" ) headers = self.headers.copy() try: res = self.delete(url=url, headers=headers) if res.ok: self.logger.info( f"Subscription '{subscription_id}' " f"successfully deleted!" ) else: res.raise_for_status() except requests.RequestException as err: msg = f"Could not delete subscription {subscription_id}" self.log_error(err=err, msg=msg) raise
[docs] def log_multi_errors(self, errors: List[Dict]) -> None: for error in errors: entity_id = error["entityId"] error_details: dict = error["error"] error_title = error_details.get("title") error_status = error_details.get("status") # error_detail = error_details['detail'] self.logger.error( "Response status: %d, Entity: %s, Reason: %s", error_status, entity_id, error_title, )
[docs] def handle_multi_status_response(self, res: requests.Response): """ Handles the response of a batch_operation. If the response contains errors, they are logged. If the response contains only errors, a RuntimeError is raised. Args: res: Returns: """ try: res.raise_for_status() if res.text: response_data = res.json() if "errors" in response_data: errors = response_data["errors"] self.log_multi_errors(errors) if "success" in response_data: successList = response_data["success"] if len(successList) == 0: raise RuntimeError( "Batch operation resulted in errors only, see logs" ) else: self.logger.info("Empty response received.") except json.JSONDecodeError: self.logger.info( "Error decoding JSON. Response may not be in valid JSON format." )
# Batch operation API
[docs] def entity_batch_operation( self, *, entities: List[ContextLDEntity], action_type: Union[ActionTypeLD, str], options: Literal["noOverwrite", "replace", "update"] = None, ) -> None: """ This operation allows to create, update and/or delete several entities in a single batch operation. This operation is split in as many individual operations as entities in the entities vector, so the actionType is executed for each one of them. Depending on the actionType, a mapping with regular non-batch operations can be done: append: maps to POST /v2/entities (if the entity does not already exist) or POST /v2/entities/<id>/attrs (if the entity already exists). appendStrict: maps to POST /v2/entities (if the entity does not already exist) or POST /v2/entities/<id>/attrs?options=append (if the entity already exists). update: maps to PATCH /v2/entities/<id>/attrs. delete: maps to DELETE /v2/entities/<id>/attrs/<attrName> on every attribute included in the entity or to DELETE /v2/entities/<id> if no attribute were included in the entity. replace: maps to PUT /v2/entities/<id>/attrs. Args: entities: "an array of entities, each entity specified using the " "JSON entity representation format " action_type (Update): "actionType, to specify the kind of update action to do: either append, appendStrict, update, delete, or replace. " options (str): Optional 'noOverwrite' 'replace' 'update' Returns: """ url = urljoin( self.base_url, f"{self._url_version}/entityOperations/{action_type.value}" ) headers = self.headers.copy() headers.update({"Content-Type": "application/json"}) params = {} if options: params.update({"options": options}) update = UpdateLD(entities=entities) try: if action_type == ActionTypeLD.DELETE: id_list = [entity.id for entity in entities] res = self.post( url=url, headers=headers, params=params, data=json.dumps(id_list) ) else: res = self.post( url=url, headers=headers, params=params, data=json.dumps( update.model_dump( by_alias=True, exclude_unset=True, exclude_none=True, ).get("entities") ), ) self.handle_multi_status_response(res) except RuntimeError as rerr: raise rerr except Exception as err: raise err else: self.logger.info(f"Update operation {action_type} succeeded!")