Coverage for filip/clients/ngsi_ld/cb.py: 82%
316 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-03-10 13:43 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-03-10 13:43 +0000
1"""
2Context Broker Module for API Client
3"""
5import re
6import json
7import os
8from math import inf
9from typing import Any, Dict, List, Union, Optional, Literal
10from urllib.parse import urljoin
11import requests
12from pydantic import TypeAdapter, PositiveInt, PositiveFloat
13from filip.clients.base_http_client import BaseHttpClient, NgsiURLVersion
14from filip.config import settings
15from filip.models.base import FiwareLDHeader, PaginationMethod, core_context
16from filip.models.ngsi_v2.base import AttrsFormat
17from filip.models.ngsi_ld.subscriptions import SubscriptionLD
18from filip.models.ngsi_ld.context import (
19 ContextLDEntity,
20 ContextLDEntityKeyValues,
21 ContextProperty,
22 ContextRelationship,
23 NamedContextProperty,
24 NamedContextRelationship,
25 ActionTypeLD,
26 UpdateLD,
27)
28from filip.models.ngsi_v2.context import Query
31class ContextBrokerLDClient(BaseHttpClient):
32 """
33 Implementation of NGSI-LD Context Broker functionalities, such as creating
34 entities and subscriptions; retrieving, updating and deleting data.
35 Further documentation:
36 https://fiware-orion.readthedocs.io/en/master/
38 Api specifications for LD are located here:
39 https://www.etsi.org/deliver/etsi_gs/CIM/001_099/009/01.04.01_60/gs_cim009v010401p.pdf
40 """
def __init__(
    self,
    url: str = None,
    *,
    session: requests.Session = None,
    fiware_header: FiwareLDHeader = None,
    **kwargs,
):
    """
    Set up the client for an NGSI-LD context broker.

    Args:
        url: Url of the context broker server. ``settings.LD_CB_URL`` is
            used when no (or an empty) url is given.
        session (requests.Session): Forwarded to the base client.
        fiware_header (FiwareLDHeader): Header object to use. When none is
            given a fresh ``FiwareLDHeader`` is created; when its
            ``link_header`` is None the core context is set on it.
        **kwargs (Optional): Further arguments forwarded to the base client.
    """
    if not url:
        url = settings.LD_CB_URL

    # The header is resolved here because, per the original author's note,
    # the base client would replace an empty header with a FiwareHeader
    # instead of a FiwareLDHeader.
    ld_header = fiware_header or FiwareLDHeader()
    if ld_header.link_header is None:
        ld_header.set_context(core_context)

    super().__init__(url=url, session=session, fiware_header=ld_header, **kwargs)

    # Version specific url-pattern.
    self._url_version = NgsiURLVersion.ld_url.value

    # Uplink requests rely on Content-Type (default: JSON), downlink requests
    # rely on Accept (default: JSON-LD); the respective other header is
    # ignored according to the original author's note.
    self.headers.update(
        {"Content-Type": "application/json", "Accept": "application/ld+json"}
    )

    if ld_header.ngsild_tenant is not None:
        self.__make_tenant()
def __pagination(
    self,
    *,
    method: PaginationMethod = PaginationMethod.GET,
    url: str,
    headers: Dict,
    limit: Union[PositiveInt, PositiveFloat] = None,
    params: Dict = None,
    data: str = None,
) -> List[Dict]:
    """
    Collect the items of a listing endpoint over as many requests as needed.

    Each request asks for at most 1000 items. The total number of available
    items is read from the ``Fiware-Total-Count`` response header when the
    client uses the v2 url version and from ``NGSILD-Results-Count`` when
    it uses the LD url version; for any other url version no further pages
    are requested.

    Args:
        method: HTTP method used for every request.
        url: Url of the listing endpoint.
        headers: Headers sent with every request.
        limit: Maximum number of items to collect. ``None`` means no limit.
        params: Query parameters of the original request. The mapping is
            copied, so the caller's object is not modified.
        data: Request body sent with every request.

    Returns:
        List of the decoded JSON items of all retrieved pages.

    Raises:
        requests.HTTPError: via ``raise_for_status`` if a request does not
            succeed.
    """
    page_size = 1000  # maximum items per request
    if limit is None:
        limit = inf

    # Copy so that ``params=None`` works and the caller's dict stays untouched.
    params = dict(params) if params else {}
    params["limit"] = page_size if limit > page_size else limit

    session = self.session if self.session else requests.Session()
    with session:
        res = session.request(
            method=method, url=url, params=params, headers=headers, data=data
        )
        if res.ok:
            items = res.json()
            # Total number of matches as announced by the broker.
            if self._url_version == NgsiURLVersion.v2_url.value:
                count = int(res.headers["Fiware-Total-Count"])
            elif self._url_version == NgsiURLVersion.ld_url.value:
                count = int(res.headers["NGSILD-Results-Count"])
            else:
                count = 0

            while len(items) < limit and len(items) < count:
                # Continue right after the items collected so far.
                params["offset"] = len(items)
                params["limit"] = min(page_size, (limit - len(items)))
                res = session.request(
                    method=method,
                    url=url,
                    params=params,
                    headers=headers,
                    data=data,
                )
                if not res.ok:
                    res.raise_for_status()
                page = res.json()
                if not page:
                    # The announced count can no longer be reached; stop
                    # instead of requesting the same offset forever.
                    break
                items.extend(page)
            self.logger.debug("Received: %s", items)
            return items
        res.raise_for_status()
def get_version(self) -> Dict:
    """
    Request the ``/version`` endpoint of the context broker.

    Returns:
        Dictionary with the decoded JSON response

    Raises:
        requests.RequestException: logged on ``self.logger`` and re-raised.
    """
    endpoint = urljoin(self.base_url, "/version")
    try:
        response = self.get(url=endpoint)
        if not response.ok:
            response.raise_for_status()
        else:
            return response.json()
    except requests.RequestException as err:
        self.logger.error(err)
        raise
def __make_tenant(self):
    """
    Post a throw-away entity with random id and type and delete it again.

    According to the original author's note this creates the tenant that is
    given in the headers.

    Raises:
        Exception: any error of the two requests is logged via
            ``self.log_error`` and re-raised.
    """
    probe_id = f"urn:ngsi-ld:{os.urandom(6).hex()}"
    probe_type = f"urn:ngsi-ld:{os.urandom(6).hex()}"
    probe = ContextLDEntity(id=probe_id, type=probe_type)
    try:
        self.post_entity(entity=probe)
        self.delete_entity_by_id(probe_id)
    except Exception as err:
        self.log_error(err=err, msg="Error while creating tenant")
        raise
def get_statistics(self) -> Dict:
    """
    Request the ``statistics`` endpoint of the context broker.

    Returns:
        Dictionary with the decoded JSON response

    Raises:
        requests.RequestException: logged on ``self.logger`` and re-raised.
    """
    endpoint = urljoin(self.base_url, "statistics")
    try:
        response = self.get(url=endpoint)
        if not response.ok:
            response.raise_for_status()
        else:
            return response.json()
    except requests.RequestException as err:
        self.logger.error(err)
        raise
def post_entity(
    self,
    entity: Union[ContextLDEntity, ContextLDEntityKeyValues],
    append: bool = False,
    update: bool = False,
):
    """
    Create an entity by posting it to the entities endpoint.

    If the request fails with status code 409 (handled here as "entity
    already exists") one of two fallbacks may be used: with ``append=True``
    the attributes are appended to the existing entity, otherwise with
    ``update=True`` the entity is overridden via ``override_entities``.
    Without either flag the error is logged and re-raised.

    Args:
        entity: Entity to create.
        append: Append the attributes if the entity already exists.
        update: Override the entity if it already exists (only considered
            when ``append`` is False).

    Returns:
        The ``Location`` header of the response on success, or the result of
        the fallback operation.
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/entities")
    request_headers = self.headers.copy()
    # An entity that carries its own @context is sent as JSON-LD and the
    # Link header is set to None.
    if entity.model_dump().get("@context", None) is not None:
        request_headers["Content-Type"] = "application/ld+json"
        request_headers["Link"] = None
    try:
        payload = entity.model_dump(
            exclude_unset=True, exclude_defaults=True, exclude_none=True
        )
        res = self.post(url=endpoint, headers=request_headers, json=payload)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info("Entity successfully posted!")
            return res.headers.get("Location")
    except requests.RequestException as err:
        already_exists = (
            err.response is not None and err.response.status_code == 409
        )
        if already_exists and append:
            return self.append_entity_attributes(entity=entity)
        if already_exists and update:
            return self.override_entities(entities=[entity])
        self.log_error(err=err, msg=f"Could not post entity {entity.id}")
        raise
def override_entities(
    self, entities: List[Union[ContextLDEntity, ContextLDEntityKeyValues]]
):
    """
    Create the given entities or override the ones that already exist.

    Delegates to ``entity_batch_operation`` with the UPSERT action and the
    ``replace`` option.

    Args:
        entities: Entities to create or override.

    Returns:
        Whatever ``entity_batch_operation`` returns.
    """
    upsert_arguments = {
        "entities": entities,
        "action_type": ActionTypeLD.UPSERT,
        "options": "replace",
    }
    return self.entity_batch_operation(**upsert_arguments)
def get_entity(
    self,
    entity_id: str,
    entity_type: str = None,
    attrs: List[str] = None,
    options: Optional[str] = None,
    geometryProperty: Optional[str] = None,
) -> Union[ContextLDEntity, ContextLDEntityKeyValues, Dict[str, Any]]:
    """
    Retrieve a single entity by its id.

    Args:
        entity_id (String): Id of the entity to be retrieved
        entity_type (String): Entity type, sent as ``type`` query parameter,
            to avoid ambiguity in case there are several entities with the
            same entity id.
        attrs (List of Strings): Names of the attributes to include in the
            response, sent comma-separated. Example: temperature, humidity.
        options (String): Either ``keyValues`` (simplified representation of
            the entity) or ``sysAttrs`` (include generated attrs createdAt
            and modifiedAt). Any other non-empty value is rejected.
        geometryProperty (String): Name of a GeoProperty, sent as
            ``geometryProperty`` query parameter.

    Returns:
        ``ContextLDEntityKeyValues`` if ``options`` is ``keyValues``,
        otherwise ``ContextLDEntity``.

    Raises:
        ValueError: if ``options`` has an unsupported value.
        requests.RequestException: logged via ``self.log_error`` and
            re-raised.
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
    request_headers = self.headers.copy()

    query = {}
    if entity_type:
        query["type"] = entity_type
    if attrs:
        query["attrs"] = ",".join(attrs)
    if geometryProperty:
        query["geometryProperty"] = geometryProperty
    if options:
        if options not in ("keyValues", "sysAttrs"):
            raise ValueError(
                "Only available options are 'keyValues' and 'sysAttrs'"
            )
        query["options"] = options

    try:
        res = self.get(url=endpoint, params=query, headers=request_headers)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info("Entity successfully retrieved!")
            self.logger.debug("Received: %s", res.json())
            # The simplified representation has its own model.
            if options == "keyValues":
                return ContextLDEntityKeyValues(**res.json())
            return ContextLDEntity(**res.json())
    except requests.RequestException as err:
        self.log_error(err=err, msg=f"Could not load entity {entity_id}")
        raise
# Geometry type names accepted by the ``geometry`` argument of
# ``get_entity_list``.
GeometryShape = Literal[
    "Point",
    "MultiPoint",
    "LineString",
    "MultiLineString",
    "Polygon",
    "MultiPolygon",
]
def get_entity_list(
    self,
    entity_id: Optional[str] = None,
    id_pattern: Optional[str] = ".*",
    entity_type: Optional[str] = None,
    attrs: Optional[List[str]] = None,
    q: Optional[str] = None,
    georel: Optional[str] = None,
    geometry: Optional[GeometryShape] = None,
    coordinates: Optional[str] = None,
    geoproperty: Optional[str] = None,
    # csf: Optional[str] = None,  # Context Source Filter
    limit: Optional[PositiveInt] = None,
    options: Optional[str] = None,
) -> List[Union[ContextLDEntity, ContextLDEntityKeyValues]]:
    """
    Retrieve a list of entities matching the given query options.

    Only arguments with a truthy value are sent as query parameters. With
    the defaults only ``idPattern=.*`` is sent.

    Args:
        entity_id:
            Id of the entity to be retrieved
        id_pattern:
            Regular expression to match the entity id
        entity_type:
            Entity type, to avoid ambiguity in case there are several
            entities with the same entity id.
        attrs:
            List of attribute names whose data must be included in the
            response; sent comma-separated.
        q:
            Query expression, composed of attribute names, operators and
            values. Values quoted with single quotes are rejected.
        georel:
            Geospatial relationship between the query geometry and the
            entities.
        geometry:
            Type of geometry for the query. The possible values are Point,
            MultiPoint, LineString, MultiLineString, Polygon, MultiPolygon.
        coordinates:
            Coordinates of the query geometry as a string.
        geoproperty:
            Name of a GeoProperty, sent as ``geoproperty`` query parameter.
        limit:
            Maximum number of entities to retrieve; must not exceed 1000.
        options:
            Further options for the query. Accepted values are:
            - keyValues (simplified representation of entity)
            - sysAttrs (including createdAt and modifiedAt, etc.)

    Returns:
        List of ``ContextLDEntityKeyValues`` if ``options`` is ``keyValues``,
        otherwise list of ``ContextLDEntity``.

    Raises:
        ValueError: for a single-quoted value in ``q``, a ``limit`` above
            1000 or an unsupported ``options`` value.
        requests.RequestException: logged via ``self.log_error`` and
            re-raised.
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/entities/")
    request_headers = self.headers.copy()

    query = {}
    for name, value in (
        ("id", entity_id),
        ("idPattern", id_pattern),
        ("type", entity_type),
    ):
        if value:
            query[name] = value
    if attrs:
        query["attrs"] = ",".join(attrs)
    if q:
        # Reject values wrapped in single quotes directly after an operator.
        single_quoted = re.search(r"[=!<>~]{1}\'.*\'", q.replace(" ", ""))
        if single_quoted is not None:
            raise ValueError(
                f"String/Date/etc. value in {single_quoted.group()} must be "
                f"in double quote"
            )
        query["q"] = q
    for name, value in (
        ("georel", georel),
        ("geometry", geometry),
        ("coordinates", coordinates),
        ("geoproperty", geoproperty),
    ):
        if value:
            query[name] = value
    # if csf:  # ContextSourceRegistration not supported yet
    #     query.update({'csf': csf})
    if limit:
        if limit > 1000:
            raise ValueError("limit must be an integer value <= 1000")
        query["limit"] = limit
    if options:
        if options not in ("keyValues", "sysAttrs"):
            raise ValueError(
                "Only available options are 'keyValues' and 'sysAttrs'"
            )
        query["options"] = options

    try:
        res = self.get(url=endpoint, params=query, headers=request_headers)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info("Entity successfully retrieved!")
            if options == "keyValues":
                return [ContextLDEntityKeyValues(**item) for item in res.json()]
            return [ContextLDEntity(**item) for item in res.json()]
    except requests.RequestException as err:
        self.log_error(err=err, msg=f"Could not load entity matching{query}")
        raise
def replace_existing_attributes_of_entity(
    self,
    entity: Union[ContextLDEntity, ContextLDEntityKeyValues],
    append: bool = False,
):
    """
    Send the entity's attributes as PATCH to ``entities/<id>/attrs``.

    The request body is the entity without ``id`` and ``type``, unset and
    None values excluded.

    Args:
        entity: Entity whose attributes are sent.
        append: If True and the broker answers with status 207, the
            attributes are additionally sent via
            ``append_entity_attributes``.
            NOTE(review): 207 presumably signals that only a part of the
            attributes could be updated - confirm against the NGSI-LD
            specification.

    Returns:
        The result of ``append_entity_attributes`` when the append fallback
        is used, otherwise None.

    Raises:
        requests.RequestException: logged via ``self.log_error`` and
            re-raised.
    """
    url = urljoin(self.base_url, f"{self._url_version}/entities/{entity.id}/attrs")
    headers = self.headers.copy()
    # An entity that carries its own @context is sent as JSON-LD and the
    # Link header is set to None.
    if entity.model_dump().get("@context", None) is not None:
        headers.update({"Content-Type": "application/ld+json"})
        headers.update({"Link": None})

    needs_append = False
    try:
        res = self.patch(
            url=url,
            headers=headers,
            json=entity.model_dump(
                exclude={"id", "type"}, exclude_unset=True, exclude_none=True
            ),
        )
        if res.ok:
            # 207 is a success status, so raise_for_status() never turns it
            # into an exception; the fallback has to be decided here.
            needs_append = append and res.status_code == 207
            if not needs_append:
                self.logger.info(f"Entity {entity.id} successfully " "updated!")
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        # Kept from the original in case self.patch (defined outside this
        # block) itself raises for a 207 response.
        if err.response is not None and append and err.response.status_code == 207:
            return self.append_entity_attributes(entity=entity)
        msg = f"Could not replace attribute of entity {entity.id} !"
        self.log_error(err=err, msg=msg)
        raise

    if needs_append:
        # Called outside the try block so that its own errors are not
        # logged a second time with the message of this method.
        return self.append_entity_attributes(entity=entity)
def update_entity_attribute(
    self,
    entity_id: str,
    attr: Union[
        ContextProperty,
        ContextRelationship,
        NamedContextProperty,
        NamedContextRelationship,
    ],
    attr_name: str = None,
):
    """
    Update a single attribute of an entity via PATCH on
    ``entities/<entity_id>/attrs/<attr_name>``.

    Args:
        entity_id: Id of the entity. Example: Bcn_Welt
        attr: Context attribute to update.
        attr_name: Name of the attribute. Required for unnamed attributes.
            For ``NamedContextProperty`` / ``NamedContextRelationship`` it
            may be omitted; the attribute's own ``name`` is used then.

    Raises:
        AssertionError: if no attribute name can be determined.
        requests.RequestException: logged via ``self.log_error`` and
            re-raised.
    """
    headers = self.headers.copy()

    if attr_name is None:
        # The original condition (`not isinstance(A) or not isinstance(B)`)
        # was always true, so named attributes could never provide their
        # own name. An explicitly passed attr_name is still accepted for
        # every attribute type to stay compatible with existing callers.
        if isinstance(attr, (NamedContextProperty, NamedContextRelationship)):
            # assumes the Named* models expose their name as `.name`
            # (the payload below excludes a "name" field) - TODO confirm
            attr_name = attr.name
        else:
            # Raised explicitly (not via `assert`) so the check survives -O.
            raise AssertionError(
                "Missing name for attribute. "
                "attr_name must be present if "
                "attr is of type ContextAttribute"
            )

    url = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
    )

    if isinstance(attr, NamedContextProperty):
        payload = attr.model_dump(
            exclude={"name"}, exclude_unset=True, exclude_none=True
        )
    else:
        # Keep only truthy fields and drop the literal "Property".
        # NOTE(review): this also drops falsy values such as 0 or False -
        # confirm that this is intended.
        payload = {
            key: value
            for key, value in attr.model_dump().items()
            if value and value != "Property"
        }

    try:
        res = self.patch(url=url, headers=headers, json=payload)
        if res.ok:
            self.logger.info(
                f"Attribute {attr_name} of {entity_id} successfully updated!"
            )
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not update attribute '{attr_name}' of entity {entity_id}"
        self.log_error(err=err, msg=msg)
        raise
def append_entity_attributes(
    self,
    entity: Union[ContextLDEntity, ContextLDEntityKeyValues],
    options: Optional[str] = None,
):
    """
    Send the entity's attributes as POST to ``entities/<id>/attrs``.

    The request body is the entity without ``id`` and ``type``, unset and
    None values excluded.

    Args:
        entity (ContextLDEntity):
            Entity to append attributes to.
        options (str):
            Options for the request. The only available value is
            'noOverwrite'. If set, it will raise 400, if all attributes
            exist already.

    Raises:
        ValueError: if ``options`` is set to anything but 'noOverwrite'.
        requests.RequestException: logged via ``self.log_error`` and
            re-raised.
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity.id}/attrs"
    )
    request_headers = self.headers.copy()
    # An entity that carries its own @context is sent as JSON-LD and the
    # Link header is set to None.
    if entity.model_dump().get("@context", None) is not None:
        request_headers["Content-Type"] = "application/ld+json"
        request_headers["Link"] = None

    query = {}
    if options:
        if options != "noOverwrite":
            raise ValueError("The only available value is 'noOverwrite'")
        query["options"] = options

    try:
        payload = entity.model_dump(
            exclude={"id", "type"}, exclude_unset=True, exclude_none=True
        )
        res = self.post(
            url=endpoint, headers=request_headers, params=query, json=payload
        )
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info(f"Entity {entity.id} successfully updated!")
    except requests.RequestException as err:
        self.log_error(err=err, msg=f"Could not update entity {entity.id}!")
        raise
579 # def update_existing_attribute_by_name(self, entity: ContextLDEntity
580 # ):
581 # pass
def delete_entity_by_id(self, entity_id: str, entity_type: Optional[str] = None):
    """
    Delete a single entity by its id.

    For deleting multiple entities at once, entity_batch_operation() is
    more efficient.

    Args:
        entity_id:
            ID of entity to delete.
        entity_type:
            Type of entity to delete; sent as ``type`` query parameter when
            given.

    Raises:
        requests.RequestException: logged via ``self.log_error`` and
            re-raised.
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
    request_headers = self.headers.copy()
    query = {"type": entity_type} if entity_type else {}

    try:
        res = self.delete(url=endpoint, headers=request_headers, params=query)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info(f"Entity {entity_id} successfully deleted")
    except requests.RequestException as err:
        self.log_error(err=err, msg=f"Could not delete entity {entity_id}")
        raise
def delete_attribute(self, entity_id: str, attribute_id: str):
    """
    Delete one attribute of an entity.

    Args:
        entity_id:
            ID of the entity.
        attribute_id:
            Name of the attribute to delete.

    Raises:
        requests.RequestException: logged via ``self.log_error`` and
            re-raised.
    """
    attribute_path = f"{self._url_version}/entities/{entity_id}/attrs/{attribute_id}"
    endpoint = urljoin(self.base_url, attribute_path)
    request_headers = self.headers.copy()

    try:
        res = self.delete(url=endpoint, headers=request_headers)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info(
                f"Attribute {attribute_id} of Entity {entity_id} successfully deleted"
            )
    except requests.RequestException as err:
        self.log_error(
            err=err,
            msg=f"Could not delete attribute {attribute_id} of entity {entity_id}",
        )
        raise
641 # SUBSCRIPTION API ENDPOINTS
def get_subscription_list(self, limit: PositiveInt = inf) -> List[SubscriptionLD]:
    """
    Retrieve the subscriptions known to the context broker.

    Args:
        limit: Limit the number of subscriptions to be retrieved

    Returns:
        list of subscriptions

    Raises:
        requests.RequestException: logged via ``self.log_error`` and
            re-raised.
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/subscriptions/")
    request_headers = self.headers.copy()
    # The 'count' option is always requested so that the pagination helper
    # can tell whether further pages are required.
    query = {"options": "count"}
    try:
        raw_subscriptions = self.__pagination(
            limit=limit, url=endpoint, params=query, headers=request_headers
        )
        return TypeAdapter(List[SubscriptionLD]).validate_python(raw_subscriptions)
    except requests.RequestException as err:
        self.log_error(err=err, msg="Could not load subscriptions!")
        raise
def post_subscription(
    self, subscription: SubscriptionLD, update: bool = False
) -> str:
    """
    Create a new subscription unless an equal one already exists.

    A subscription counts as already existing if an existing subscription
    has the same JSON dump of the fields ``subject``, ``notification`` and
    ``type``. All other fields are not considered. In that case nothing is
    created and the id of the existing subscription is returned.

    Args:
        subscription: Subscription
        update: True - If the subscription already exists, update it (the
                    id of the passed subscription is set to the existing
                    id for that)
                False- If the subscription already exists, log a warning

    Returns:
        str: Id of the (created) subscription

    Raises:
        requests.RequestException: logged via ``self.log_error`` and
            re-raised.
    """
    known_subscriptions = self.get_subscription_list()

    fingerprint = subscription.model_dump_json(
        include={"subject", "notification", "type"}
    )
    for known in known_subscriptions:
        known_fingerprint = known.model_dump_json(
            include={"subject", "notification", "type"}
        )
        if known_fingerprint != fingerprint:
            continue
        self.logger.info("Subscription already exists")
        if not update:
            self.logger.warning(
                f"Subscription existed already with the id {known.id}"
            )
        else:
            self.logger.info("Updated subscription")
            subscription.id = known.id
            self.update_subscription(subscription)
        return known.id

    endpoint = urljoin(self.base_url, f"{self._url_version}/subscriptions")
    request_headers = self.headers.copy()
    # A subscription that carries its own @context is sent as JSON-LD and
    # the Link header is set to None.
    if subscription.model_dump().get("@context", None) is not None:
        request_headers["Content-Type"] = "application/ld+json"
        request_headers["Link"] = None
    try:
        body = subscription.model_dump_json(
            exclude_unset=False, exclude_defaults=False, exclude_none=True
        )
        res = self.post(url=endpoint, headers=request_headers, data=body)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info("Subscription successfully created!")
            # The id is the last path segment of the Location header.
            return res.headers["Location"].split("/")[-1]
    except requests.RequestException as err:
        self.log_error(err=err, msg="Could not send subscription!")
        raise
def get_subscription(self, subscription_id: str) -> SubscriptionLD:
    """
    Retrieve a single subscription from the context broker.

    Args:
        subscription_id: id of the subscription

    Returns:
        The response parsed into a ``SubscriptionLD``.

    Raises:
        requests.RequestException: logged via ``self.log_error`` and
            re-raised.
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
    )
    request_headers = self.headers.copy()
    try:
        res = self.get(url=endpoint, headers=request_headers)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.debug("Received: %s", res.json())
            return SubscriptionLD(**res.json())
    except requests.RequestException as err:
        self.log_error(err=err, msg=f"Could not load subscription {subscription_id}")
        raise
def update_subscription(self, subscription: SubscriptionLD) -> None:
    """
    Update an existing subscription via PATCH.

    The request body contains the subscription's fields without the id;
    unset, default and None values are excluded.

    Args:
        subscription: Subscription to update

    Raises:
        requests.RequestException: logged via ``self.log_error`` and
            re-raised.
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/subscriptions/{subscription.id}"
    )
    request_headers = self.headers.copy()
    # A subscription that carries its own @context is sent as JSON-LD and
    # the Link header is set to None.
    if subscription.model_dump().get("@context", None) is not None:
        request_headers["Content-Type"] = "application/ld+json"
        request_headers["Link"] = None
    try:
        body = subscription.model_dump_json(
            exclude={"id"},
            exclude_unset=True,
            exclude_defaults=True,
            exclude_none=True,
        )
        res = self.patch(url=endpoint, headers=request_headers, data=body)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info("Subscription successfully updated!")
    except requests.RequestException as err:
        self.log_error(
            err=err, msg=f"Could not update subscription {subscription.id}"
        )
        raise
def delete_subscription(self, subscription_id: str) -> None:
    """
    Delete a subscription from the context broker.

    Args:
        subscription_id: id of the subscription

    Raises:
        requests.RequestException: logged via ``self.log_error`` and
            re-raised.
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
    )
    request_headers = self.headers.copy()
    try:
        res = self.delete(url=endpoint, headers=request_headers)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info(f"Subscription '{subscription_id}' successfully deleted!")
    except requests.RequestException as err:
        self.log_error(
            err=err, msg=f"Could not delete subscription {subscription_id}"
        )
        raise
def log_multi_errors(self, errors: List[Dict]) -> None:
    """
    Log every entry of a batch response's error list on error level.

    Args:
        errors: Error entries. Each entry is read for the keys ``entityId``
            and ``error``; of the latter ``status`` and ``title`` are
            logged. Missing keys are logged as ``None`` instead of raising.
    """
    for error in errors:
        details: dict = error.get("error") or {}
        # ``%s`` instead of ``%d``: the status may be missing (None) or not
        # a number, which would break the formatting of the log record.
        self.logger.error(
            "Response status: %s, Entity: %s, Reason: %s",
            details.get("status"),
            error.get("entityId"),
            details.get("title"),
        )
def handle_multi_status_response(self, res: requests.Response):
    """
    Evaluate the response of a batch operation.

    Errors listed in the response body are logged. If the body contains a
    ``success`` list and that list is empty, a RuntimeError is raised. A
    body that cannot be decoded as JSON is only logged on info level.

    Args:
        res: Response of the batch request.

    Raises:
        requests.HTTPError: via ``raise_for_status`` for an error status.
        RuntimeError: if the operation resulted in errors only.
    """
    try:
        res.raise_for_status()
        if not res.text:
            self.logger.info("Empty response received.")
        else:
            body = res.json()
            if "errors" in body:
                self.log_multi_errors(body["errors"])
            if "success" in body and len(body["success"]) == 0:
                raise RuntimeError(
                    "Batch operation resulted in errors only, see logs"
                )
    except json.JSONDecodeError:
        self.logger.info(
            "Error decoding JSON. Response may not be in valid JSON format."
        )
859 # Batch operation API
def entity_batch_operation(
    self,
    *,
    entities: List[Union[ContextLDEntity, ContextLDEntityKeyValues]],
    action_type: Union[ActionTypeLD, str],
    options: Literal["noOverwrite", "replace", "update"] = None,
) -> None:
    """
    Create, update, upsert or delete several entities with one request.

    The request is a POST to ``entityOperations/<action>``. For the DELETE
    action the body is the JSON list of the entities' ids; for every other
    action it is the JSON list of the entities (dumped by alias, unset and
    None values excluded). The response is evaluated by
    ``handle_multi_status_response``.

    Args:
        entities: Entities the operation is applied to.
        action_type: Kind of operation, given as ``ActionTypeLD`` member or
            as the member's value.
        options (str): Optional 'noOverwrite' 'replace' 'update'; sent as
            ``options`` query parameter when given.

    Raises:
        ValueError: if ``action_type`` is not a valid ``ActionTypeLD`` value.
        RuntimeError: if the operation resulted in errors only.
        requests.RequestException: if the request fails.
    """
    # Enum lookup by value also accepts an existing member, so both
    # annotated input kinds resolve to a member here (the original called
    # ``.value`` on the raw argument and failed for plain strings).
    # assumes ActionTypeLD is a standard Enum - TODO confirm
    action = ActionTypeLD(action_type)

    url = urljoin(
        self.base_url, f"{self._url_version}/entityOperations/{action.value}"
    )
    headers = self.headers.copy()
    headers.update({"Content-Type": "application/json"})
    params = {"options": options} if options else {}

    # Built for every action, as in the original, so the entities are
    # passed through the UpdateLD model in any case.
    update = UpdateLD(entities=entities)
    if action == ActionTypeLD.DELETE:
        body = [entity.id for entity in entities]
    else:
        body = update.model_dump(
            by_alias=True,
            exclude_unset=True,
            exclude_none=True,
        ).get("entities")

    res = self.post(url=url, headers=headers, params=params, data=json.dumps(body))
    self.handle_multi_status_response(res)
    self.logger.info(f"Update operation {action_type} succeeded!")