Coverage for filip/clients/ngsi_ld/cb.py: 82%

316 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-03-10 13:43 +0000

1""" 

2Context Broker Module for API Client 

3""" 

4 

5import re 

6import json 

7import os 

8from math import inf 

9from typing import Any, Dict, List, Union, Optional, Literal 

10from urllib.parse import urljoin 

11import requests 

12from pydantic import TypeAdapter, PositiveInt, PositiveFloat 

13from filip.clients.base_http_client import BaseHttpClient, NgsiURLVersion 

14from filip.config import settings 

15from filip.models.base import FiwareLDHeader, PaginationMethod, core_context 

16from filip.models.ngsi_v2.base import AttrsFormat 

17from filip.models.ngsi_ld.subscriptions import SubscriptionLD 

18from filip.models.ngsi_ld.context import ( 

19 ContextLDEntity, 

20 ContextLDEntityKeyValues, 

21 ContextProperty, 

22 ContextRelationship, 

23 NamedContextProperty, 

24 NamedContextRelationship, 

25 ActionTypeLD, 

26 UpdateLD, 

27) 

28from filip.models.ngsi_v2.context import Query 

29 

30 

31class ContextBrokerLDClient(BaseHttpClient): 

32 """ 

33 Implementation of NGSI-LD Context Broker functionalities, such as creating 

34 entities and subscriptions; retrieving, updating and deleting data. 

35 Further documentation: 

36 https://fiware-orion.readthedocs.io/en/master/ 

37 

38 Api specifications for LD are located here: 

39 https://www.etsi.org/deliver/etsi_gs/CIM/001_099/009/01.04.01_60/gs_cim009v010401p.pdf 

40 """ 

41 

42 def __init__( 

43 self, 

44 url: str = None, 

45 *, 

46 session: requests.Session = None, 

47 fiware_header: FiwareLDHeader = None, 

48 **kwargs, 

49 ): 

50 """ 

51 

52 Args: 

53 url: Url of context broker server 

54 session (requests.Session): 

55 fiware_header (FiwareHeader): fiware service and fiware service path 

56 **kwargs (Optional): Optional arguments that ``request`` takes. 

57 """ 

58 # set service url 

59 url = url or settings.LD_CB_URL 

60 # base_http_client overwrites empty header with FiwareHeader instead of FiwareLD 

61 init_header = fiware_header if fiware_header else FiwareLDHeader() 

62 if init_header.link_header is None: 

63 init_header.set_context(core_context) 

64 super().__init__(url=url, session=session, fiware_header=init_header, **kwargs) 

65 # set the version specific url-pattern 

66 self._url_version = NgsiURLVersion.ld_url.value 

67 # For uplink requests, the Content-Type header is essential, 

68 # Accept will be ignored 

69 # For downlink requests, the Accept header is essential, 

70 # Content-Type will be ignored 

71 

72 # default uplink content JSON 

73 self.headers.update({"Content-Type": "application/json"}) 

74 # default downlink content JSON-LD 

75 self.headers.update({"Accept": "application/ld+json"}) 

76 

77 if init_header.ngsild_tenant is not None: 

78 self.__make_tenant() 

79 

80 def __pagination( 

81 self, 

82 *, 

83 method: PaginationMethod = PaginationMethod.GET, 

84 url: str, 

85 headers: Dict, 

86 limit: Union[PositiveInt, PositiveFloat] = None, 

87 params: Dict = None, 

88 data: str = None, 

89 ) -> List[Dict]: 

90 """ 

91 NGSIv2 implements a pagination mechanism in order to help clients to 

92 retrieve large sets of resources. This mechanism works for all listing 

93 operations in the API (e.g. GET /v2/entities, GET /v2/subscriptions, 

94 POST /v2/op/query, etc.). This function helps getting datasets that are 

95 larger than the limit for the different GET operations. 

96 

97 https://fiware-orion.readthedocs.io/en/master/user/pagination/index.html 

98 

99 Args: 

100 url: Information about the url, obtained from the original function 

101 headers: The headers from the original function 

102 params: 

103 limit: 

104 

105 Returns: 

106 object: 

107 

108 """ 

109 if limit is None: 

110 limit = inf 

111 if limit > 1000: 

112 params["limit"] = 1000 # maximum items per request 

113 else: 

114 params["limit"] = limit 

115 

116 if self.session: 

117 session = self.session 

118 else: 

119 session = requests.Session() 

120 with session: 

121 res = session.request( 

122 method=method, url=url, params=params, headers=headers, data=data 

123 ) 

124 if res.ok: 

125 items = res.json() 

126 # do pagination 

127 if self._url_version == NgsiURLVersion.v2_url.value: 

128 count = int(res.headers["Fiware-Total-Count"]) 

129 elif self._url_version == NgsiURLVersion.ld_url.value: 

130 count = int(res.headers["NGSILD-Results-Count"]) 

131 else: 

132 count = 0 

133 

134 while len(items) < limit and len(items) < count: 

135 # Establishing the offset from where entities are retrieved 

136 params["offset"] = len(items) 

137 params["limit"] = min(1000, (limit - len(items))) 

138 res = session.request( 

139 method=method, 

140 url=url, 

141 params=params, 

142 headers=headers, 

143 data=data, 

144 ) 

145 if res.ok: 

146 items.extend(res.json()) 

147 else: 

148 res.raise_for_status() 

149 self.logger.debug("Received: %s", items) 

150 return items 

151 res.raise_for_status() 

152 

153 def get_version(self) -> Dict: 

154 """ 

155 Gets version of Orion-LD context broker 

156 Returns: 

157 Dictionary with response 

158 """ 

159 url = urljoin(self.base_url, "/version") 

160 try: 

161 res = self.get(url=url) 

162 if res.ok: 

163 return res.json() 

164 res.raise_for_status() 

165 except requests.RequestException as err: 

166 self.logger.error(err) 

167 raise 

168 

169 def __make_tenant(self): 

170 """ 

171 Create tenant if tenant 

172 is given in headers 

173 """ 

174 idhex = f"urn:ngsi-ld:{os.urandom(6).hex()}" 

175 e = ContextLDEntity(id=idhex, type=f"urn:ngsi-ld:{os.urandom(6).hex()}") 

176 try: 

177 self.post_entity(entity=e) 

178 self.delete_entity_by_id(idhex) 

179 except Exception as err: 

180 self.log_error(err=err, msg="Error while creating tenant") 

181 raise 

182 

183 def get_statistics(self) -> Dict: 

184 """ 

185 Gets statistics of context broker 

186 Returns: 

187 Dictionary with response 

188 """ 

189 url = urljoin(self.base_url, "statistics") 

190 try: 

191 res = self.get(url=url) 

192 if res.ok: 

193 return res.json() 

194 res.raise_for_status() 

195 except requests.RequestException as err: 

196 self.logger.error(err) 

197 raise 

198 

199 def post_entity( 

200 self, 

201 entity: Union[ContextLDEntity,ContextLDEntityKeyValues], 

202 append: bool = False, 

203 update: bool = False 

204 ): 

205 """ 

206 Function registers an Object with the NGSI-LD Context Broker, 

207 if it already exists it can be automatically updated 

208 if the update flag bool is True. 

209 First a post request with the entity is tried, if the response code 

210 is 422 the entity is uncrossable, as it already exists there are two 

211 options, either overwrite it, if the attribute have changed 

212 (e.g. at least one new/new values) (update = True) or leave 

213 it the way it is (update=False) 

214 

215 """ 

216 url = urljoin(self.base_url, f"{self._url_version}/entities") 

217 headers = self.headers.copy() 

218 if entity.model_dump().get("@context", None) is not None: 

219 headers.update({"Content-Type": "application/ld+json"}) 

220 headers.update({"Link": None}) 

221 try: 

222 res = self.post( 

223 url=url, 

224 headers=headers, 

225 json=entity.model_dump( 

226 exclude_unset=True, exclude_defaults=True, exclude_none=True 

227 ), 

228 ) 

229 if res.ok: 

230 self.logger.info("Entity successfully posted!") 

231 return res.headers.get("Location") 

232 res.raise_for_status() 

233 except requests.RequestException as err: 

234 if err.response is not None and err.response.status_code == 409: 

235 if append: # 409 entity already exists 

236 return self.append_entity_attributes(entity=entity) 

237 elif update: 

238 return self.override_entities(entities=[entity]) 

239 msg = f"Could not post entity {entity.id}" 

240 self.log_error(err=err, msg=msg) 

241 raise 

242 

243 def override_entities(self, entities: List[Union[ContextLDEntity,ContextLDEntityKeyValues]]): 

244 """ 

245 Function to create or override existing entites with the NGSI-LD Context Broker. 

246 The batch operation with Upsert will be used. 

247 """ 

248 return self.entity_batch_operation( 

249 entities=entities, action_type=ActionTypeLD.UPSERT, options="replace" 

250 ) 

251 

252 def get_entity( 

253 self, 

254 entity_id: str, 

255 entity_type: str = None, 

256 attrs: List[str] = None, 

257 options: Optional[str] = None, 

258 geometryProperty: Optional[str] = None, 

259 ) -> Union[ContextLDEntity, ContextLDEntityKeyValues, Dict[str, Any]]: 

260 """ 

261 This operation must return one entity element only, but there may be 

262 more than one entity with the same ID (e.g. entities with same ID but 

263 different types). In such case, an error message is returned, with 

264 the HTTP status code set to 409 Conflict. 

265 

266 Args: 

267 entity_id (String): Id of the entity to be retrieved 

268 entity_type (String): Entity type, to avoid ambiguity in case 

269 there are several entities with the same entity id. 

270 attrs (List of Strings): List of attribute names whose data must be 

271 included in the response. The attributes are retrieved in the 

272 order specified by this parameter. 

273 See "Filtering out attributes and metadata" section for more 

274 detail. If this parameter is not included, the attributes are 

275 retrieved in arbitrary order, and all the attributes of the 

276 entity are included in the response. 

277 Example: temperature, humidity. 

278 options (String): keyValues (simplified representation of entity) 

279 or sysAttrs (include generated attrs createdAt and modifiedAt) 

280 geometryProperty (String): Name of a GeoProperty. In the case of GeoJSON 

281 Entity representation, this parameter indicates which GeoProperty to 

282 use for the "geometry" element. By default, it shall be 'location'. 

283 Returns: 

284 ContextEntity 

285 """ 

286 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}") 

287 headers = self.headers.copy() 

288 params = {} 

289 if entity_type: 

290 params.update({"type": entity_type}) 

291 if attrs: 

292 params.update({"attrs": ",".join(attrs)}) 

293 if geometryProperty: 

294 params.update({"geometryProperty": geometryProperty}) 

295 if options: 

296 if options != "keyValues" and options != "sysAttrs": 

297 raise ValueError( 

298 f"Only available options are 'keyValues' and 'sysAttrs'" 

299 ) 

300 params.update({"options": options}) 

301 

302 try: 

303 res = self.get(url=url, params=params, headers=headers) 

304 if res.ok: 

305 self.logger.info("Entity successfully retrieved!") 

306 self.logger.debug("Received: %s", res.json()) 

307 if options == "keyValues": 

308 return ContextLDEntityKeyValues(**res.json()) 

309 else: 

310 return ContextLDEntity(**res.json()) 

311 res.raise_for_status() 

312 except requests.RequestException as err: 

313 msg = f"Could not load entity {entity_id}" 

314 self.log_error(err=err, msg=msg) 

315 raise 

316 

317 GeometryShape = Literal[ 

318 "Point", 

319 "MultiPoint", 

320 "LineString", 

321 "MultiLineString", 

322 "Polygon", 

323 "MultiPolygon", 

324 ] 

325 

326 def get_entity_list( 

327 self, 

328 entity_id: Optional[str] = None, 

329 id_pattern: Optional[str] = ".*", 

330 entity_type: Optional[str] = None, 

331 attrs: Optional[List[str]] = None, 

332 q: Optional[str] = None, 

333 georel: Optional[str] = None, 

334 geometry: Optional[GeometryShape] = None, 

335 coordinates: Optional[str] = None, 

336 geoproperty: Optional[str] = None, 

337 # csf: Optional[str] = None, # Context Source Filter 

338 limit: Optional[PositiveInt] = None, 

339 options: Optional[str] = None, 

340 ) -> List[Union[ContextLDEntity, ContextLDEntityKeyValues]]: 

341 """ 

342 This operation retrieves a list of entities based on different query options. 

343 By default, the operation retrieves all the entities in the context broker. 

344 Args: 

345 entity_id: 

346 Id of the entity to be retrieved 

347 id_pattern: 

348 Regular expression to match the entity id 

349 entity_type: 

350 Entity type, to avoid ambiguity in case there are several 

351 entities with the same entity id. 

352 attrs: 

353 List of attribute names whose data must be included in the response. 

354 q: 

355 Query expression, composed of attribute names, operators and values. 

356 georel: 

357 Geospatial relationship between the query geometry and the entities. 

358 geometry: 

359 Type of geometry for the query. The possible values are Point, 

360 MultiPoint, LineString, MultiLineString, Polygon, MultiPolygon. 

361 coordinates: 

362 Coordinates of the query geometry. The coordinates must be 

363 expressed as a string of comma-separated values. 

364 geoproperty: 

365 Name of a GeoProperty. In the case of GeoJSON Entity representation, 

366 this parameter indicates which GeoProperty to use for the "geometry" element. 

367 limit: 

368 Maximum number of entities to retrieve. 

369 options: 

370 Further options for the query. The available options are: 

371 - keyValues (simplified representation of entity) 

372 - sysAttrs (including createdAt and modifiedAt, etc.) 

373 - count (include number of all matched entities in response header) 

374 """ 

375 url = urljoin(self.base_url, f"{self._url_version}/entities/") 

376 headers = self.headers.copy() 

377 params = {} 

378 if entity_id: 

379 params.update({"id": entity_id}) 

380 if id_pattern: 

381 params.update({"idPattern": id_pattern}) 

382 if entity_type: 

383 params.update({"type": entity_type}) 

384 if attrs: 

385 params.update({"attrs": ",".join(attrs)}) 

386 if q: 

387 x = re.search(r"[=!<>~]{1}\'.*\'", q.replace(" ", "")) 

388 if x is not None: 

389 raise ValueError( 

390 f"String/Date/etc. value in {x.group()} must be " f"in double quote" 

391 ) 

392 params.update({"q": q}) 

393 if georel: 

394 params.update({"georel": georel}) 

395 if geometry: 

396 params.update({"geometry": geometry}) 

397 if coordinates: 

398 params.update({"coordinates": coordinates}) 

399 if geoproperty: 

400 params.update({"geoproperty": geoproperty}) 

401 # if csf: # ContextSourceRegistration not supported yet 

402 # params.update({'csf': csf}) 

403 if limit: 

404 if limit > 1000: 

405 raise ValueError("limit must be an integer value <= 1000") 

406 params.update({"limit": limit}) 

407 if options: 

408 if options != "keyValues" and options != "sysAttrs": 

409 raise ValueError( 

410 f"Only available options are 'keyValues' and 'sysAttrs'" 

411 ) 

412 params.update({"options": options}) 

413 # params.update({'local': 'true'}) 

414 

415 try: 

416 res = self.get(url=url, params=params, headers=headers) 

417 if res.ok: 

418 self.logger.info("Entity successfully retrieved!") 

419 entity_list: List[Union[ContextLDEntity, ContextLDEntityKeyValues]] = [] 

420 if options == "keyValues": 

421 entity_list = [ 

422 ContextLDEntityKeyValues(**item) for item in res.json() 

423 ] 

424 return entity_list 

425 else: 

426 entity_list = [ContextLDEntity(**item) for item in res.json()] 

427 return entity_list 

428 res.raise_for_status() 

429 except requests.RequestException as err: 

430 msg = f"Could not load entity matching{params}" 

431 self.log_error(err=err, msg=msg) 

432 raise 

433 

434 def replace_existing_attributes_of_entity( 

435 self, entity: Union[ContextLDEntity,ContextLDEntityKeyValues], append: bool = False 

436 ): 

437 """ 

438 The attributes previously existing in the entity are removed and 

439 replaced by the ones in the request. 

440 

441 Args: 

442 entity (ContextEntity): 

443 append (bool): 

444 options: 

445 Returns: 

446 

447 """ 

448 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity.id}/attrs") 

449 headers = self.headers.copy() 

450 if entity.model_dump().get("@context", None) is not None: 

451 headers.update({"Content-Type": "application/ld+json"}) 

452 headers.update({"Link": None}) 

453 try: 

454 res = self.patch( 

455 url=url, 

456 headers=headers, 

457 json=entity.model_dump( 

458 exclude={"id", "type"}, exclude_unset=True, exclude_none=True 

459 ), 

460 ) 

461 if res.ok: 

462 self.logger.info(f"Entity {entity.id} successfully " "updated!") 

463 else: 

464 res.raise_for_status() 

465 except requests.RequestException as err: 

466 if err.response is not None and append and err.response.status_code == 207: 

467 return self.append_entity_attributes(entity=entity) 

468 msg = f"Could not replace attribute of entity {entity.id} !" 

469 self.log_error(err=err, msg=msg) 

470 raise 

471 

472 def update_entity_attribute( 

473 self, 

474 entity_id: str, 

475 attr: Union[ 

476 ContextProperty, 

477 ContextRelationship, 

478 NamedContextProperty, 

479 NamedContextRelationship, 

480 ], 

481 attr_name: str = None, 

482 ): 

483 """ 

484 Updates a specified attribute from an entity. 

485 Args: 

486 attr: context attribute to update 

487 entity_id: Id of the entity. Example: Bcn_Welt 

488 entity_type: Entity type, to avoid ambiguity in case there are 

489 several entities with the same entity id. 

490 """ 

491 headers = self.headers.copy() 

492 if not isinstance(attr, NamedContextProperty) or not isinstance( 

493 attr, NamedContextRelationship 

494 ): 

495 assert attr_name is not None, ( 

496 "Missing name for attribute. " 

497 "attr_name must be present if" 

498 "attr is of type ContextAttribute" 

499 ) 

500 else: 

501 assert attr_name is None, ( 

502 "Invalid argument attr_name. Do not set " 

503 "attr_name if attr is of type " 

504 "NamedContextAttribute or NamedContextRelationship" 

505 ) 

506 

507 url = urljoin( 

508 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}" 

509 ) 

510 

511 jsonnn = {} 

512 if isinstance(attr, list) or isinstance(attr, NamedContextProperty): 

513 jsonnn = attr.model_dump( 

514 exclude={"name"}, exclude_unset=True, exclude_none=True 

515 ) 

516 else: 

517 prop = attr.model_dump() 

518 for key, value in prop.items(): 

519 if value and value != "Property": 

520 jsonnn[key] = value 

521 

522 try: 

523 res = self.patch(url=url, headers=headers, json=jsonnn) 

524 if res.ok: 

525 self.logger.info( 

526 f"Attribute {attr_name} of {entity_id} successfully updated!" 

527 ) 

528 else: 

529 res.raise_for_status() 

530 except requests.RequestException as err: 

531 msg = f"Could not update attribute '{attr_name}' of entity {entity_id}" 

532 self.log_error(err=err, msg=msg) 

533 raise 

534 

535 def append_entity_attributes( 

536 self, entity: Union[ContextLDEntity,ContextLDEntityKeyValues], options: Optional[str] = None 

537 ): 

538 """ 

539 Append new Entity attributes to an existing Entity within an NGSI-LD system 

540 Args: 

541 entity (ContextLDEntity): 

542 Entity to append attributes to. 

543 options (str): 

544 Options for the request. The only available value is 

545 'noOverwrite'. If set, it will raise 400, if all attributes 

546 exist already. 

547 

548 """ 

549 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity.id}/attrs") 

550 headers = self.headers.copy() 

551 if entity.model_dump().get("@context", None) is not None: 

552 headers.update({"Content-Type": "application/ld+json"}) 

553 headers.update({"Link": None}) 

554 params = {} 

555 

556 if options: 

557 if options != "noOverwrite": 

558 raise ValueError(f"The only available value is 'noOverwrite'") 

559 params.update({"options": options}) 

560 

561 try: 

562 res = self.post( 

563 url=url, 

564 headers=headers, 

565 params=params, 

566 json=entity.model_dump( 

567 exclude={"id", "type"}, exclude_unset=True, exclude_none=True 

568 ), 

569 ) 

570 if res.ok: 

571 self.logger.info(f"Entity {entity.id} successfully updated!") 

572 else: 

573 res.raise_for_status() 

574 except requests.RequestException as err: 

575 msg = f"Could not update entity {entity.id}!" 

576 self.log_error(err=err, msg=msg) 

577 raise 

578 

579 # def update_existing_attribute_by_name(self, entity: ContextLDEntity 

580 # ): 

581 # pass 

582 

583 def delete_entity_by_id(self, entity_id: str, entity_type: Optional[str] = None): 

584 """ 

585 Deletes an entity by its id. For deleting mulitple entities at once, 

586 entity_batch_operation() is more efficient. 

587 Args: 

588 entity_id: 

589 ID of entity to delete. 

590 entity_type: 

591 Type of entity to delete. 

592 """ 

593 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}") 

594 headers = self.headers.copy() 

595 params = {} 

596 

597 if entity_type: 

598 params.update({"type": entity_type}) 

599 

600 try: 

601 res = self.delete(url=url, headers=headers, params=params) 

602 if res.ok: 

603 self.logger.info(f"Entity {entity_id} successfully deleted") 

604 else: 

605 res.raise_for_status() 

606 except requests.RequestException as err: 

607 msg = f"Could not delete entity {entity_id}" 

608 self.log_error(err=err, msg=msg) 

609 raise 

610 

611 def delete_attribute(self, entity_id: str, attribute_id: str): 

612 """ 

613 Deletes an attribute from an entity. 

614 Args: 

615 entity_id: 

616 ID of the entity. 

617 attribute_id: 

618 Name of the attribute to delete. 

619 Returns: 

620 

621 """ 

622 url = urljoin( 

623 self.base_url, 

624 f"{self._url_version}/entities/{entity_id}/attrs/{attribute_id}", 

625 ) 

626 headers = self.headers.copy() 

627 

628 try: 

629 res = self.delete(url=url, headers=headers) 

630 if res.ok: 

631 self.logger.info( 

632 f"Attribute {attribute_id} of Entity {entity_id} successfully deleted" 

633 ) 

634 else: 

635 res.raise_for_status() 

636 except requests.RequestException as err: 

637 msg = f"Could not delete attribute {attribute_id} of entity {entity_id}" 

638 self.log_error(err=err, msg=msg) 

639 raise 

640 

641 # SUBSCRIPTION API ENDPOINTS 

642 def get_subscription_list(self, limit: PositiveInt = inf) -> List[SubscriptionLD]: 

643 """ 

644 Returns a list of all the subscriptions present in the system. 

645 Args: 

646 limit: Limit the number of subscriptions to be retrieved 

647 Returns: 

648 list of subscriptions 

649 """ 

650 url = urljoin(self.base_url, f"{self._url_version}/subscriptions/") 

651 headers = self.headers.copy() 

652 params = {} 

653 

654 # We always use the 'count' option to check weather pagination is 

655 # required 

656 params.update({"options": "count"}) 

657 try: 

658 items = self.__pagination( 

659 limit=limit, url=url, params=params, headers=headers 

660 ) 

661 adapter = TypeAdapter(List[SubscriptionLD]) 

662 return adapter.validate_python(items) 

663 except requests.RequestException as err: 

664 msg = "Could not load subscriptions!" 

665 self.log_error(err=err, msg=msg) 

666 raise 

667 

668 def post_subscription( 

669 self, subscription: SubscriptionLD, update: bool = False 

670 ) -> str: 

671 """ 

672 Creates a new subscription. The subscription is represented by a 

673 Subscription object defined in filip.cb.models. 

674 

675 If the subscription already exists, the adding is prevented and the id 

676 of the existing subscription is returned. 

677 

678 A subscription is deemed as already existing if there exists a 

679 subscription with the exact same subject and notification fields. All 

680 optional fields are not considered. 

681 

682 Args: 

683 subscription: Subscription 

684 update: True - If the subscription already exists, update it 

685 False- If the subscription already exists, throw warning 

686 Returns: 

687 str: Id of the (created) subscription 

688 

689 """ 

690 existing_subscriptions = self.get_subscription_list() 

691 

692 sub_hash = subscription.model_dump_json( 

693 include={"subject", "notification", "type"} 

694 ) 

695 for ex_sub in existing_subscriptions: 

696 if sub_hash == ex_sub.model_dump_json( 

697 include={"subject", "notification", "type"} 

698 ): 

699 self.logger.info("Subscription already exists") 

700 if update: 

701 self.logger.info("Updated subscription") 

702 subscription.id = ex_sub.id 

703 self.update_subscription(subscription) 

704 else: 

705 self.logger.warning( 

706 f"Subscription existed already with the id" f" {ex_sub.id}" 

707 ) 

708 return ex_sub.id 

709 

710 url = urljoin(self.base_url, f"{self._url_version}/subscriptions") 

711 headers = self.headers.copy() 

712 if subscription.model_dump().get("@context", None) is not None: 

713 headers.update({"Content-Type": "application/ld+json"}) 

714 headers.update({"Link": None}) 

715 try: 

716 res = self.post( 

717 url=url, 

718 headers=headers, 

719 data=subscription.model_dump_json( 

720 exclude_unset=False, exclude_defaults=False, exclude_none=True 

721 ), 

722 ) 

723 if res.ok: 

724 self.logger.info("Subscription successfully created!") 

725 return res.headers["Location"].split("/")[-1] 

726 res.raise_for_status() 

727 except requests.RequestException as err: 

728 msg = "Could not send subscription!" 

729 self.log_error(err=err, msg=msg) 

730 raise 

731 

732 def get_subscription(self, subscription_id: str) -> SubscriptionLD: 

733 """ 

734 Retrieves a subscription from the context broker. 

735 Args: 

736 subscription_id: id of the subscription 

737 

738 Returns: 

739 

740 """ 

741 url = urljoin( 

742 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}" 

743 ) 

744 headers = self.headers.copy() 

745 try: 

746 res = self.get(url=url, headers=headers) 

747 if res.ok: 

748 self.logger.debug("Received: %s", res.json()) 

749 return SubscriptionLD(**res.json()) 

750 res.raise_for_status() 

751 except requests.RequestException as err: 

752 msg = f"Could not load subscription {subscription_id}" 

753 self.log_error(err=err, msg=msg) 

754 raise 

755 

756 def update_subscription(self, subscription: SubscriptionLD) -> None: 

757 """ 

758 Only the fields included in the request are updated in the subscription. 

759 Args: 

760 subscription: Subscription to update 

761 Returns: 

762 

763 """ 

764 url = urljoin( 

765 self.base_url, f"{self._url_version}/subscriptions/{subscription.id}" 

766 ) 

767 headers = self.headers.copy() 

768 if subscription.model_dump().get("@context", None) is not None: 

769 headers.update({"Content-Type": "application/ld+json"}) 

770 headers.update({"Link": None}) 

771 try: 

772 res = self.patch( 

773 url=url, 

774 headers=headers, 

775 data=subscription.model_dump_json( 

776 exclude={"id"}, 

777 exclude_unset=True, 

778 exclude_defaults=True, 

779 exclude_none=True, 

780 ), 

781 ) 

782 if res.ok: 

783 self.logger.info("Subscription successfully updated!") 

784 else: 

785 res.raise_for_status() 

786 except requests.RequestException as err: 

787 msg = f"Could not update subscription {subscription.id}" 

788 self.log_error(err=err, msg=msg) 

789 raise 

790 

791 def delete_subscription(self, subscription_id: str) -> None: 

792 """ 

793 Deletes a subscription from a Context Broker 

794 Args: 

795 subscription_id: id of the subscription 

796 """ 

797 url = urljoin( 

798 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}" 

799 ) 

800 headers = self.headers.copy() 

801 try: 

802 res = self.delete(url=url, headers=headers) 

803 if res.ok: 

804 self.logger.info( 

805 f"Subscription '{subscription_id}' " f"successfully deleted!" 

806 ) 

807 else: 

808 res.raise_for_status() 

809 except requests.RequestException as err: 

810 msg = f"Could not delete subscription {subscription_id}" 

811 self.log_error(err=err, msg=msg) 

812 raise 

813 

814 def log_multi_errors(self, errors: List[Dict]) -> None: 

815 for error in errors: 

816 entity_id = error["entityId"] 

817 error_details: dict = error["error"] 

818 error_title = error_details.get("title") 

819 error_status = error_details.get("status") 

820 # error_detail = error_details['detail'] 

821 self.logger.error( 

822 "Response status: %d, Entity: %s, Reason: %s", 

823 error_status, 

824 entity_id, 

825 error_title, 

826 ) 

827 

828 def handle_multi_status_response(self, res: requests.Response): 

829 """ 

830 Handles the response of a batch_operation. If the response contains 

831 errors, they are logged. If the response contains only errors, a RuntimeError 

832 is raised. 

833 Args: 

834 res: 

835 

836 Returns: 

837 

838 """ 

839 try: 

840 res.raise_for_status() 

841 if res.text: 

842 response_data = res.json() 

843 if "errors" in response_data: 

844 errors = response_data["errors"] 

845 self.log_multi_errors(errors) 

846 if "success" in response_data: 

847 successList = response_data["success"] 

848 if len(successList) == 0: 

849 raise RuntimeError( 

850 "Batch operation resulted in errors only, see logs" 

851 ) 

852 else: 

853 self.logger.info("Empty response received.") 

854 except json.JSONDecodeError: 

855 self.logger.info( 

856 "Error decoding JSON. Response may not be in valid JSON format." 

857 ) 

858 

859 # Batch operation API 

860 def entity_batch_operation( 

861 self, 

862 *, 

863 entities: List[Union[ContextLDEntity,ContextLDEntityKeyValues]], 

864 action_type: Union[ActionTypeLD, str], 

865 options: Literal["noOverwrite", "replace", "update"] = None, 

866 ) -> None: 

867 """ 

868 This operation allows to create, update and/or delete several entities 

869 in a single batch operation. 

870 

871 This operation is split in as many individual operations as entities 

872 in the entities vector, so the actionType is executed for each one of 

873 them. Depending on the actionType, a mapping with regular non-batch 

874 operations can be done: 

875 

876 append: maps to POST /v2/entities (if the entity does not already exist) 

877 or POST /v2/entities/<id>/attrs (if the entity already exists). 

878 

879 appendStrict: maps to POST /v2/entities (if the entity does not 

880 already exist) or POST /v2/entities/<id>/attrs?options=append (if the 

881 entity already exists). 

882 

883 update: maps to PATCH /v2/entities/<id>/attrs. 

884 

885 delete: maps to DELETE /v2/entities/<id>/attrs/<attrName> on every 

886 attribute included in the entity or to DELETE /v2/entities/<id> if 

887 no attribute were included in the entity. 

888 

889 replace: maps to PUT /v2/entities/<id>/attrs. 

890 

891 Args: 

892 entities: "an array of entities, each entity specified using the " 

893 "JSON entity representation format " 

894 action_type (Update): "actionType, to specify the kind of update 

895 action to do: either append, appendStrict, update, delete, 

896 or replace. " 

897 options (str): Optional 'noOverwrite' 'replace' 'update' 

898 

899 Returns: 

900 

901 """ 

902 

903 url = urljoin( 

904 self.base_url, f"{self._url_version}/entityOperations/{action_type.value}" 

905 ) 

906 headers = self.headers.copy() 

907 headers.update({"Content-Type": "application/json"}) 

908 params = {} 

909 if options: 

910 params.update({"options": options}) 

911 update = UpdateLD(entities=entities) 

912 try: 

913 if action_type == ActionTypeLD.DELETE: 

914 id_list = [entity.id for entity in entities] 

915 res = self.post( 

916 url=url, headers=headers, params=params, data=json.dumps(id_list) 

917 ) 

918 else: 

919 res = self.post( 

920 url=url, 

921 headers=headers, 

922 params=params, 

923 data=json.dumps( 

924 update.model_dump( 

925 by_alias=True, 

926 exclude_unset=True, 

927 exclude_none=True, 

928 ).get("entities") 

929 ), 

930 ) 

931 self.handle_multi_status_response(res) 

932 except RuntimeError as rerr: 

933 raise rerr 

934 except Exception as err: 

935 raise err 

936 else: 

937 self.logger.info(f"Update operation {action_type} succeeded!")