Coverage for filip/clients/ngsi_ld/cb.py: 82%
341 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-05 11:07 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-05 11:07 +0000
1"""
2Context Broker Module for API Client
3"""
5import re
6import json
7import os
8from math import inf
9from typing import Any, Dict, List, Union, Optional, Literal
10from urllib.parse import urljoin
11import requests
12from pydantic import TypeAdapter, PositiveInt, PositiveFloat
13from filip.clients.base_http_client import BaseHttpClient, NgsiURLVersion
14from filip.config import settings
15from filip.models.base import FiwareLDHeader, PaginationMethod, core_context
16from filip.models.ngsi_v2.base import AttrsFormat
17from filip.models.ngsi_ld.subscriptions import SubscriptionLD
18from filip.models.ngsi_ld.context import (
19 DataTypeLD,
20 ContextLDEntity,
21 ContextLDEntityKeyValues,
22 ContextProperty,
23 ContextRelationship,
24 NamedContextProperty,
25 NamedContextRelationship,
26 ActionTypeLD,
27 UpdateLD,
28)
29from filip.models.ngsi_v2.context import Query
32class ContextBrokerLDClient(BaseHttpClient):
33 """
34 Implementation of NGSI-LD Context Broker functionalities, such as creating
35 entities and subscriptions; retrieving, updating and deleting data.
36 Further documentation:
37 https://fiware-orion.readthedocs.io/en/master/
39 Api specifications for LD are located here:
40 https://www.etsi.org/deliver/etsi_gs/CIM/001_099/009/01.04.01_60/gs_cim009v010401p.pdf
41 """
def __init__(
    self,
    url: str = None,
    *,
    session: requests.Session = None,
    fiware_header: FiwareLDHeader = None,
    **kwargs,
):
    """
    Set up the client for an NGSI-LD context broker.

    Args:
        url: Url of the context broker server. Falls back to
            ``settings.LD_CB_URL`` when empty.
        session (requests.Session): Session handed on to the base client.
        fiware_header (FiwareLDHeader): LD header. A default
            ``FiwareLDHeader`` is created when none is given.
        **kwargs (Optional): Forwarded unchanged to the base client.
    """
    # The base client would substitute a non-LD header for an empty one,
    # therefore an LD header is always created here.
    ld_header = fiware_header or FiwareLDHeader()
    if ld_header.link_header is None:
        ld_header.set_context(core_context)

    super().__init__(
        url=url or settings.LD_CB_URL,
        session=session,
        fiware_header=ld_header,
        **kwargs,
    )

    # version specific url-pattern
    self._url_version = NgsiURLVersion.ld_url.value

    # Uplink requests rely on Content-Type (default: JSON), downlink
    # requests rely on Accept (default: JSON-LD).
    self.headers.update(
        {
            "Content-Type": "application/json",
            "Accept": "application/ld+json",
        }
    )

    if ld_header.ngsild_tenant is not None:
        self.__make_tenant()
def __pagination(
    self,
    *,
    method: PaginationMethod = PaginationMethod.GET,
    url: str,
    headers: Dict,
    limit: Union[PositiveInt, PositiveFloat] = None,
    params: Dict = None,
    data: str = None,
) -> List[Dict]:
    """
    Collect the items of a listing operation page by page.

    The first request asks for at most 1000 items. The total number of
    available items is read from the ``Fiware-Total-Count`` (v2 url
    version) or ``NGSILD-Results-Count`` (LD url version) response header
    and further pages are requested, using ``offset``, until either
    ``limit`` or the reported total is reached.

    Args:
        method: HTTP method used for every page request
        url: Url of the listing endpoint
        headers: Headers sent with every page request
        limit: Maximum number of items to collect, ``None`` means no limit
        params: Query parameters of the original request. The dictionary
            is copied, the caller's object is not modified.
        data: Request body sent with every page request

    Returns:
        List of the decoded JSON items

    Raises:
        requests.RequestException: if one of the page requests fails
    """
    max_page_size = 1000  # maximum items per request

    if limit is None:
        limit = inf
    # Work on a copy: tolerates the ``None`` default and keeps the caller's
    # dictionary free of the ``limit``/``offset`` bookkeeping.
    params = dict(params) if params else {}
    params["limit"] = max_page_size if limit > max_page_size else int(limit)

    if self.session:
        session = self.session
    else:
        session = requests.Session()
    with session:
        res = session.request(
            method=method, url=url, params=params, headers=headers, data=data
        )
        if res.ok:
            items = res.json()
            # do pagination
            if self._url_version == NgsiURLVersion.v2_url.value:
                count = int(res.headers["Fiware-Total-Count"])
            elif self._url_version == NgsiURLVersion.ld_url.value:
                count = int(res.headers["NGSILD-Results-Count"])
            else:
                count = 0

            while len(items) < limit and len(items) < count:
                # Establishing the offset from where entities are retrieved
                params["offset"] = len(items)
                params["limit"] = int(min(max_page_size, limit - len(items)))
                res = session.request(
                    method=method,
                    url=url,
                    params=params,
                    headers=headers,
                    data=data,
                )
                if not res.ok:
                    res.raise_for_status()
                page = res.json()
                if not page:
                    # The broker reported more items than it delivers;
                    # stop instead of requesting the same offset forever.
                    break
                items.extend(page)
            self.logger.debug("Received: %s", items)
            return items
        res.raise_for_status()
def get_version(self) -> Dict:
    """
    Query the ``/version`` endpoint of the context broker.

    Returns:
        Dictionary with the decoded JSON response

    Raises:
        requests.RequestException: logged and re-raised if the request fails
    """
    endpoint = urljoin(self.base_url, "/version")
    try:
        response = self.get(url=endpoint)
        if not response.ok:
            response.raise_for_status()
        else:
            return response.json()
    except requests.RequestException as exc:
        self.logger.error(exc)
        raise
def __make_tenant(self):
    """
    Post a temporary entity with a random id and delete it again.

    Called from the constructor when a tenant is set in the header.
    Any error is logged and re-raised.
    """
    temp_id = "urn:ngsi-ld:" + os.urandom(6).hex()
    temp_entity = ContextLDEntity(id=temp_id, type="TemporaryTenant")
    try:
        self.post_entity(entity=temp_entity)
        self.delete_entity_by_id(temp_id)
    except Exception as exc:
        self.log_error(err=exc, msg="Error while creating tenant")
        raise
def get_statistics(self) -> Dict:
    """
    Query the ``statistics`` endpoint of the context broker.

    Returns:
        Dictionary with the decoded JSON response

    Raises:
        requests.RequestException: logged and re-raised if the request fails
    """
    endpoint = urljoin(self.base_url, "statistics")
    try:
        response = self.get(url=endpoint)
        if not response.ok:
            response.raise_for_status()
        else:
            return response.json()
    except requests.RequestException as exc:
        self.logger.error(exc)
        raise
def post_entity(
    self,
    entity: Union[ContextLDEntity, ContextLDEntityKeyValues],
    append: bool = False,
    update: bool = False,
):
    """
    Create an entity in the NGSI-LD context broker.

    The entity is sent with a POST request. If the broker answers with
    409 (entity already exists), the behaviour depends on the flags:

    - ``append=True``: the attributes are appended to the existing entity
    - ``update=True``: the existing entity is overridden (batch upsert)
    - otherwise the error is logged and re-raised

    ``append`` takes precedence over ``update``.

    Args:
        entity: Entity to create. A key-values entity is converted with
            ``to_entity()`` first.
        append: Append attributes if the entity already exists
        update: Override the entity if it already exists

    Returns:
        The ``Location`` header of the response on success, otherwise the
        result of the append/override fallback.
    """
    if isinstance(entity, ContextLDEntityKeyValues):
        entity = entity.to_entity()

    endpoint = urljoin(self.base_url, f"{self._url_version}/entities")
    request_headers = self.headers.copy()
    # An embedded @context requires JSON-LD content and no Link header
    if entity.model_dump().get("@context", None) is not None:
        request_headers.update(
            {"Content-Type": "application/ld+json", "Link": None}
        )

    try:
        payload = entity.model_dump(
            exclude_unset=False, exclude_defaults=False, exclude_none=True
        )
        res = self.post(url=endpoint, headers=request_headers, json=payload)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info("Entity successfully posted!")
            return res.headers.get("Location")
    except requests.RequestException as err:
        # 409: entity already exists
        already_exists = (
            err.response is not None and err.response.status_code == 409
        )
        if already_exists and append:
            return self.append_entity_attributes(entity=entity)
        if already_exists and update:
            return self.override_entities(entities=[entity])
        self.log_error(err=err, msg=f"Could not post entity {entity.id}")
        raise
def override_entities(
    self, entities: List[Union[ContextLDEntity, ContextLDEntityKeyValues]]
):
    """
    Create the given entities or override them if they already exist.

    Delegates to ``entity_batch_operation`` with the upsert action and the
    ``replace`` option.

    Args:
        entities: Entities to create or override
    """
    batch_arguments = {
        "entities": entities,
        "action_type": ActionTypeLD.UPSERT,
        "options": "replace",
    }
    return self.entity_batch_operation(**batch_arguments)
def get_entity(
    self,
    entity_id: str,
    entity_type: str = None,
    attrs: List[str] = None,
    options: Optional[str] = None,
    geometryProperty: Optional[str] = None,
) -> Union[ContextLDEntity, ContextLDEntityKeyValues, Dict[str, Any]]:
    """
    Retrieve a single entity by its id.

    Args:
        entity_id (String): Id of the entity to be retrieved
        entity_type (String): Entity type, sent as the ``type`` query
            parameter
        attrs (List of Strings): Names of the attributes to include in the
            response, sent comma-separated. Example: temperature, humidity.
        options (String): Either ``keyValues`` (simplified representation
            of the entity) or ``sysAttrs`` (include generated attrs
            createdAt and modifiedAt)
        geometryProperty (String): Name of a GeoProperty, sent as the
            ``geometryProperty`` query parameter

    Returns:
        ``ContextLDEntityKeyValues`` if ``options`` is ``keyValues``,
        otherwise ``ContextLDEntity``

    Raises:
        ValueError: if ``options`` has an unsupported value
        requests.RequestException: logged and re-raised if the request fails
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
    request_headers = self.headers.copy()

    query = {}
    if entity_type:
        query["type"] = entity_type
    if attrs:
        query["attrs"] = ",".join(attrs)
    if geometryProperty:
        query["geometryProperty"] = geometryProperty
    if options:
        if options not in ("keyValues", "sysAttrs"):
            raise ValueError(
                "Only available options are 'keyValues' and 'sysAttrs'"
            )
        query["options"] = options

    try:
        res = self.get(url=endpoint, params=query, headers=request_headers)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info("Entity successfully retrieved!")
            self.logger.debug("Received: %s", res.json())
            model = (
                ContextLDEntityKeyValues
                if options == "keyValues"
                else ContextLDEntity
            )
            return model(**res.json())
    except requests.RequestException as err:
        self.log_error(err=err, msg=f"Could not load entity {entity_id}")
        raise
# Geometry types accepted by the ``geometry`` argument of get_entity_list
GeometryShape = Literal[
    "Point", "MultiPoint",
    "LineString", "MultiLineString",
    "Polygon", "MultiPolygon",
]
def get_entity_list(
    self,
    entity_id: Optional[str] = None,
    id_pattern: Optional[str] = ".*",
    entity_type: Optional[str] = None,
    attrs: Optional[List[str]] = None,
    q: Optional[str] = None,
    georel: Optional[str] = None,
    geometry: Optional[GeometryShape] = None,
    coordinates: Optional[str] = None,
    geoproperty: Optional[str] = None,
    # csf: Optional[str] = None, # Context Source Filter
    limit: Optional[PositiveInt] = None,
    options: Optional[str] = None,
) -> List[Union[ContextLDEntity, ContextLDEntityKeyValues]]:
    """
    Retrieve a list of entities matching the given query options.

    With the default arguments only ``idPattern=.*`` is sent.

    Args:
        entity_id:
            Id of the entity to be retrieved
        id_pattern:
            Regular expression to match the entity id
        entity_type:
            Entity type, sent as the ``type`` query parameter
        attrs:
            List of attribute names whose data must be included in the
            response, sent comma-separated.
        q:
            Query expression, composed of attribute names, operators and
            values. String values must be written in double quotes.
        georel:
            Geospatial relationship between the query geometry and the
            entities.
        geometry:
            Type of geometry for the query. The possible values are Point,
            MultiPoint, LineString, MultiLineString, Polygon, MultiPolygon.
        coordinates:
            Coordinates of the query geometry as a string.
        geoproperty:
            Name of a GeoProperty, sent as the ``geoproperty`` parameter.
        limit:
            Maximum number of entities to retrieve, at most 1000.
        options:
            Either ``keyValues`` (simplified representation of entity) or
            ``sysAttrs`` (including createdAt and modifiedAt, etc.).

    Returns:
        List of ``ContextLDEntityKeyValues`` if ``options`` is
        ``keyValues``, otherwise list of ``ContextLDEntity``

    Raises:
        ValueError: if ``q`` contains a single-quoted value, ``limit``
            exceeds 1000 or ``options`` has an unsupported value
        requests.RequestException: logged and re-raised if the request fails
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/entities/")
    request_headers = self.headers.copy()

    query = {}
    if entity_id:
        query["id"] = entity_id
    if id_pattern:
        query["idPattern"] = id_pattern
    if entity_type:
        query["type"] = entity_type
    if attrs:
        query["attrs"] = ",".join(attrs)
    if q:
        # reject values written in single quotes
        single_quoted = re.search(r"[=!<>~]{1}\'.*\'", q.replace(" ", ""))
        if single_quoted is not None:
            raise ValueError(
                f"String/Date/etc. value in {single_quoted.group()} must be "
                f"in double quote"
            )
        query["q"] = q
    if georel:
        query["georel"] = georel
    if geometry:
        query["geometry"] = geometry
    if coordinates:
        query["coordinates"] = coordinates
    if geoproperty:
        query["geoproperty"] = geoproperty
    # ContextSourceRegistration (csf) not supported yet
    if limit:
        if limit > 1000:
            raise ValueError("limit must be an integer value <= 1000")
        query["limit"] = limit
    if options:
        if options not in ("keyValues", "sysAttrs"):
            raise ValueError(
                "Only available options are 'keyValues' and 'sysAttrs'"
            )
        query["options"] = options

    try:
        res = self.get(url=endpoint, params=query, headers=request_headers)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info("Entity successfully retrieved!")
            model = (
                ContextLDEntityKeyValues
                if options == "keyValues"
                else ContextLDEntity
            )
            return [model(**item) for item in res.json()]
    except requests.RequestException as err:
        self.log_error(err=err, msg=f"Could not load entity matching{query}")
        raise
def replace_existing_attributes_of_entity(
    self,
    entity: Union[ContextLDEntity, ContextLDEntityKeyValues],
    append: bool = False,
):
    """
    Update the attributes of an existing entity with a PATCH request on
    ``/entities/<id>/attrs``.

    Args:
        entity (ContextLDEntity): Entity carrying the attributes to send.
            A key-values entity is converted with ``to_entity()`` first.
        append (bool): If True and the broker answers with 207
            (Multi-Status), the attributes are sent again through
            ``append_entity_attributes``.

    Returns:
        The result of ``append_entity_attributes`` when the append fallback
        is used, otherwise None

    Raises:
        requests.RequestException: logged and re-raised if the request fails
    """
    url = urljoin(self.base_url, f"{self._url_version}/entities/{entity.id}/attrs")
    headers = self.headers.copy()
    if isinstance(entity, ContextLDEntityKeyValues):
        entity = entity.to_entity()
    # An embedded @context requires JSON-LD content and no Link header
    if entity.model_dump().get("@context", None) is not None:
        headers.update({"Content-Type": "application/ld+json"})
        headers.update({"Link": None})
    try:
        res = self.patch(
            url=url,
            headers=headers,
            json=entity.model_dump(exclude={"id", "type"}, exclude_none=True),
        )
        if not res.ok:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not replace attribute of entity {entity.id} !"
        self.log_error(err=err, msg=msg)
        raise

    # 207 is a success status, raise_for_status() never raises for it.
    # The append fallback therefore has to be decided on the response
    # itself and not inside the exception handler.
    if append and res.status_code == 207:
        return self.append_entity_attributes(entity=entity)
    if res.ok:
        self.logger.info(f"Entity {entity.id} successfully " "updated!")
def update_entity_attribute(
    self,
    entity_id: str,
    attr: Union[
        ContextProperty,
        ContextRelationship,
        NamedContextProperty,
        NamedContextRelationship,
    ],
    attr_name: str = None,
):
    """
    Update a single attribute of an entity with a PATCH request on
    ``/entities/<id>/attrs/<attr_name>``.

    Args:
        entity_id: Id of the entity. Example: Bcn_Welt
        attr: Context attribute to update
        attr_name: Name of the attribute. May be omitted for named
            attributes, in which case ``attr.name`` is used. Required for
            unnamed attributes.

    Raises:
        AssertionError: if ``attr_name`` is missing for an unnamed attribute
        requests.RequestException: logged and re-raised if the request fails
    """
    headers = self.headers.copy()

    if attr_name is None:
        # The previous check combined two negated isinstance() calls with
        # ``or`` and was therefore always true, so the name of a named
        # attribute was never picked up.
        if not isinstance(attr, (NamedContextProperty, NamedContextRelationship)):
            # Raised explicitly (not via ``assert``) so that the check
            # survives ``python -O`` while keeping the exception type.
            raise AssertionError(
                "Missing name for attribute. "
                "attr_name must be present if "
                "attr is of type ContextAttribute"
            )
        attr_name = attr.name

    url = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
    )

    if isinstance(attr, NamedContextProperty):
        payload = attr.model_dump(exclude={"name"}, exclude_none=True)
    else:
        # Keep every field that is set. Comparing against None instead of
        # relying on truthiness keeps legitimate values such as 0 or False.
        # NOTE(review): the literal "Property" is filtered out as before;
        # confirm whether other fields (e.g. ``name`` of a named
        # relationship) should be excluded as well.
        payload = {
            key: value
            for key, value in attr.model_dump().items()
            if value is not None and value != "Property"
        }

    try:
        res = self.patch(url=url, headers=headers, json=payload)
        if res.ok:
            self.logger.info(
                f"Attribute {attr_name} of {entity_id} successfully updated!"
            )
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not update attribute '{attr_name}' of entity {entity_id}"
        self.log_error(err=err, msg=msg)
        raise
def append_entity_attributes(
    self,
    entity: Union[ContextLDEntity, ContextLDEntityKeyValues],
    options: Optional[str] = None,
):
    """
    Append attributes to an existing entity with a POST request on
    ``/entities/<id>/attrs``.

    Args:
        entity (ContextLDEntity):
            Entity whose attributes (everything except id and type) are
            sent.
        options (str):
            Options for the request. The only accepted value is
            'noOverwrite'.

    Raises:
        ValueError: if ``options`` has another value than 'noOverwrite'
        requests.RequestException: logged and re-raised if the request fails
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity.id}/attrs"
    )
    request_headers = self.headers.copy()
    # An embedded @context requires JSON-LD content and no Link header
    if entity.model_dump().get("@context", None) is not None:
        request_headers.update(
            {"Content-Type": "application/ld+json", "Link": None}
        )

    query = {}
    if options:
        if options != "noOverwrite":
            raise ValueError("The only available value is 'noOverwrite'")
        query["options"] = options

    try:
        attributes = entity.model_dump(exclude={"id", "type"}, exclude_none=True)
        res = self.post(
            url=endpoint, headers=request_headers, params=query, json=attributes
        )
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info(f"Entity {entity.id} successfully updated!")
    except requests.RequestException as err:
        self.log_error(err=err, msg=f"Could not update entity {entity.id}!")
        raise
584 # def update_existing_attribute_by_name(self, entity: ContextLDEntity
585 # ):
586 # pass
def delete_entity_by_id(self, entity_id: str, entity_type: Optional[str] = None):
    """
    Delete a single entity with a DELETE request on ``/entities/<id>``.
    For deleting multiple entities at once, entity_batch_operation() is
    more efficient.

    Args:
        entity_id:
            ID of entity to delete.
        entity_type:
            Type of entity to delete, sent as the ``type`` query parameter.

    Raises:
        requests.RequestException: logged and re-raised if the request fails
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
    request_headers = self.headers.copy()
    query = {"type": entity_type} if entity_type else {}

    try:
        res = self.delete(url=endpoint, headers=request_headers, params=query)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info(f"Entity {entity_id} successfully deleted")
    except requests.RequestException as err:
        self.log_error(err=err, msg=f"Could not delete entity {entity_id}")
        raise
def delete_attribute(self, entity_id: str, attribute_id: str):
    """
    Delete one attribute of an entity with a DELETE request on
    ``/entities/<id>/attrs/<attribute_id>``.

    Args:
        entity_id:
            ID of the entity.
        attribute_id:
            Name of the attribute to delete.

    Raises:
        requests.RequestException: logged and re-raised if the request fails
    """
    endpoint = urljoin(
        self.base_url,
        f"{self._url_version}/entities/{entity_id}/attrs/{attribute_id}",
    )
    request_headers = self.headers.copy()

    try:
        res = self.delete(url=endpoint, headers=request_headers)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info(
                f"Attribute {attribute_id} of Entity {entity_id} successfully deleted"
            )
    except requests.RequestException as err:
        self.log_error(
            err=err,
            msg=f"Could not delete attribute {attribute_id} of entity {entity_id}",
        )
        raise
646 # SUBSCRIPTION API ENDPOINTS
def get_subscription_list(self, limit: PositiveInt = inf) -> List[SubscriptionLD]:
    """
    Return the subscriptions known to the context broker.

    Args:
        limit: Limit the number of subscriptions to be retrieved

    Returns:
        list of subscriptions

    Raises:
        requests.RequestException: logged and re-raised if a request fails
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/subscriptions/")
    request_headers = self.headers.copy()
    # The 'count' option is always sent so that the pagination can tell
    # whether further pages are required.
    query = {"options": "count"}
    try:
        raw_items = self.__pagination(
            limit=limit, url=endpoint, params=query, headers=request_headers
        )
        return TypeAdapter(List[SubscriptionLD]).validate_python(raw_items)
    except requests.RequestException as err:
        self.log_error(err=err, msg="Could not load subscriptions!")
        raise
def post_subscription(
    self, subscription: SubscriptionLD, update: bool = False
) -> str:
    """
    Create a new subscription unless an equal one already exists.

    A subscription is treated as already existing if the JSON dump of its
    ``subject``, ``notification`` and ``type`` fields equals that of a
    subscription returned by ``get_subscription_list``. In that case no
    new subscription is created and the id of the existing one is
    returned.

    Args:
        subscription: Subscription to create
        update: True - If the subscription already exists, update it
            False - If the subscription already exists, log a warning

    Returns:
        str: Id of the created or already existing subscription

    Raises:
        requests.RequestException: logged and re-raised if the request fails
    """
    compared_fields = {"subject", "notification", "type"}
    wanted = subscription.model_dump_json(include=compared_fields)

    # first existing subscription with an identical fingerprint, if any
    duplicate = next(
        (
            candidate
            for candidate in self.get_subscription_list()
            if wanted == candidate.model_dump_json(include=compared_fields)
        ),
        None,
    )
    if duplicate is not None:
        self.logger.info("Subscription already exists")
        if not update:
            self.logger.warning(
                f"Subscription existed already with the id {duplicate.id}"
            )
        else:
            self.logger.info("Updated subscription")
            subscription.id = duplicate.id
            self.update_subscription(subscription)
        return duplicate.id

    endpoint = urljoin(self.base_url, f"{self._url_version}/subscriptions")
    request_headers = self.headers.copy()
    # An embedded @context requires JSON-LD content and no Link header
    if subscription.model_dump().get("@context", None) is not None:
        request_headers.update(
            {"Content-Type": "application/ld+json", "Link": None}
        )
    try:
        body = subscription.model_dump_json(
            exclude_unset=False, exclude_defaults=False, exclude_none=True
        )
        res = self.post(url=endpoint, headers=request_headers, data=body)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info("Subscription successfully created!")
            # the id is the last path segment of the Location header
            return res.headers["Location"].split("/")[-1]
    except requests.RequestException as err:
        self.log_error(err=err, msg="Could not send subscription!")
        raise
def get_subscription(self, subscription_id: str) -> SubscriptionLD:
    """
    Retrieve a single subscription from the context broker.

    Args:
        subscription_id: id of the subscription

    Returns:
        The subscription built from the JSON response

    Raises:
        requests.RequestException: logged and re-raised if the request fails
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
    )
    request_headers = self.headers.copy()
    try:
        res = self.get(url=endpoint, headers=request_headers)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.debug("Received: %s", res.json())
            return SubscriptionLD(**res.json())
    except requests.RequestException as err:
        self.log_error(
            err=err, msg=f"Could not load subscription {subscription_id}"
        )
        raise
def update_subscription(self, subscription: SubscriptionLD) -> None:
    """
    Update an existing subscription with a PATCH request on
    ``/subscriptions/<id>``. All fields of the subscription that are not
    None are sent, the id itself is excluded from the body.

    Args:
        subscription: Subscription to update

    Raises:
        requests.RequestException: logged and re-raised if the request fails
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/subscriptions/{subscription.id}"
    )
    request_headers = self.headers.copy()
    # An embedded @context requires JSON-LD content and no Link header
    if subscription.model_dump().get("@context", None) is not None:
        request_headers.update(
            {"Content-Type": "application/ld+json", "Link": None}
        )
    try:
        body = subscription.model_dump_json(exclude={"id"}, exclude_none=True)
        res = self.patch(url=endpoint, headers=request_headers, data=body)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info("Subscription successfully updated!")
    except requests.RequestException as err:
        self.log_error(
            err=err, msg=f"Could not update subscription {subscription.id}"
        )
        raise
def delete_subscription(self, subscription_id: str) -> None:
    """
    Delete a subscription with a DELETE request on ``/subscriptions/<id>``.

    Args:
        subscription_id: id of the subscription

    Raises:
        requests.RequestException: logged and re-raised if the request fails
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
    )
    request_headers = self.headers.copy()
    try:
        res = self.delete(url=endpoint, headers=request_headers)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info(
                f"Subscription '{subscription_id}' successfully deleted!"
            )
    except requests.RequestException as err:
        self.log_error(
            err=err, msg=f"Could not delete subscription {subscription_id}"
        )
        raise
def log_multi_errors(self, errors: List[Dict]) -> None:
    """
    Log every entry of the ``errors`` list of a batch operation response.

    Each entry is read as a dictionary with the key ``entityId`` and an
    ``error`` dictionary providing ``title`` and ``status``. Missing keys
    are logged as ``None`` instead of aborting the logging.

    Args:
        errors: Error entries of a batch operation response
    """
    for entry in errors:
        details = entry.get("error") or {}
        # %s instead of %d: the status may be missing (None), which would
        # make the log record fail to format.
        self.logger.error(
            "Response status: %s, Entity: %s, Reason: %s",
            details.get("status"),
            entry.get("entityId"),
            details.get("title"),
        )
def handle_multi_status_response(self, res: requests.Response):
    """
    Evaluate the response of a batch operation.

    Errors listed in the response body are logged. If the body contains a
    ``success`` list that is empty, a RuntimeError is raised. An empty
    body or a body that is not valid JSON is only logged.

    Args:
        res: Response of the batch operation request

    Raises:
        requests.RequestException: if the response carries an error status
        RuntimeError: if the ``success`` list of the response is empty
    """
    try:
        res.raise_for_status()
        if not res.text:
            self.logger.info("Empty response received.")
            return
        body = res.json()
        if "errors" in body:
            self.log_multi_errors(body["errors"])
        if "success" in body and len(body["success"]) == 0:
            raise RuntimeError("Batch operation resulted in errors only, see logs")
    except json.JSONDecodeError:
        self.logger.info(
            "Error decoding JSON. Response may not be in valid JSON format."
        )
862 # Batch operation API
def entity_batch_operation(
    self,
    *,
    entities: List[Union[ContextLDEntity, ContextLDEntityKeyValues]],
    action_type: Union[ActionTypeLD, str],
    options: Literal["noOverwrite", "replace", "update"] = None,
) -> None:
    """
    Create, update or delete several entities with a single POST request
    on ``/entityOperations/<action_type>``.

    For the delete action only the list of entity ids is sent, for every
    other action the full entities are sent.

    If all entities carry an ``@context`` field, the request is sent as
    JSON-LD without Link header. If none does, plain JSON is used. A mix
    of both is sent with the default headers and a warning is logged.

    Args:
        entities: Entities the operation is applied to
        action_type: Kind of batch operation, either a member of
            ``ActionTypeLD`` or its string value
        options (str): Optional 'noOverwrite' 'replace' 'update', sent as
            the ``options`` query parameter

    Raises:
        ValueError: if ``action_type`` is not a valid action
        requests.RequestException: if the request fails
        RuntimeError: if the broker reports errors only
    """
    # Accept the plain string promised by the signature as well;
    # looking up an enum member returns the member itself.
    action = ActionTypeLD(action_type)

    url = urljoin(
        self.base_url, f"{self._url_version}/entityOperations/{action.value}"
    )

    headers = self.headers.copy()
    # one flag per entity: does it carry its own @context?
    has_context = [
        e.model_dump().get("@context", None) is not None for e in entities
    ]
    if has_context and all(has_context):
        headers.update({"Content-Type": "application/ld+json"})
        headers.update({"Link": None})
    elif any(has_context):
        self.logger.warning(
            "Detected mixed context provision in batch operation: "
            "Some entities have @context field while others don't. "
            "FiLiP use application/json and Link header by default, so that "
            "the entities with @context will be rejected by CB"
        )
    else:
        # no entity (or an empty list) provides a context
        headers.update({"Content-Type": "application/json"})

    params = {}
    if options:
        params.update({"options": options})
    update = UpdateLD(entities=entities)

    if action == ActionTypeLD.DELETE:
        payload = [entity.id for entity in entities]
    else:
        payload = update.model_dump(
            by_alias=True,
            exclude_none=True,
        ).get("entities")
    res = self.post(
        url=url, headers=headers, params=params, data=json.dumps(payload)
    )
    # raises on error status or if the operation produced errors only
    self.handle_multi_status_response(res)
    self.logger.info(f"Update operation {action_type} succeeded!")
def validate_relationship(
    self,
    relationship: Union[
        NamedContextProperty,
        ContextProperty,
        NamedContextRelationship,
        ContextRelationship,
        Dict,
    ],
) -> bool:
    """
    Validates a relationship. A relationship is valid if it points to an
    existing entity. Otherwise, it is considered invalid.

    The id of the target entity is read from the ``value`` or ``object``
    attribute of a model, or from the ``value`` or ``object`` key of a
    dictionary.

    Args:
        relationship: relationship to validate, assumed to be property or
            relationship since there is no geoproperty with string value

    Returns:
        True if the relationship is valid, False if the target entity does
        not exist (404)

    Raises:
        ValueError: if no target id can be read from ``relationship``
        requests.RequestException: if the lookup fails for another reason
            than 404
    """
    if hasattr(relationship, "value"):
        destination_id = relationship.value
    elif hasattr(relationship, "object"):
        destination_id = relationship.object
    elif isinstance(relationship, dict):
        _sentinel = object()
        destination_id = relationship.get("value", _sentinel)
        if destination_id is _sentinel:
            # NGSI-LD relationships carry their target under "object"
            destination_id = relationship.get("object", _sentinel)
        if destination_id is _sentinel:
            raise ValueError(
                "Invalid ld relationship dictionary format\n"
                "Expected format: {"
                f'"type": "{DataTypeLD.RELATIONSHIP[0]}", '
                '"value": "entity_id"}'
            )
    else:
        raise ValueError("Invalid relationship type.")
    try:
        destination_entity = self.get_entity(entity_id=destination_id)
        return destination_entity.id == destination_id
    except requests.RequestException as err:
        # A connection error carries no response at all
        if err.response is not None and err.response.status_code == 404:
            return False
        # Anything else says nothing about the existence of the entity
        raise