Coverage for filip/clients/ngsi_ld/cb.py: 82%
316 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-17 14:42 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-04-17 14:42 +0000
1"""
2Context Broker Module for API Client
3"""
5import re
6import json
7import os
8from math import inf
9from typing import Any, Dict, List, Union, Optional, Literal
10from urllib.parse import urljoin
11import requests
12from pydantic import TypeAdapter, PositiveInt, PositiveFloat
13from filip.clients.base_http_client import BaseHttpClient, NgsiURLVersion
14from filip.config import settings
15from filip.models.base import FiwareLDHeader, PaginationMethod, core_context
16from filip.models.ngsi_v2.base import AttrsFormat
17from filip.models.ngsi_ld.subscriptions import SubscriptionLD
18from filip.models.ngsi_ld.context import (
19 ContextLDEntity,
20 ContextLDEntityKeyValues,
21 ContextProperty,
22 ContextRelationship,
23 NamedContextProperty,
24 NamedContextRelationship,
25 ActionTypeLD,
26 UpdateLD,
27)
28from filip.models.ngsi_v2.context import Query
31class ContextBrokerLDClient(BaseHttpClient):
32 """
33 Implementation of NGSI-LD Context Broker functionalities, such as creating
34 entities and subscriptions; retrieving, updating and deleting data.
35 Further documentation:
36 https://fiware-orion.readthedocs.io/en/master/
38 Api specifications for LD are located here:
39 https://www.etsi.org/deliver/etsi_gs/CIM/001_099/009/01.04.01_60/gs_cim009v010401p.pdf
40 """
42 def __init__(
43 self,
44 url: str = None,
45 *,
46 session: requests.Session = None,
47 fiware_header: FiwareLDHeader = None,
48 **kwargs,
49 ):
50 """
52 Args:
53 url: Url of context broker server
54 session (requests.Session):
55 fiware_header (FiwareHeader): fiware service and fiware service path
56 **kwargs (Optional): Optional arguments that ``request`` takes.
57 """
58 # set service url
59 url = url or settings.LD_CB_URL
60 # base_http_client overwrites empty header with FiwareHeader instead of FiwareLD
61 init_header = fiware_header if fiware_header else FiwareLDHeader()
62 if init_header.link_header is None:
63 init_header.set_context(core_context)
64 super().__init__(url=url, session=session, fiware_header=init_header, **kwargs)
65 # set the version specific url-pattern
66 self._url_version = NgsiURLVersion.ld_url.value
67 # For uplink requests, the Content-Type header is essential,
68 # Accept will be ignored
69 # For downlink requests, the Accept header is essential,
70 # Content-Type will be ignored
72 # default uplink content JSON
73 self.headers.update({"Content-Type": "application/json"})
74 # default downlink content JSON-LD
75 self.headers.update({"Accept": "application/ld+json"})
77 if init_header.ngsild_tenant is not None:
78 self.__make_tenant()
80 def __pagination(
81 self,
82 *,
83 method: PaginationMethod = PaginationMethod.GET,
84 url: str,
85 headers: Dict,
86 limit: Union[PositiveInt, PositiveFloat] = None,
87 params: Dict = None,
88 data: str = None,
89 ) -> List[Dict]:
90 """
91 NGSIv2 implements a pagination mechanism in order to help clients to
92 retrieve large sets of resources. This mechanism works for all listing
93 operations in the API (e.g. GET /v2/entities, GET /v2/subscriptions,
94 POST /v2/op/query, etc.). This function helps getting datasets that are
95 larger than the limit for the different GET operations.
97 https://fiware-orion.readthedocs.io/en/master/user/pagination/index.html
99 Args:
100 url: Information about the url, obtained from the original function
101 headers: The headers from the original function
102 params:
103 limit:
105 Returns:
106 object:
108 """
109 if limit is None:
110 limit = inf
111 if limit > 1000:
112 params["limit"] = 1000 # maximum items per request
113 else:
114 params["limit"] = limit
116 if self.session:
117 session = self.session
118 else:
119 session = requests.Session()
120 with session:
121 res = session.request(
122 method=method, url=url, params=params, headers=headers, data=data
123 )
124 if res.ok:
125 items = res.json()
126 # do pagination
127 if self._url_version == NgsiURLVersion.v2_url.value:
128 count = int(res.headers["Fiware-Total-Count"])
129 elif self._url_version == NgsiURLVersion.ld_url.value:
130 count = int(res.headers["NGSILD-Results-Count"])
131 else:
132 count = 0
134 while len(items) < limit and len(items) < count:
135 # Establishing the offset from where entities are retrieved
136 params["offset"] = len(items)
137 params["limit"] = min(1000, (limit - len(items)))
138 res = session.request(
139 method=method,
140 url=url,
141 params=params,
142 headers=headers,
143 data=data,
144 )
145 if res.ok:
146 items.extend(res.json())
147 else:
148 res.raise_for_status()
149 self.logger.debug("Received: %s", items)
150 return items
151 res.raise_for_status()
153 def get_version(self) -> Dict:
154 """
155 Gets version of Orion-LD context broker
156 Returns:
157 Dictionary with response
158 """
159 url = urljoin(self.base_url, "/version")
160 try:
161 res = self.get(url=url)
162 if res.ok:
163 return res.json()
164 res.raise_for_status()
165 except requests.RequestException as err:
166 self.logger.error(err)
167 raise
169 def __make_tenant(self):
170 """
171 Create tenant if tenant
172 is given in headers
173 """
174 idhex = f"urn:ngsi-ld:{os.urandom(6).hex()}"
175 e = ContextLDEntity(id=idhex, type=f"urn:ngsi-ld:{os.urandom(6).hex()}")
176 try:
177 self.post_entity(entity=e)
178 self.delete_entity_by_id(idhex)
179 except Exception as err:
180 self.log_error(err=err, msg="Error while creating tenant")
181 raise
183 def get_statistics(self) -> Dict:
184 """
185 Gets statistics of context broker
186 Returns:
187 Dictionary with response
188 """
189 url = urljoin(self.base_url, "statistics")
190 try:
191 res = self.get(url=url)
192 if res.ok:
193 return res.json()
194 res.raise_for_status()
195 except requests.RequestException as err:
196 self.logger.error(err)
197 raise
199 def post_entity(
200 self,
201 entity: Union[ContextLDEntity, ContextLDEntityKeyValues],
202 append: bool = False,
203 update: bool = False,
204 ):
205 """
206 Function registers an Object with the NGSI-LD Context Broker,
207 if it already exists it can be automatically updated
208 if the update flag bool is True.
209 First a post request with the entity is tried, if the response code
210 is 422 the entity is uncrossable, as it already exists there are two
211 options, either overwrite it, if the attribute have changed
212 (e.g. at least one new/new values) (update = True) or leave
213 it the way it is (update=False)
215 """
216 url = urljoin(self.base_url, f"{self._url_version}/entities")
217 headers = self.headers.copy()
218 if entity.model_dump().get("@context", None) is not None:
219 headers.update({"Content-Type": "application/ld+json"})
220 headers.update({"Link": None})
221 try:
222 res = self.post(
223 url=url,
224 headers=headers,
225 json=entity.model_dump(
226 exclude_unset=False, exclude_defaults=False, exclude_none=True
227 ),
228 )
229 if res.ok:
230 self.logger.info("Entity successfully posted!")
231 return res.headers.get("Location")
232 res.raise_for_status()
233 except requests.RequestException as err:
234 if err.response is not None and err.response.status_code == 409:
235 if append: # 409 entity already exists
236 return self.append_entity_attributes(entity=entity)
237 elif update:
238 return self.override_entities(entities=[entity])
239 msg = f"Could not post entity {entity.id}"
240 self.log_error(err=err, msg=msg)
241 raise
243 def override_entities(
244 self, entities: List[Union[ContextLDEntity, ContextLDEntityKeyValues]]
245 ):
246 """
247 Function to create or override existing entites with the NGSI-LD Context Broker.
248 The batch operation with Upsert will be used.
249 """
250 return self.entity_batch_operation(
251 entities=entities, action_type=ActionTypeLD.UPSERT, options="replace"
252 )
254 def get_entity(
255 self,
256 entity_id: str,
257 entity_type: str = None,
258 attrs: List[str] = None,
259 options: Optional[str] = None,
260 geometryProperty: Optional[str] = None,
261 ) -> Union[ContextLDEntity, ContextLDEntityKeyValues, Dict[str, Any]]:
262 """
263 This operation must return one entity element only, but there may be
264 more than one entity with the same ID (e.g. entities with same ID but
265 different types). In such case, an error message is returned, with
266 the HTTP status code set to 409 Conflict.
268 Args:
269 entity_id (String): Id of the entity to be retrieved
270 entity_type (String): Entity type, to avoid ambiguity in case
271 there are several entities with the same entity id.
272 attrs (List of Strings): List of attribute names whose data must be
273 included in the response. The attributes are retrieved in the
274 order specified by this parameter.
275 See "Filtering out attributes and metadata" section for more
276 detail. If this parameter is not included, the attributes are
277 retrieved in arbitrary order, and all the attributes of the
278 entity are included in the response.
279 Example: temperature, humidity.
280 options (String): keyValues (simplified representation of entity)
281 or sysAttrs (include generated attrs createdAt and modifiedAt)
282 geometryProperty (String): Name of a GeoProperty. In the case of GeoJSON
283 Entity representation, this parameter indicates which GeoProperty to
284 use for the "geometry" element. By default, it shall be 'location'.
285 Returns:
286 ContextEntity
287 """
288 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
289 headers = self.headers.copy()
290 params = {}
291 if entity_type:
292 params.update({"type": entity_type})
293 if attrs:
294 params.update({"attrs": ",".join(attrs)})
295 if geometryProperty:
296 params.update({"geometryProperty": geometryProperty})
297 if options:
298 if options != "keyValues" and options != "sysAttrs":
299 raise ValueError(
300 f"Only available options are 'keyValues' and 'sysAttrs'"
301 )
302 params.update({"options": options})
304 try:
305 res = self.get(url=url, params=params, headers=headers)
306 if res.ok:
307 self.logger.info("Entity successfully retrieved!")
308 self.logger.debug("Received: %s", res.json())
309 if options == "keyValues":
310 return ContextLDEntityKeyValues(**res.json())
311 else:
312 return ContextLDEntity(**res.json())
313 res.raise_for_status()
314 except requests.RequestException as err:
315 msg = f"Could not load entity {entity_id}"
316 self.log_error(err=err, msg=msg)
317 raise
319 GeometryShape = Literal[
320 "Point",
321 "MultiPoint",
322 "LineString",
323 "MultiLineString",
324 "Polygon",
325 "MultiPolygon",
326 ]
328 def get_entity_list(
329 self,
330 entity_id: Optional[str] = None,
331 id_pattern: Optional[str] = ".*",
332 entity_type: Optional[str] = None,
333 attrs: Optional[List[str]] = None,
334 q: Optional[str] = None,
335 georel: Optional[str] = None,
336 geometry: Optional[GeometryShape] = None,
337 coordinates: Optional[str] = None,
338 geoproperty: Optional[str] = None,
339 # csf: Optional[str] = None, # Context Source Filter
340 limit: Optional[PositiveInt] = None,
341 options: Optional[str] = None,
342 ) -> List[Union[ContextLDEntity, ContextLDEntityKeyValues]]:
343 """
344 This operation retrieves a list of entities based on different query options.
345 By default, the operation retrieves all the entities in the context broker.
346 Args:
347 entity_id:
348 Id of the entity to be retrieved
349 id_pattern:
350 Regular expression to match the entity id
351 entity_type:
352 Entity type, to avoid ambiguity in case there are several
353 entities with the same entity id.
354 attrs:
355 List of attribute names whose data must be included in the response.
356 q:
357 Query expression, composed of attribute names, operators and values.
358 georel:
359 Geospatial relationship between the query geometry and the entities.
360 geometry:
361 Type of geometry for the query. The possible values are Point,
362 MultiPoint, LineString, MultiLineString, Polygon, MultiPolygon.
363 coordinates:
364 Coordinates of the query geometry. The coordinates must be
365 expressed as a string of comma-separated values.
366 geoproperty:
367 Name of a GeoProperty. In the case of GeoJSON Entity representation,
368 this parameter indicates which GeoProperty to use for the "geometry" element.
369 limit:
370 Maximum number of entities to retrieve.
371 options:
372 Further options for the query. The available options are:
373 - keyValues (simplified representation of entity)
374 - sysAttrs (including createdAt and modifiedAt, etc.)
375 - count (include number of all matched entities in response header)
376 """
377 url = urljoin(self.base_url, f"{self._url_version}/entities/")
378 headers = self.headers.copy()
379 params = {}
380 if entity_id:
381 params.update({"id": entity_id})
382 if id_pattern:
383 params.update({"idPattern": id_pattern})
384 if entity_type:
385 params.update({"type": entity_type})
386 if attrs:
387 params.update({"attrs": ",".join(attrs)})
388 if q:
389 x = re.search(r"[=!<>~]{1}\'.*\'", q.replace(" ", ""))
390 if x is not None:
391 raise ValueError(
392 f"String/Date/etc. value in {x.group()} must be " f"in double quote"
393 )
394 params.update({"q": q})
395 if georel:
396 params.update({"georel": georel})
397 if geometry:
398 params.update({"geometry": geometry})
399 if coordinates:
400 params.update({"coordinates": coordinates})
401 if geoproperty:
402 params.update({"geoproperty": geoproperty})
403 # if csf: # ContextSourceRegistration not supported yet
404 # params.update({'csf': csf})
405 if limit:
406 if limit > 1000:
407 raise ValueError("limit must be an integer value <= 1000")
408 params.update({"limit": limit})
409 if options:
410 if options != "keyValues" and options != "sysAttrs":
411 raise ValueError(
412 f"Only available options are 'keyValues' and 'sysAttrs'"
413 )
414 params.update({"options": options})
415 # params.update({'local': 'true'})
417 try:
418 res = self.get(url=url, params=params, headers=headers)
419 if res.ok:
420 self.logger.info("Entity successfully retrieved!")
421 entity_list: List[Union[ContextLDEntity, ContextLDEntityKeyValues]] = []
422 if options == "keyValues":
423 entity_list = [
424 ContextLDEntityKeyValues(**item) for item in res.json()
425 ]
426 return entity_list
427 else:
428 entity_list = [ContextLDEntity(**item) for item in res.json()]
429 return entity_list
430 res.raise_for_status()
431 except requests.RequestException as err:
432 msg = f"Could not load entity matching{params}"
433 self.log_error(err=err, msg=msg)
434 raise
436 def replace_existing_attributes_of_entity(
437 self,
438 entity: Union[ContextLDEntity, ContextLDEntityKeyValues],
439 append: bool = False,
440 ):
441 """
442 The attributes previously existing in the entity are removed and
443 replaced by the ones in the request.
445 Args:
446 entity (ContextEntity):
447 append (bool):
448 options:
449 Returns:
451 """
452 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity.id}/attrs")
453 headers = self.headers.copy()
454 if entity.model_dump().get("@context", None) is not None:
455 headers.update({"Content-Type": "application/ld+json"})
456 headers.update({"Link": None})
457 try:
458 res = self.patch(
459 url=url,
460 headers=headers,
461 json=entity.model_dump(exclude={"id", "type"}, exclude_none=True),
462 )
463 if res.ok:
464 self.logger.info(f"Entity {entity.id} successfully " "updated!")
465 else:
466 res.raise_for_status()
467 except requests.RequestException as err:
468 if err.response is not None and append and err.response.status_code == 207:
469 return self.append_entity_attributes(entity=entity)
470 msg = f"Could not replace attribute of entity {entity.id} !"
471 self.log_error(err=err, msg=msg)
472 raise
474 def update_entity_attribute(
475 self,
476 entity_id: str,
477 attr: Union[
478 ContextProperty,
479 ContextRelationship,
480 NamedContextProperty,
481 NamedContextRelationship,
482 ],
483 attr_name: str = None,
484 ):
485 """
486 Updates a specified attribute from an entity.
487 Args:
488 attr: context attribute to update
489 entity_id: Id of the entity. Example: Bcn_Welt
490 entity_type: Entity type, to avoid ambiguity in case there are
491 several entities with the same entity id.
492 """
493 headers = self.headers.copy()
494 if not isinstance(attr, NamedContextProperty) or not isinstance(
495 attr, NamedContextRelationship
496 ):
497 assert attr_name is not None, (
498 "Missing name for attribute. "
499 "attr_name must be present if"
500 "attr is of type ContextAttribute"
501 )
502 else:
503 assert attr_name is None, (
504 "Invalid argument attr_name. Do not set "
505 "attr_name if attr is of type "
506 "NamedContextAttribute or NamedContextRelationship"
507 )
509 url = urljoin(
510 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
511 )
513 jsonnn = {}
514 if isinstance(attr, list) or isinstance(attr, NamedContextProperty):
515 jsonnn = attr.model_dump(exclude={"name"}, exclude_none=True)
516 else:
517 prop = attr.model_dump()
518 for key, value in prop.items():
519 if value and value != "Property":
520 jsonnn[key] = value
522 try:
523 res = self.patch(url=url, headers=headers, json=jsonnn)
524 if res.ok:
525 self.logger.info(
526 f"Attribute {attr_name} of {entity_id} successfully updated!"
527 )
528 else:
529 res.raise_for_status()
530 except requests.RequestException as err:
531 msg = f"Could not update attribute '{attr_name}' of entity {entity_id}"
532 self.log_error(err=err, msg=msg)
533 raise
535 def append_entity_attributes(
536 self,
537 entity: Union[ContextLDEntity, ContextLDEntityKeyValues],
538 options: Optional[str] = None,
539 ):
540 """
541 Append new Entity attributes to an existing Entity within an NGSI-LD system
542 Args:
543 entity (ContextLDEntity):
544 Entity to append attributes to.
545 options (str):
546 Options for the request. The only available value is
547 'noOverwrite'. If set, it will raise 400, if all attributes
548 exist already.
550 """
551 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity.id}/attrs")
552 headers = self.headers.copy()
553 if entity.model_dump().get("@context", None) is not None:
554 headers.update({"Content-Type": "application/ld+json"})
555 headers.update({"Link": None})
556 params = {}
558 if options:
559 if options != "noOverwrite":
560 raise ValueError(f"The only available value is 'noOverwrite'")
561 params.update({"options": options})
563 try:
564 res = self.post(
565 url=url,
566 headers=headers,
567 params=params,
568 json=entity.model_dump(exclude={"id", "type"}, exclude_none=True),
569 )
570 if res.ok:
571 self.logger.info(f"Entity {entity.id} successfully updated!")
572 else:
573 res.raise_for_status()
574 except requests.RequestException as err:
575 msg = f"Could not update entity {entity.id}!"
576 self.log_error(err=err, msg=msg)
577 raise
579 # def update_existing_attribute_by_name(self, entity: ContextLDEntity
580 # ):
581 # pass
583 def delete_entity_by_id(self, entity_id: str, entity_type: Optional[str] = None):
584 """
585 Deletes an entity by its id. For deleting mulitple entities at once,
586 entity_batch_operation() is more efficient.
587 Args:
588 entity_id:
589 ID of entity to delete.
590 entity_type:
591 Type of entity to delete.
592 """
593 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
594 headers = self.headers.copy()
595 params = {}
597 if entity_type:
598 params.update({"type": entity_type})
600 try:
601 res = self.delete(url=url, headers=headers, params=params)
602 if res.ok:
603 self.logger.info(f"Entity {entity_id} successfully deleted")
604 else:
605 res.raise_for_status()
606 except requests.RequestException as err:
607 msg = f"Could not delete entity {entity_id}"
608 self.log_error(err=err, msg=msg)
609 raise
611 def delete_attribute(self, entity_id: str, attribute_id: str):
612 """
613 Deletes an attribute from an entity.
614 Args:
615 entity_id:
616 ID of the entity.
617 attribute_id:
618 Name of the attribute to delete.
619 Returns:
621 """
622 url = urljoin(
623 self.base_url,
624 f"{self._url_version}/entities/{entity_id}/attrs/{attribute_id}",
625 )
626 headers = self.headers.copy()
628 try:
629 res = self.delete(url=url, headers=headers)
630 if res.ok:
631 self.logger.info(
632 f"Attribute {attribute_id} of Entity {entity_id} successfully deleted"
633 )
634 else:
635 res.raise_for_status()
636 except requests.RequestException as err:
637 msg = f"Could not delete attribute {attribute_id} of entity {entity_id}"
638 self.log_error(err=err, msg=msg)
639 raise
641 # SUBSCRIPTION API ENDPOINTS
642 def get_subscription_list(self, limit: PositiveInt = inf) -> List[SubscriptionLD]:
643 """
644 Returns a list of all the subscriptions present in the system.
645 Args:
646 limit: Limit the number of subscriptions to be retrieved
647 Returns:
648 list of subscriptions
649 """
650 url = urljoin(self.base_url, f"{self._url_version}/subscriptions/")
651 headers = self.headers.copy()
652 params = {}
654 # We always use the 'count' option to check weather pagination is
655 # required
656 params.update({"options": "count"})
657 try:
658 items = self.__pagination(
659 limit=limit, url=url, params=params, headers=headers
660 )
661 adapter = TypeAdapter(List[SubscriptionLD])
662 return adapter.validate_python(items)
663 except requests.RequestException as err:
664 msg = "Could not load subscriptions!"
665 self.log_error(err=err, msg=msg)
666 raise
668 def post_subscription(
669 self, subscription: SubscriptionLD, update: bool = False
670 ) -> str:
671 """
672 Creates a new subscription. The subscription is represented by a
673 Subscription object defined in filip.cb.models.
675 If the subscription already exists, the adding is prevented and the id
676 of the existing subscription is returned.
678 A subscription is deemed as already existing if there exists a
679 subscription with the exact same subject and notification fields. All
680 optional fields are not considered.
682 Args:
683 subscription: Subscription
684 update: True - If the subscription already exists, update it
685 False- If the subscription already exists, throw warning
686 Returns:
687 str: Id of the (created) subscription
689 """
690 existing_subscriptions = self.get_subscription_list()
692 sub_hash = subscription.model_dump_json(
693 include={"subject", "notification", "type"}
694 )
695 for ex_sub in existing_subscriptions:
696 if sub_hash == ex_sub.model_dump_json(
697 include={"subject", "notification", "type"}
698 ):
699 self.logger.info("Subscription already exists")
700 if update:
701 self.logger.info("Updated subscription")
702 subscription.id = ex_sub.id
703 self.update_subscription(subscription)
704 else:
705 self.logger.warning(
706 f"Subscription existed already with the id" f" {ex_sub.id}"
707 )
708 return ex_sub.id
710 url = urljoin(self.base_url, f"{self._url_version}/subscriptions")
711 headers = self.headers.copy()
712 if subscription.model_dump().get("@context", None) is not None:
713 headers.update({"Content-Type": "application/ld+json"})
714 headers.update({"Link": None})
715 try:
716 res = self.post(
717 url=url,
718 headers=headers,
719 data=subscription.model_dump_json(
720 exclude_unset=False, exclude_defaults=False, exclude_none=True
721 ),
722 )
723 if res.ok:
724 self.logger.info("Subscription successfully created!")
725 return res.headers["Location"].split("/")[-1]
726 res.raise_for_status()
727 except requests.RequestException as err:
728 msg = "Could not send subscription!"
729 self.log_error(err=err, msg=msg)
730 raise
732 def get_subscription(self, subscription_id: str) -> SubscriptionLD:
733 """
734 Retrieves a subscription from the context broker.
735 Args:
736 subscription_id: id of the subscription
738 Returns:
740 """
741 url = urljoin(
742 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
743 )
744 headers = self.headers.copy()
745 try:
746 res = self.get(url=url, headers=headers)
747 if res.ok:
748 self.logger.debug("Received: %s", res.json())
749 return SubscriptionLD(**res.json())
750 res.raise_for_status()
751 except requests.RequestException as err:
752 msg = f"Could not load subscription {subscription_id}"
753 self.log_error(err=err, msg=msg)
754 raise
756 def update_subscription(self, subscription: SubscriptionLD) -> None:
757 """
758 Only the fields included in the request are updated in the subscription.
759 Args:
760 subscription: Subscription to update
761 Returns:
763 """
764 url = urljoin(
765 self.base_url, f"{self._url_version}/subscriptions/{subscription.id}"
766 )
767 headers = self.headers.copy()
768 if subscription.model_dump().get("@context", None) is not None:
769 headers.update({"Content-Type": "application/ld+json"})
770 headers.update({"Link": None})
771 try:
772 res = self.patch(
773 url=url,
774 headers=headers,
775 data=subscription.model_dump_json(
776 exclude={"id"},
777 exclude_none=True,
778 ),
779 )
780 if res.ok:
781 self.logger.info("Subscription successfully updated!")
782 else:
783 res.raise_for_status()
784 except requests.RequestException as err:
785 msg = f"Could not update subscription {subscription.id}"
786 self.log_error(err=err, msg=msg)
787 raise
789 def delete_subscription(self, subscription_id: str) -> None:
790 """
791 Deletes a subscription from a Context Broker
792 Args:
793 subscription_id: id of the subscription
794 """
795 url = urljoin(
796 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
797 )
798 headers = self.headers.copy()
799 try:
800 res = self.delete(url=url, headers=headers)
801 if res.ok:
802 self.logger.info(
803 f"Subscription '{subscription_id}' " f"successfully deleted!"
804 )
805 else:
806 res.raise_for_status()
807 except requests.RequestException as err:
808 msg = f"Could not delete subscription {subscription_id}"
809 self.log_error(err=err, msg=msg)
810 raise
812 def log_multi_errors(self, errors: List[Dict]) -> None:
813 for error in errors:
814 entity_id = error["entityId"]
815 error_details: dict = error["error"]
816 error_title = error_details.get("title")
817 error_status = error_details.get("status")
818 # error_detail = error_details['detail']
819 self.logger.error(
820 "Response status: %d, Entity: %s, Reason: %s",
821 error_status,
822 entity_id,
823 error_title,
824 )
826 def handle_multi_status_response(self, res: requests.Response):
827 """
828 Handles the response of a batch_operation. If the response contains
829 errors, they are logged. If the response contains only errors, a RuntimeError
830 is raised.
831 Args:
832 res:
834 Returns:
836 """
837 try:
838 res.raise_for_status()
839 if res.text:
840 response_data = res.json()
841 if "errors" in response_data:
842 errors = response_data["errors"]
843 self.log_multi_errors(errors)
844 if "success" in response_data:
845 successList = response_data["success"]
846 if len(successList) == 0:
847 raise RuntimeError(
848 "Batch operation resulted in errors only, see logs"
849 )
850 else:
851 self.logger.info("Empty response received.")
852 except json.JSONDecodeError:
853 self.logger.info(
854 "Error decoding JSON. Response may not be in valid JSON format."
855 )
857 # Batch operation API
858 def entity_batch_operation(
859 self,
860 *,
861 entities: List[Union[ContextLDEntity, ContextLDEntityKeyValues]],
862 action_type: Union[ActionTypeLD, str],
863 options: Literal["noOverwrite", "replace", "update"] = None,
864 ) -> None:
865 """
866 This operation allows to create, update and/or delete several entities
867 in a single batch operation.
869 This operation is split in as many individual operations as entities
870 in the entities vector, so the actionType is executed for each one of
871 them. Depending on the actionType, a mapping with regular non-batch
872 operations can be done:
874 append: maps to POST /v2/entities (if the entity does not already exist)
875 or POST /v2/entities/<id>/attrs (if the entity already exists).
877 appendStrict: maps to POST /v2/entities (if the entity does not
878 already exist) or POST /v2/entities/<id>/attrs?options=append (if the
879 entity already exists).
881 update: maps to PATCH /v2/entities/<id>/attrs.
883 delete: maps to DELETE /v2/entities/<id>/attrs/<attrName> on every
884 attribute included in the entity or to DELETE /v2/entities/<id> if
885 no attribute were included in the entity.
887 replace: maps to PUT /v2/entities/<id>/attrs.
889 Args:
890 entities: "an array of entities, each entity specified using the "
891 "JSON entity representation format "
892 action_type (Update): "actionType, to specify the kind of update
893 action to do: either append, appendStrict, update, delete,
894 or replace. "
895 options (str): Optional 'noOverwrite' 'replace' 'update'
897 Returns:
899 """
901 url = urljoin(
902 self.base_url, f"{self._url_version}/entityOperations/{action_type.value}"
903 )
904 headers = self.headers.copy()
905 headers.update({"Content-Type": "application/json"})
906 params = {}
907 if options:
908 params.update({"options": options})
909 update = UpdateLD(entities=entities)
910 try:
911 if action_type == ActionTypeLD.DELETE:
912 id_list = [entity.id for entity in entities]
913 res = self.post(
914 url=url, headers=headers, params=params, data=json.dumps(id_list)
915 )
916 else:
917 res = self.post(
918 url=url,
919 headers=headers,
920 params=params,
921 data=json.dumps(
922 update.model_dump(
923 by_alias=True,
924 exclude_none=True,
925 ).get("entities")
926 ),
927 )
928 self.handle_multi_status_response(res)
929 except RuntimeError as rerr:
930 raise rerr
931 except Exception as err:
932 raise err
933 else:
934 self.logger.info(f"Update operation {action_type} succeeded!")