Coverage for filip/clients/ngsi_ld/cb.py: 84%
328 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-10 13:57 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-10 13:57 +0000
1"""
2Context Broker Module for API Client
3"""
5import re
6import json
7import os
8from math import inf
9from typing import Any, Dict, List, Union, Optional, Literal
10from urllib.parse import urljoin
11import requests
12from pydantic import TypeAdapter, PositiveInt, PositiveFloat
13from filip.clients.base_http_client import BaseHttpClient, NgsiURLVersion
14from filip.config import settings
15from filip.models.base import FiwareLDHeader, PaginationMethod, core_context
16from filip.models.ngsi_v2.base import AttrsFormat
17from filip.models.ngsi_ld.subscriptions import SubscriptionLD
18from filip.models.ngsi_ld.context import (
19 DataTypeLD,
20 ContextLDEntity,
21 ContextLDEntityKeyValues,
22 ContextProperty,
23 ContextRelationship,
24 NamedContextProperty,
25 NamedContextRelationship,
26 ActionTypeLD,
27 UpdateLD,
28)
29from filip.models.ngsi_v2.context import Query
32class ContextBrokerLDClient(BaseHttpClient):
33 """
34 Implementation of NGSI-LD Context Broker functionalities, such as creating
35 entities and subscriptions; retrieving, updating and deleting data.
36 Further documentation:
37 https://fiware-orion.readthedocs.io/en/master/
39 Api specifications for LD are located here:
40 https://www.etsi.org/deliver/etsi_gs/CIM/001_099/009/01.04.01_60/gs_cim009v010401p.pdf
41 """
43 def __init__(
44 self,
45 url: str = None,
46 *,
47 session: requests.Session = None,
48 fiware_header: FiwareLDHeader = None,
49 **kwargs,
50 ):
51 """
53 Args:
54 url: Url of context broker server
55 session (requests.Session):
56 fiware_header (FiwareHeader): fiware service and fiware service path
57 **kwargs (Optional): Optional arguments that ``request`` takes.
58 """
59 # set service url
60 url = url or settings.LD_CB_URL
61 # base_http_client overwrites empty header with FiwareHeader instead of FiwareLD
62 init_header = fiware_header if fiware_header else FiwareLDHeader()
63 if init_header.link_header is None:
64 init_header.set_context(core_context)
65 super().__init__(url=url, session=session, fiware_header=init_header, **kwargs)
66 # set the version specific url-pattern
67 self._url_version = NgsiURLVersion.ld_url.value
68 # For uplink requests, the Content-Type header is essential,
69 # Accept will be ignored
70 # For downlink requests, the Accept header is essential,
71 # Content-Type will be ignored
73 # default uplink content JSON
74 self.headers.update({"Content-Type": "application/json"})
75 # default downlink content JSON-LD
76 self.headers.update({"Accept": "application/ld+json"})
78 if init_header.ngsild_tenant is not None:
79 self.__make_tenant()
81 def __pagination(
82 self,
83 *,
84 method: PaginationMethod = PaginationMethod.GET,
85 url: str,
86 headers: Dict,
87 limit: Union[PositiveInt, PositiveFloat] = None,
88 params: Dict = None,
89 data: str = None,
90 ) -> List[Dict]:
91 """
92 NGSIv2 implements a pagination mechanism in order to help clients to
93 retrieve large sets of resources. This mechanism works for all listing
94 operations in the API (e.g. GET /v2/entities, GET /v2/subscriptions,
95 POST /v2/op/query, etc.). This function helps getting datasets that are
96 larger than the limit for the different GET operations.
98 https://fiware-orion.readthedocs.io/en/master/user/pagination/index.html
100 Args:
101 url: Information about the url, obtained from the original function
102 headers: The headers from the original function
103 params:
104 limit:
106 Returns:
107 object:
109 """
110 if limit is None:
111 limit = inf
112 if limit > 1000:
113 params["limit"] = 1000 # maximum items per request
114 else:
115 params["limit"] = limit
116 # add count option if not present
117 if "count" not in params:
118 params.update({"count": "true"})
120 if self.session:
121 session = self.session
122 else:
123 session = requests.Session()
124 with session:
125 res = session.request(
126 method=method, url=url, params=params, headers=headers, data=data
127 )
128 if res.ok:
129 items = res.json()
130 # do pagination
131 count = int(res.headers["NGSILD-Results-Count"])
133 while len(items) < limit and len(items) < count:
134 # Establishing the offset from where entities are retrieved
135 params["offset"] = len(items)
136 params["limit"] = min(1000, (limit - len(items)))
137 res = session.request(
138 method=method,
139 url=url,
140 params=params,
141 headers=headers,
142 data=data,
143 )
144 if res.ok:
145 items.extend(res.json())
146 else:
147 res.raise_for_status()
148 self.logger.debug("Received: %s", items)
149 return items
150 res.raise_for_status()
152 def get_version(self) -> Dict:
153 """
154 Gets version of Orion-LD context broker
155 Returns:
156 Dictionary with response
157 """
158 url = urljoin(self.base_url, "/version")
159 try:
160 res = self.get(url=url)
161 if res.ok:
162 return res.json()
163 res.raise_for_status()
164 except requests.RequestException as err:
165 self.logger.error(err)
166 raise
168 def __make_tenant(self):
169 """
170 Create tenant if tenant
171 is given in headers
172 """
173 idhex = f"urn:ngsi-ld:{os.urandom(6).hex()}"
174 e = ContextLDEntity(id=idhex, type="TemporaryTenant")
175 try:
176 self.post_entity(entity=e)
177 self.delete_entity_by_id(idhex)
178 except Exception as err:
179 self.log_error(err=err, msg="Error while creating tenant")
180 raise
182 def get_statistics(self) -> Dict:
183 """
184 Gets statistics of context broker
185 Returns:
186 Dictionary with response
187 """
188 url = urljoin(self.base_url, "statistics")
189 try:
190 res = self.get(url=url)
191 if res.ok:
192 return res.json()
193 res.raise_for_status()
194 except requests.RequestException as err:
195 self.logger.error(err)
196 raise
198 def post_entity(
199 self,
200 entity: Union[ContextLDEntity, ContextLDEntityKeyValues],
201 append: bool = False,
202 update: bool = False,
203 ):
204 """
205 Function registers an Object with the NGSI-LD Context Broker,
206 if it already exists it can be automatically updated
207 if the update flag bool is True.
208 First a post request with the entity is tried, if the response code
209 is 422 the entity is uncrossable, as it already exists there are two
210 options, either overwrite it, if the attribute have changed
211 (e.g. at least one new/new values) (update = True) or leave
212 it the way it is (update=False)
214 """
215 url = urljoin(self.base_url, f"{self._url_version}/entities")
216 headers = self.headers.copy()
217 if isinstance(entity, ContextLDEntityKeyValues):
218 entity = entity.to_entity()
219 if entity.model_dump().get("@context", None) is not None:
220 headers.update({"Content-Type": "application/ld+json"})
221 headers.update({"Link": None})
222 try:
223 res = self.post(
224 url=url,
225 headers=headers,
226 json=entity.model_dump(
227 exclude_unset=False, exclude_defaults=False, exclude_none=True
228 ),
229 )
230 if res.ok:
231 self.logger.info("Entity successfully posted!")
232 return res.headers.get("Location")
233 res.raise_for_status()
234 except requests.RequestException as err:
235 if err.response is not None and err.response.status_code == 409:
236 if append: # 409 entity already exists
237 return self.append_entity_attributes(entity=entity)
238 elif update:
239 return self.override_entities(entities=[entity])
240 msg = f"Could not post entity {entity.id}"
241 self.log_error(err=err, msg=msg)
242 raise
244 def override_entities(
245 self, entities: List[Union[ContextLDEntity, ContextLDEntityKeyValues]]
246 ):
247 """
248 Function to create or override existing entites with the NGSI-LD Context Broker.
249 The batch operation with Upsert will be used.
250 """
251 return self.entity_batch_operation(
252 entities=entities, action_type=ActionTypeLD.UPSERT, options="replace"
253 )
255 def get_entity(
256 self,
257 entity_id: str,
258 entity_type: str = None,
259 attrs: List[str] = None,
260 options: Optional[str] = None,
261 geometryProperty: Optional[str] = None,
262 ) -> Union[ContextLDEntity, ContextLDEntityKeyValues, Dict[str, Any]]:
263 """
264 This operation must return one entity element only, but there may be
265 more than one entity with the same ID (e.g. entities with same ID but
266 different types). In such case, an error message is returned, with
267 the HTTP status code set to 409 Conflict.
269 Args:
270 entity_id (String): Id of the entity to be retrieved
271 entity_type (String): Entity type, to avoid ambiguity in case
272 there are several entities with the same entity id.
273 attrs (List of Strings): List of attribute names whose data must be
274 included in the response. The attributes are retrieved in the
275 order specified by this parameter.
276 See "Filtering out attributes and metadata" section in https://fiware-orion.readthedocs.io/en/master/orion-api.html#filtering-out-attributes-and-metadata for more
277 detail. If this parameter is not included, the attributes are
278 retrieved in arbitrary order, and all the attributes of the
279 entity are included in the response.
280 Example: temperature, humidity.
281 options (String): keyValues (simplified representation of entity)
282 or sysAttrs (include generated attrs createdAt and modifiedAt)
283 geometryProperty (String): Name of a GeoProperty. In the case of GeoJSON
284 Entity representation, this parameter indicates which GeoProperty to
285 use for the "geometry" element. By default, it shall be 'location'.
286 Returns:
287 ContextEntity
288 """
289 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
290 headers = self.headers.copy()
291 params = {}
292 if entity_type:
293 params.update({"type": entity_type})
294 if attrs:
295 params.update({"attrs": ",".join(attrs)})
296 if geometryProperty:
297 params.update({"geometryProperty": geometryProperty})
298 if options:
299 if options != "keyValues" and options != "sysAttrs":
300 raise ValueError(
301 f"Only available options are 'keyValues' and 'sysAttrs'"
302 )
303 params.update({"options": options})
305 try:
306 res = self.get(url=url, params=params, headers=headers)
307 if res.ok:
308 self.logger.info("Entity successfully retrieved!")
309 self.logger.debug("Received: %s", res.json())
310 if options == "keyValues":
311 return ContextLDEntityKeyValues(**res.json())
312 else:
313 return ContextLDEntity(**res.json())
314 res.raise_for_status()
315 except requests.RequestException as err:
316 msg = f"Could not load entity {entity_id}"
317 self.log_error(err=err, msg=msg)
318 raise
    # Geometry names accepted by the ``geometry`` parameter of
    # ``get_entity_list``.
    GeometryShape = Literal[
        "Point",
        "MultiPoint",
        "LineString",
        "MultiLineString",
        "Polygon",
        "MultiPolygon",
    ]
329 def get_entity_list(
330 self,
331 entity_id: Optional[str] = None,
332 id_pattern: Optional[str] = ".*",
333 entity_type: Optional[str] = None,
334 attrs: Optional[List[str]] = None,
335 q: Optional[str] = None,
336 georel: Optional[str] = None,
337 geometry: Optional[GeometryShape] = None,
338 coordinates: Optional[str] = None,
339 geoproperty: Optional[str] = None,
340 # csf: Optional[str] = None, # Context Source Filter
341 limit: Optional[PositiveInt] = None,
342 options: Optional[str] = None,
343 ) -> List[Union[ContextLDEntity, ContextLDEntityKeyValues]]:
344 """
345 This operation retrieves a list of entities based on different query options.
346 By default, the operation retrieves all the entities in the context broker.
347 Args:
348 entity_id:
349 Id of the entity to be retrieved
350 id_pattern:
351 Regular expression to match the entity id
352 entity_type:
353 Entity type, to avoid ambiguity in case there are several
354 entities with the same entity id.
355 attrs:
356 List of attribute names whose data must be included in the response.
357 q:
358 Query expression, composed of attribute names, operators and values.
359 georel:
360 Geospatial relationship between the query geometry and the entities.
361 geometry:
362 Type of geometry for the query. The possible values are Point,
363 MultiPoint, LineString, MultiLineString, Polygon, MultiPolygon.
364 coordinates:
365 Coordinates of the query geometry. The coordinates must be
366 expressed as a string of comma-separated values.
367 geoproperty:
368 Name of a GeoProperty. In the case of GeoJSON Entity representation,
369 this parameter indicates which GeoProperty to use for the "geometry" element.
370 limit:
371 Maximum number of entities to retrieve.
372 options:
373 Further options for the query. The available options are:
374 - keyValues (simplified representation of entity)
375 - sysAttrs (including createdAt and modifiedAt, etc.)
376 - count (include number of all matched entities in response header)
377 """
378 url = urljoin(self.base_url, f"{self._url_version}/entities/")
379 headers = self.headers.copy()
380 params = {}
381 if entity_id:
382 params.update({"id": entity_id})
383 if id_pattern:
384 params.update({"idPattern": id_pattern})
385 if entity_type:
386 params.update({"type": entity_type})
387 if attrs:
388 params.update({"attrs": ",".join(attrs)})
389 if q:
390 x = re.search(r"[=!<>~]{1}\'.*\'", q.replace(" ", ""))
391 if x is not None:
392 raise ValueError(
393 f"String/Date/etc. value in {x.group()} must be " f"in double quote"
394 )
395 params.update({"q": q})
396 if georel:
397 params.update({"georel": georel})
398 if geometry:
399 params.update({"geometry": geometry})
400 if coordinates:
401 params.update({"coordinates": coordinates})
402 if geoproperty:
403 params.update({"geoproperty": geoproperty})
404 # if csf: # ContextSourceRegistration not supported yet
405 # params.update({'csf': csf})
406 if options:
407 if options != "keyValues" and options != "sysAttrs":
408 raise ValueError(
409 f"Only available options are 'keyValues' and 'sysAttrs'"
410 )
411 params.update({"options": options})
412 # params.update({'local': 'true'})
414 try:
415 # use pagination
416 params.update({"count": "true"})
417 items = self.__pagination(
418 limit=limit, url=url, params=params, headers=headers
419 )
421 self.logger.info("Entity successfully retrieved!")
422 # convert raw data to pydantic models
423 if options == "keyValues":
424 entity_list = [ContextLDEntityKeyValues(**item) for item in items]
425 return entity_list
426 else:
427 entity_list = [ContextLDEntity(**item) for item in items]
428 return entity_list
429 except requests.RequestException as err:
430 msg = f"Could not load entity matching{params}"
431 self.log_error(err=err, msg=msg)
432 raise
434 def replace_existing_attributes_of_entity(
435 self,
436 entity: Union[ContextLDEntity, ContextLDEntityKeyValues],
437 append: bool = False,
438 ):
439 """
440 The attributes previously existing in the entity are removed and
441 replaced by the ones in the request.
443 Args:
444 entity (ContextEntity):
445 append (bool):
446 options:
447 Returns:
449 """
450 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity.id}/attrs")
451 headers = self.headers.copy()
452 if isinstance(entity, ContextLDEntityKeyValues):
453 entity = entity.to_entity()
454 if entity.model_dump().get("@context", None) is not None:
455 headers.update({"Content-Type": "application/ld+json"})
456 headers.update({"Link": None})
457 try:
458 res = self.patch(
459 url=url,
460 headers=headers,
461 json=entity.model_dump(exclude={"id", "type"}, exclude_none=True),
462 )
463 if res.ok:
464 self.logger.info(f"Entity {entity.id} successfully " "updated!")
465 else:
466 res.raise_for_status()
467 except requests.RequestException as err:
468 if err.response is not None and append and err.response.status_code == 207:
469 return self.append_entity_attributes(entity=entity)
470 msg = f"Could not replace attribute of entity {entity.id} !"
471 self.log_error(err=err, msg=msg)
472 raise
474 def update_entity_attribute(
475 self,
476 entity_id: str,
477 attr: Union[
478 ContextProperty,
479 ContextRelationship,
480 NamedContextProperty,
481 NamedContextRelationship,
482 ],
483 attr_name: str = None,
484 ):
485 """
486 Updates a specified attribute from an entity.
487 Args:
488 attr: context attribute to update
489 entity_id: Id of the entity. Example: Bcn_Welt
490 entity_type: Entity type, to avoid ambiguity in case there are
491 several entities with the same entity id.
492 """
493 headers = self.headers.copy()
494 if not isinstance(attr, NamedContextProperty) or not isinstance(
495 attr, NamedContextRelationship
496 ):
497 assert attr_name is not None, (
498 "Missing name for attribute. "
499 "attr_name must be present if"
500 "attr is of type ContextAttribute"
501 )
502 else:
503 assert attr_name is None, (
504 "Invalid argument attr_name. Do not set "
505 "attr_name if attr is of type "
506 "NamedContextAttribute or NamedContextRelationship"
507 )
508 attr_name = attr.name
509 url = urljoin(
510 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
511 )
512 val = attr.value if "value" in attr.model_dump() else attr.object
513 try:
514 res = self.patch(url=url, headers=headers, json={"value": val})
515 if res.ok:
516 self.logger.info(
517 f"Attribute {attr_name} of {entity_id} successfully updated!"
518 )
519 else:
520 res.raise_for_status()
521 except requests.RequestException as err:
522 msg = f"Could not update attribute '{attr_name}' of entity {entity_id}"
523 self.log_error(err=err, msg=msg)
524 raise
526 def append_entity_attributes(
527 self,
528 entity: Union[ContextLDEntity, ContextLDEntityKeyValues],
529 options: Optional[str] = None,
530 ):
531 """
532 Append new Entity attributes to an existing Entity within an NGSI-LD system
533 Args:
534 entity (ContextLDEntity):
535 Entity to append attributes to.
536 options (str):
537 Options for the request. The only available value is
538 'noOverwrite'. If set, it will raise 400, if all attributes
539 exist already.
541 """
542 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity.id}/attrs")
543 headers = self.headers.copy()
544 if entity.model_dump().get("@context", None) is not None:
545 headers.update({"Content-Type": "application/ld+json"})
546 headers.update({"Link": None})
547 params = {}
549 if options:
550 if options != "noOverwrite":
551 raise ValueError(f"The only available value is 'noOverwrite'")
552 params.update({"options": options})
554 try:
555 res = self.post(
556 url=url,
557 headers=headers,
558 params=params,
559 json=entity.model_dump(exclude={"id", "type"}, exclude_none=True),
560 )
561 if res.ok:
562 self.logger.info(f"Entity {entity.id} successfully updated!")
563 else:
564 res.raise_for_status()
565 except requests.RequestException as err:
566 msg = f"Could not update entity {entity.id}!"
567 self.log_error(err=err, msg=msg)
568 raise
570 # def update_existing_attribute_by_name(self, entity: ContextLDEntity
571 # ):
572 # pass
574 def delete_entity_by_id(self, entity_id: str, entity_type: Optional[str] = None):
575 """
576 Deletes an entity by its id. For deleting mulitple entities at once,
577 entity_batch_operation() is more efficient.
578 Args:
579 entity_id:
580 ID of entity to delete.
581 entity_type:
582 Type of entity to delete.
583 """
584 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
585 headers = self.headers.copy()
586 params = {}
588 if entity_type:
589 params.update({"type": entity_type})
591 try:
592 res = self.delete(url=url, headers=headers, params=params)
593 if res.ok:
594 self.logger.info(f"Entity {entity_id} successfully deleted")
595 else:
596 res.raise_for_status()
597 except requests.RequestException as err:
598 msg = f"Could not delete entity {entity_id}"
599 self.log_error(err=err, msg=msg)
600 raise
602 def delete_attribute(self, entity_id: str, attribute_id: str):
603 """
604 Deletes an attribute from an entity.
605 Args:
606 entity_id:
607 ID of the entity.
608 attribute_id:
609 Name of the attribute to delete.
610 Returns:
612 """
613 url = urljoin(
614 self.base_url,
615 f"{self._url_version}/entities/{entity_id}/attrs/{attribute_id}",
616 )
617 headers = self.headers.copy()
619 try:
620 res = self.delete(url=url, headers=headers)
621 if res.ok:
622 self.logger.info(
623 f"Attribute {attribute_id} of Entity {entity_id} successfully deleted"
624 )
625 else:
626 res.raise_for_status()
627 except requests.RequestException as err:
628 msg = f"Could not delete attribute {attribute_id} of entity {entity_id}"
629 self.log_error(err=err, msg=msg)
630 raise
632 # SUBSCRIPTION API ENDPOINTS
633 def get_subscription_list(self, limit: PositiveInt = inf) -> List[SubscriptionLD]:
634 """
635 Returns a list of all the subscriptions present in the system.
636 Args:
637 limit: Limit the number of subscriptions to be retrieved
638 Returns:
639 list of subscriptions
640 """
641 url = urljoin(self.base_url, f"{self._url_version}/subscriptions/")
642 headers = self.headers.copy()
643 params = {}
645 # We always use the 'count' option to check weather pagination is
646 # required
647 params.update({"options": "count"})
648 try:
649 items = self.__pagination(
650 limit=limit, url=url, params=params, headers=headers
651 )
652 adapter = TypeAdapter(List[SubscriptionLD])
653 return adapter.validate_python(items)
654 except requests.RequestException as err:
655 msg = "Could not load subscriptions!"
656 self.log_error(err=err, msg=msg)
657 raise
659 def post_subscription(
660 self, subscription: SubscriptionLD, update: bool = False
661 ) -> str:
662 """
663 Creates a new subscription. The subscription is represented by a
664 Subscription object defined in filip.cb.models.
666 If the subscription already exists, the adding is prevented and the id
667 of the existing subscription is returned.
669 A subscription is deemed as already existing if there exists a
670 subscription with the exact same subject and notification fields. All
671 optional fields are not considered.
673 Args:
674 subscription: Subscription
675 update: True - If the subscription already exists, update it
676 False- If the subscription already exists, throw warning
677 Returns:
678 str: Id of the (created) subscription
680 """
681 existing_subscriptions = self.get_subscription_list()
683 sub_hash = subscription.model_dump_json(
684 include={"subject", "notification", "type"}
685 )
686 for ex_sub in existing_subscriptions:
687 if sub_hash == ex_sub.model_dump_json(
688 include={"subject", "notification", "type"}
689 ):
690 self.logger.info("Subscription already exists")
691 if update:
692 self.logger.info("Updated subscription")
693 subscription.id = ex_sub.id
694 self.update_subscription(subscription)
695 else:
696 self.logger.warning(
697 f"Subscription existed already with the id" f" {ex_sub.id}"
698 )
699 return ex_sub.id
701 url = urljoin(self.base_url, f"{self._url_version}/subscriptions")
702 headers = self.headers.copy()
703 if subscription.model_dump().get("@context", None) is not None:
704 headers.update({"Content-Type": "application/ld+json"})
705 headers.update({"Link": None})
706 try:
707 res = self.post(
708 url=url,
709 headers=headers,
710 data=subscription.model_dump_json(
711 exclude_unset=False, exclude_defaults=False, exclude_none=True
712 ),
713 )
714 if res.ok:
715 self.logger.info("Subscription successfully created!")
716 return res.headers["Location"].split("/")[-1]
717 res.raise_for_status()
718 except requests.RequestException as err:
719 msg = "Could not send subscription!"
720 self.log_error(err=err, msg=msg)
721 raise
723 def get_subscription(self, subscription_id: str) -> SubscriptionLD:
724 """
725 Retrieves a subscription from the context broker.
726 Args:
727 subscription_id: id of the subscription
729 Returns:
731 """
732 url = urljoin(
733 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
734 )
735 headers = self.headers.copy()
736 try:
737 res = self.get(url=url, headers=headers)
738 if res.ok:
739 self.logger.debug("Received: %s", res.json())
740 return SubscriptionLD(**res.json())
741 res.raise_for_status()
742 except requests.RequestException as err:
743 msg = f"Could not load subscription {subscription_id}"
744 self.log_error(err=err, msg=msg)
745 raise
747 def update_subscription(self, subscription: SubscriptionLD) -> None:
748 """
749 Only the fields included in the request are updated in the subscription.
750 Args:
751 subscription: Subscription to update
752 Returns:
754 """
755 url = urljoin(
756 self.base_url, f"{self._url_version}/subscriptions/{subscription.id}"
757 )
758 headers = self.headers.copy()
759 if subscription.model_dump().get("@context", None) is not None:
760 headers.update({"Content-Type": "application/ld+json"})
761 headers.update({"Link": None})
762 try:
763 res = self.patch(
764 url=url,
765 headers=headers,
766 data=subscription.model_dump_json(
767 exclude={"id"},
768 exclude_none=True,
769 ),
770 )
771 if res.ok:
772 self.logger.info("Subscription successfully updated!")
773 else:
774 res.raise_for_status()
775 except requests.RequestException as err:
776 msg = f"Could not update subscription {subscription.id}"
777 self.log_error(err=err, msg=msg)
778 raise
780 def delete_subscription(self, subscription_id: str) -> None:
781 """
782 Deletes a subscription from a Context Broker
783 Args:
784 subscription_id: id of the subscription
785 """
786 url = urljoin(
787 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
788 )
789 headers = self.headers.copy()
790 try:
791 res = self.delete(url=url, headers=headers)
792 if res.ok:
793 self.logger.info(
794 f"Subscription '{subscription_id}' " f"successfully deleted!"
795 )
796 else:
797 res.raise_for_status()
798 except requests.RequestException as err:
799 msg = f"Could not delete subscription {subscription_id}"
800 self.log_error(err=err, msg=msg)
801 raise
803 def log_multi_errors(self, errors: List[Dict]) -> None:
804 for error in errors:
805 entity_id = error["entityId"]
806 error_details: dict = error["error"]
807 error_title = error_details.get("title")
808 error_status = error_details.get("status")
809 # error_detail = error_details['detail']
810 self.logger.error(
811 "Response status: %d, Entity: %s, Reason: %s",
812 error_status,
813 entity_id,
814 error_title,
815 )
817 def handle_multi_status_response(self, res: requests.Response):
818 """
819 Handles the response of a batch_operation. If the response contains
820 errors, they are logged. If the response contains only errors, a RuntimeError
821 is raised.
822 Args:
823 res:
825 Returns:
827 """
828 try:
829 res.raise_for_status()
830 if res.text:
831 response_data = res.json()
832 if "errors" in response_data:
833 errors = response_data["errors"]
834 self.log_multi_errors(errors)
835 if "success" in response_data:
836 successList = response_data["success"]
837 if len(successList) == 0:
838 raise RuntimeError(
839 "Batch operation resulted in errors only, see logs"
840 )
841 else:
842 self.logger.info("Empty response received.")
843 except json.JSONDecodeError:
844 self.logger.info(
845 "Error decoding JSON. Response may not be in valid JSON format."
846 )
848 # Batch operation API
849 def entity_batch_operation(
850 self,
851 *,
852 entities: List[Union[ContextLDEntity, ContextLDEntityKeyValues]],
853 action_type: Union[ActionTypeLD, str],
854 options: Literal["noOverwrite", "replace", "update"] = None,
855 ) -> None:
856 """
857 This operation allows to create, update and/or delete several entities
858 in a single batch operation.
860 This operation is split in as many individual operations as entities
861 in the entities vector, so the actionType is executed for each one of
862 them. Depending on the actionType, a mapping with regular non-batch
863 operations can be done:
865 append: maps to POST /v2/entities (if the entity does not already exist)
866 or POST /v2/entities/<id>/attrs (if the entity already exists).
868 appendStrict: maps to POST /v2/entities (if the entity does not
869 already exist) or POST /v2/entities/<id>/attrs?options=append (if the
870 entity already exists).
872 update: maps to PATCH /v2/entities/<id>/attrs.
874 delete: maps to DELETE /v2/entities/<id>/attrs/<attrName> on every
875 attribute included in the entity or to DELETE /v2/entities/<id> if
876 no attribute were included in the entity.
878 replace: maps to PUT /v2/entities/<id>/attrs.
880 Args:
881 entities: "an array of entities, each entity specified using the "
882 "JSON entity representation format "
883 action_type (Update): "actionType, to specify the kind of update
884 action to do: either append, appendStrict, update, delete,
885 or replace. "
886 options (str): Optional 'noOverwrite' 'replace' 'update'
888 Returns:
890 """
892 url = urljoin(
893 self.base_url, f"{self._url_version}/entityOperations/{action_type.value}"
894 )
896 headers = self.headers.copy()
897 ctx = any(e.model_dump().get("@context", None) is not None for e in entities)
899 nctx = any(e.model_dump().get("@context", None) is None for e in entities)
900 if ctx and not nctx:
901 headers.update({"Content-Type": "application/ld+json"})
902 headers.update({"Link": None})
903 elif not ctx and nctx:
904 headers.update({"Content-Type": "application/json"})
905 else:
906 self.logger.warning(
907 "Detected mixed context provision in batch operation: "
908 "Some entities have @context field while others don't. "
909 "FiLiP use application/json and Link header by default, so that "
910 "the entities with @context will be rejected by CB"
911 )
913 params = {}
914 if options:
915 params.update({"options": options})
916 update = UpdateLD(entities=entities)
917 try:
918 if action_type == ActionTypeLD.DELETE:
919 id_list = [entity.id for entity in entities]
920 res = self.post(
921 url=url, headers=headers, params=params, data=json.dumps(id_list)
922 )
923 else:
924 res = self.post(
925 url=url,
926 headers=headers,
927 params=params,
928 data=json.dumps(
929 update.model_dump(
930 by_alias=True,
931 exclude_none=True,
932 ).get("entities")
933 ),
934 )
935 self.handle_multi_status_response(res)
936 except RuntimeError as rerr:
937 raise rerr
938 except Exception as err:
939 raise err
940 else:
941 self.logger.info(f"Update operation {action_type} succeeded!")
943 def validate_relationship(
944 self,
945 relationship: Union[
946 NamedContextProperty,
947 ContextProperty,
948 NamedContextRelationship,
949 ContextRelationship,
950 Dict,
951 ],
952 ) -> bool:
953 """
954 Validates a relationship. A relationship is valid if it points to an existing
955 entity. Otherwise, it is considered invalid
957 Args:
958 relationship: relationship to validate,assumed to be property or relationship
959 since there is no geoproperty with string value
960 Returns
961 True if the relationship is valid, False otherwise
962 """
963 if hasattr(relationship, "value"):
964 destination_id = relationship.value
965 elif hasattr(relationship, "object"):
966 destination_id = relationship.object
967 elif isinstance(relationship, dict):
968 _sentinel = object()
969 destination_id = relationship.get("value", _sentinel)
970 if destination_id is _sentinel:
971 raise ValueError(
972 "Invalid ld relationship dictionary format\n"
973 "Expected format: {"
974 f'"type": "{DataTypeLD.RELATIONSHIP[0]}", '
975 '"value" "entity_id"}'
976 )
977 else:
978 raise ValueError("Invalid relationship type.")
979 try:
980 destination_entity = self.get_entity(entity_id=destination_id)
981 return destination_entity.id == destination_id
982 except requests.RequestException as err:
983 if err.response.status_code == 404:
984 return False