Coverage for filip/clients/ngsi_ld/cb.py: 82%

316 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-04-17 14:42 +0000

1""" 

2Context Broker Module for API Client 

3""" 

4 

5import re 

6import json 

7import os 

8from math import inf 

9from typing import Any, Dict, List, Union, Optional, Literal 

10from urllib.parse import urljoin 

11import requests 

12from pydantic import TypeAdapter, PositiveInt, PositiveFloat 

13from filip.clients.base_http_client import BaseHttpClient, NgsiURLVersion 

14from filip.config import settings 

15from filip.models.base import FiwareLDHeader, PaginationMethod, core_context 

16from filip.models.ngsi_v2.base import AttrsFormat 

17from filip.models.ngsi_ld.subscriptions import SubscriptionLD 

18from filip.models.ngsi_ld.context import ( 

19 ContextLDEntity, 

20 ContextLDEntityKeyValues, 

21 ContextProperty, 

22 ContextRelationship, 

23 NamedContextProperty, 

24 NamedContextRelationship, 

25 ActionTypeLD, 

26 UpdateLD, 

27) 

28from filip.models.ngsi_v2.context import Query 

29 

30 

31class ContextBrokerLDClient(BaseHttpClient): 

32 """ 

33 Implementation of NGSI-LD Context Broker functionalities, such as creating 

34 entities and subscriptions; retrieving, updating and deleting data. 

35 Further documentation: 

36 https://fiware-orion.readthedocs.io/en/master/ 

37 

38 Api specifications for LD are located here: 

39 https://www.etsi.org/deliver/etsi_gs/CIM/001_099/009/01.04.01_60/gs_cim009v010401p.pdf 

40 """ 

41 

42 def __init__( 

43 self, 

44 url: str = None, 

45 *, 

46 session: requests.Session = None, 

47 fiware_header: FiwareLDHeader = None, 

48 **kwargs, 

49 ): 

50 """ 

51 

52 Args: 

53 url: Url of context broker server 

54 session (requests.Session): 

55 fiware_header (FiwareHeader): fiware service and fiware service path 

56 **kwargs (Optional): Optional arguments that ``request`` takes. 

57 """ 

58 # set service url 

59 url = url or settings.LD_CB_URL 

60 # base_http_client overwrites empty header with FiwareHeader instead of FiwareLD 

61 init_header = fiware_header if fiware_header else FiwareLDHeader() 

62 if init_header.link_header is None: 

63 init_header.set_context(core_context) 

64 super().__init__(url=url, session=session, fiware_header=init_header, **kwargs) 

65 # set the version specific url-pattern 

66 self._url_version = NgsiURLVersion.ld_url.value 

67 # For uplink requests, the Content-Type header is essential, 

68 # Accept will be ignored 

69 # For downlink requests, the Accept header is essential, 

70 # Content-Type will be ignored 

71 

72 # default uplink content JSON 

73 self.headers.update({"Content-Type": "application/json"}) 

74 # default downlink content JSON-LD 

75 self.headers.update({"Accept": "application/ld+json"}) 

76 

77 if init_header.ngsild_tenant is not None: 

78 self.__make_tenant() 

79 

80 def __pagination( 

81 self, 

82 *, 

83 method: PaginationMethod = PaginationMethod.GET, 

84 url: str, 

85 headers: Dict, 

86 limit: Union[PositiveInt, PositiveFloat] = None, 

87 params: Dict = None, 

88 data: str = None, 

89 ) -> List[Dict]: 

90 """ 

91 NGSIv2 implements a pagination mechanism in order to help clients to 

92 retrieve large sets of resources. This mechanism works for all listing 

93 operations in the API (e.g. GET /v2/entities, GET /v2/subscriptions, 

94 POST /v2/op/query, etc.). This function helps getting datasets that are 

95 larger than the limit for the different GET operations. 

96 

97 https://fiware-orion.readthedocs.io/en/master/user/pagination/index.html 

98 

99 Args: 

100 url: Information about the url, obtained from the original function 

101 headers: The headers from the original function 

102 params: 

103 limit: 

104 

105 Returns: 

106 object: 

107 

108 """ 

109 if limit is None: 

110 limit = inf 

111 if limit > 1000: 

112 params["limit"] = 1000 # maximum items per request 

113 else: 

114 params["limit"] = limit 

115 

116 if self.session: 

117 session = self.session 

118 else: 

119 session = requests.Session() 

120 with session: 

121 res = session.request( 

122 method=method, url=url, params=params, headers=headers, data=data 

123 ) 

124 if res.ok: 

125 items = res.json() 

126 # do pagination 

127 if self._url_version == NgsiURLVersion.v2_url.value: 

128 count = int(res.headers["Fiware-Total-Count"]) 

129 elif self._url_version == NgsiURLVersion.ld_url.value: 

130 count = int(res.headers["NGSILD-Results-Count"]) 

131 else: 

132 count = 0 

133 

134 while len(items) < limit and len(items) < count: 

135 # Establishing the offset from where entities are retrieved 

136 params["offset"] = len(items) 

137 params["limit"] = min(1000, (limit - len(items))) 

138 res = session.request( 

139 method=method, 

140 url=url, 

141 params=params, 

142 headers=headers, 

143 data=data, 

144 ) 

145 if res.ok: 

146 items.extend(res.json()) 

147 else: 

148 res.raise_for_status() 

149 self.logger.debug("Received: %s", items) 

150 return items 

151 res.raise_for_status() 

152 

153 def get_version(self) -> Dict: 

154 """ 

155 Gets version of Orion-LD context broker 

156 Returns: 

157 Dictionary with response 

158 """ 

159 url = urljoin(self.base_url, "/version") 

160 try: 

161 res = self.get(url=url) 

162 if res.ok: 

163 return res.json() 

164 res.raise_for_status() 

165 except requests.RequestException as err: 

166 self.logger.error(err) 

167 raise 

168 

169 def __make_tenant(self): 

170 """ 

171 Create tenant if tenant 

172 is given in headers 

173 """ 

174 idhex = f"urn:ngsi-ld:{os.urandom(6).hex()}" 

175 e = ContextLDEntity(id=idhex, type=f"urn:ngsi-ld:{os.urandom(6).hex()}") 

176 try: 

177 self.post_entity(entity=e) 

178 self.delete_entity_by_id(idhex) 

179 except Exception as err: 

180 self.log_error(err=err, msg="Error while creating tenant") 

181 raise 

182 

183 def get_statistics(self) -> Dict: 

184 """ 

185 Gets statistics of context broker 

186 Returns: 

187 Dictionary with response 

188 """ 

189 url = urljoin(self.base_url, "statistics") 

190 try: 

191 res = self.get(url=url) 

192 if res.ok: 

193 return res.json() 

194 res.raise_for_status() 

195 except requests.RequestException as err: 

196 self.logger.error(err) 

197 raise 

198 

199 def post_entity( 

200 self, 

201 entity: Union[ContextLDEntity, ContextLDEntityKeyValues], 

202 append: bool = False, 

203 update: bool = False, 

204 ): 

205 """ 

206 Function registers an Object with the NGSI-LD Context Broker, 

207 if it already exists it can be automatically updated 

208 if the update flag bool is True. 

209 First a post request with the entity is tried, if the response code 

210 is 422 the entity is uncrossable, as it already exists there are two 

211 options, either overwrite it, if the attribute have changed 

212 (e.g. at least one new/new values) (update = True) or leave 

213 it the way it is (update=False) 

214 

215 """ 

216 url = urljoin(self.base_url, f"{self._url_version}/entities") 

217 headers = self.headers.copy() 

218 if entity.model_dump().get("@context", None) is not None: 

219 headers.update({"Content-Type": "application/ld+json"}) 

220 headers.update({"Link": None}) 

221 try: 

222 res = self.post( 

223 url=url, 

224 headers=headers, 

225 json=entity.model_dump( 

226 exclude_unset=False, exclude_defaults=False, exclude_none=True 

227 ), 

228 ) 

229 if res.ok: 

230 self.logger.info("Entity successfully posted!") 

231 return res.headers.get("Location") 

232 res.raise_for_status() 

233 except requests.RequestException as err: 

234 if err.response is not None and err.response.status_code == 409: 

235 if append: # 409 entity already exists 

236 return self.append_entity_attributes(entity=entity) 

237 elif update: 

238 return self.override_entities(entities=[entity]) 

239 msg = f"Could not post entity {entity.id}" 

240 self.log_error(err=err, msg=msg) 

241 raise 

242 

243 def override_entities( 

244 self, entities: List[Union[ContextLDEntity, ContextLDEntityKeyValues]] 

245 ): 

246 """ 

247 Function to create or override existing entites with the NGSI-LD Context Broker. 

248 The batch operation with Upsert will be used. 

249 """ 

250 return self.entity_batch_operation( 

251 entities=entities, action_type=ActionTypeLD.UPSERT, options="replace" 

252 ) 

253 

254 def get_entity( 

255 self, 

256 entity_id: str, 

257 entity_type: str = None, 

258 attrs: List[str] = None, 

259 options: Optional[str] = None, 

260 geometryProperty: Optional[str] = None, 

261 ) -> Union[ContextLDEntity, ContextLDEntityKeyValues, Dict[str, Any]]: 

262 """ 

263 This operation must return one entity element only, but there may be 

264 more than one entity with the same ID (e.g. entities with same ID but 

265 different types). In such case, an error message is returned, with 

266 the HTTP status code set to 409 Conflict. 

267 

268 Args: 

269 entity_id (String): Id of the entity to be retrieved 

270 entity_type (String): Entity type, to avoid ambiguity in case 

271 there are several entities with the same entity id. 

272 attrs (List of Strings): List of attribute names whose data must be 

273 included in the response. The attributes are retrieved in the 

274 order specified by this parameter. 

275 See "Filtering out attributes and metadata" section for more 

276 detail. If this parameter is not included, the attributes are 

277 retrieved in arbitrary order, and all the attributes of the 

278 entity are included in the response. 

279 Example: temperature, humidity. 

280 options (String): keyValues (simplified representation of entity) 

281 or sysAttrs (include generated attrs createdAt and modifiedAt) 

282 geometryProperty (String): Name of a GeoProperty. In the case of GeoJSON 

283 Entity representation, this parameter indicates which GeoProperty to 

284 use for the "geometry" element. By default, it shall be 'location'. 

285 Returns: 

286 ContextEntity 

287 """ 

288 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}") 

289 headers = self.headers.copy() 

290 params = {} 

291 if entity_type: 

292 params.update({"type": entity_type}) 

293 if attrs: 

294 params.update({"attrs": ",".join(attrs)}) 

295 if geometryProperty: 

296 params.update({"geometryProperty": geometryProperty}) 

297 if options: 

298 if options != "keyValues" and options != "sysAttrs": 

299 raise ValueError( 

300 f"Only available options are 'keyValues' and 'sysAttrs'" 

301 ) 

302 params.update({"options": options}) 

303 

304 try: 

305 res = self.get(url=url, params=params, headers=headers) 

306 if res.ok: 

307 self.logger.info("Entity successfully retrieved!") 

308 self.logger.debug("Received: %s", res.json()) 

309 if options == "keyValues": 

310 return ContextLDEntityKeyValues(**res.json()) 

311 else: 

312 return ContextLDEntity(**res.json()) 

313 res.raise_for_status() 

314 except requests.RequestException as err: 

315 msg = f"Could not load entity {entity_id}" 

316 self.log_error(err=err, msg=msg) 

317 raise 

318 

319 GeometryShape = Literal[ 

320 "Point", 

321 "MultiPoint", 

322 "LineString", 

323 "MultiLineString", 

324 "Polygon", 

325 "MultiPolygon", 

326 ] 

327 

328 def get_entity_list( 

329 self, 

330 entity_id: Optional[str] = None, 

331 id_pattern: Optional[str] = ".*", 

332 entity_type: Optional[str] = None, 

333 attrs: Optional[List[str]] = None, 

334 q: Optional[str] = None, 

335 georel: Optional[str] = None, 

336 geometry: Optional[GeometryShape] = None, 

337 coordinates: Optional[str] = None, 

338 geoproperty: Optional[str] = None, 

339 # csf: Optional[str] = None, # Context Source Filter 

340 limit: Optional[PositiveInt] = None, 

341 options: Optional[str] = None, 

342 ) -> List[Union[ContextLDEntity, ContextLDEntityKeyValues]]: 

343 """ 

344 This operation retrieves a list of entities based on different query options. 

345 By default, the operation retrieves all the entities in the context broker. 

346 Args: 

347 entity_id: 

348 Id of the entity to be retrieved 

349 id_pattern: 

350 Regular expression to match the entity id 

351 entity_type: 

352 Entity type, to avoid ambiguity in case there are several 

353 entities with the same entity id. 

354 attrs: 

355 List of attribute names whose data must be included in the response. 

356 q: 

357 Query expression, composed of attribute names, operators and values. 

358 georel: 

359 Geospatial relationship between the query geometry and the entities. 

360 geometry: 

361 Type of geometry for the query. The possible values are Point, 

362 MultiPoint, LineString, MultiLineString, Polygon, MultiPolygon. 

363 coordinates: 

364 Coordinates of the query geometry. The coordinates must be 

365 expressed as a string of comma-separated values. 

366 geoproperty: 

367 Name of a GeoProperty. In the case of GeoJSON Entity representation, 

368 this parameter indicates which GeoProperty to use for the "geometry" element. 

369 limit: 

370 Maximum number of entities to retrieve. 

371 options: 

372 Further options for the query. The available options are: 

373 - keyValues (simplified representation of entity) 

374 - sysAttrs (including createdAt and modifiedAt, etc.) 

375 - count (include number of all matched entities in response header) 

376 """ 

377 url = urljoin(self.base_url, f"{self._url_version}/entities/") 

378 headers = self.headers.copy() 

379 params = {} 

380 if entity_id: 

381 params.update({"id": entity_id}) 

382 if id_pattern: 

383 params.update({"idPattern": id_pattern}) 

384 if entity_type: 

385 params.update({"type": entity_type}) 

386 if attrs: 

387 params.update({"attrs": ",".join(attrs)}) 

388 if q: 

389 x = re.search(r"[=!<>~]{1}\'.*\'", q.replace(" ", "")) 

390 if x is not None: 

391 raise ValueError( 

392 f"String/Date/etc. value in {x.group()} must be " f"in double quote" 

393 ) 

394 params.update({"q": q}) 

395 if georel: 

396 params.update({"georel": georel}) 

397 if geometry: 

398 params.update({"geometry": geometry}) 

399 if coordinates: 

400 params.update({"coordinates": coordinates}) 

401 if geoproperty: 

402 params.update({"geoproperty": geoproperty}) 

403 # if csf: # ContextSourceRegistration not supported yet 

404 # params.update({'csf': csf}) 

405 if limit: 

406 if limit > 1000: 

407 raise ValueError("limit must be an integer value <= 1000") 

408 params.update({"limit": limit}) 

409 if options: 

410 if options != "keyValues" and options != "sysAttrs": 

411 raise ValueError( 

412 f"Only available options are 'keyValues' and 'sysAttrs'" 

413 ) 

414 params.update({"options": options}) 

415 # params.update({'local': 'true'}) 

416 

417 try: 

418 res = self.get(url=url, params=params, headers=headers) 

419 if res.ok: 

420 self.logger.info("Entity successfully retrieved!") 

421 entity_list: List[Union[ContextLDEntity, ContextLDEntityKeyValues]] = [] 

422 if options == "keyValues": 

423 entity_list = [ 

424 ContextLDEntityKeyValues(**item) for item in res.json() 

425 ] 

426 return entity_list 

427 else: 

428 entity_list = [ContextLDEntity(**item) for item in res.json()] 

429 return entity_list 

430 res.raise_for_status() 

431 except requests.RequestException as err: 

432 msg = f"Could not load entity matching{params}" 

433 self.log_error(err=err, msg=msg) 

434 raise 

435 

436 def replace_existing_attributes_of_entity( 

437 self, 

438 entity: Union[ContextLDEntity, ContextLDEntityKeyValues], 

439 append: bool = False, 

440 ): 

441 """ 

442 The attributes previously existing in the entity are removed and 

443 replaced by the ones in the request. 

444 

445 Args: 

446 entity (ContextEntity): 

447 append (bool): 

448 options: 

449 Returns: 

450 

451 """ 

452 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity.id}/attrs") 

453 headers = self.headers.copy() 

454 if entity.model_dump().get("@context", None) is not None: 

455 headers.update({"Content-Type": "application/ld+json"}) 

456 headers.update({"Link": None}) 

457 try: 

458 res = self.patch( 

459 url=url, 

460 headers=headers, 

461 json=entity.model_dump(exclude={"id", "type"}, exclude_none=True), 

462 ) 

463 if res.ok: 

464 self.logger.info(f"Entity {entity.id} successfully " "updated!") 

465 else: 

466 res.raise_for_status() 

467 except requests.RequestException as err: 

468 if err.response is not None and append and err.response.status_code == 207: 

469 return self.append_entity_attributes(entity=entity) 

470 msg = f"Could not replace attribute of entity {entity.id} !" 

471 self.log_error(err=err, msg=msg) 

472 raise 

473 

474 def update_entity_attribute( 

475 self, 

476 entity_id: str, 

477 attr: Union[ 

478 ContextProperty, 

479 ContextRelationship, 

480 NamedContextProperty, 

481 NamedContextRelationship, 

482 ], 

483 attr_name: str = None, 

484 ): 

485 """ 

486 Updates a specified attribute from an entity. 

487 Args: 

488 attr: context attribute to update 

489 entity_id: Id of the entity. Example: Bcn_Welt 

490 entity_type: Entity type, to avoid ambiguity in case there are 

491 several entities with the same entity id. 

492 """ 

493 headers = self.headers.copy() 

494 if not isinstance(attr, NamedContextProperty) or not isinstance( 

495 attr, NamedContextRelationship 

496 ): 

497 assert attr_name is not None, ( 

498 "Missing name for attribute. " 

499 "attr_name must be present if" 

500 "attr is of type ContextAttribute" 

501 ) 

502 else: 

503 assert attr_name is None, ( 

504 "Invalid argument attr_name. Do not set " 

505 "attr_name if attr is of type " 

506 "NamedContextAttribute or NamedContextRelationship" 

507 ) 

508 

509 url = urljoin( 

510 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}" 

511 ) 

512 

513 jsonnn = {} 

514 if isinstance(attr, list) or isinstance(attr, NamedContextProperty): 

515 jsonnn = attr.model_dump(exclude={"name"}, exclude_none=True) 

516 else: 

517 prop = attr.model_dump() 

518 for key, value in prop.items(): 

519 if value and value != "Property": 

520 jsonnn[key] = value 

521 

522 try: 

523 res = self.patch(url=url, headers=headers, json=jsonnn) 

524 if res.ok: 

525 self.logger.info( 

526 f"Attribute {attr_name} of {entity_id} successfully updated!" 

527 ) 

528 else: 

529 res.raise_for_status() 

530 except requests.RequestException as err: 

531 msg = f"Could not update attribute '{attr_name}' of entity {entity_id}" 

532 self.log_error(err=err, msg=msg) 

533 raise 

534 

535 def append_entity_attributes( 

536 self, 

537 entity: Union[ContextLDEntity, ContextLDEntityKeyValues], 

538 options: Optional[str] = None, 

539 ): 

540 """ 

541 Append new Entity attributes to an existing Entity within an NGSI-LD system 

542 Args: 

543 entity (ContextLDEntity): 

544 Entity to append attributes to. 

545 options (str): 

546 Options for the request. The only available value is 

547 'noOverwrite'. If set, it will raise 400, if all attributes 

548 exist already. 

549 

550 """ 

551 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity.id}/attrs") 

552 headers = self.headers.copy() 

553 if entity.model_dump().get("@context", None) is not None: 

554 headers.update({"Content-Type": "application/ld+json"}) 

555 headers.update({"Link": None}) 

556 params = {} 

557 

558 if options: 

559 if options != "noOverwrite": 

560 raise ValueError(f"The only available value is 'noOverwrite'") 

561 params.update({"options": options}) 

562 

563 try: 

564 res = self.post( 

565 url=url, 

566 headers=headers, 

567 params=params, 

568 json=entity.model_dump(exclude={"id", "type"}, exclude_none=True), 

569 ) 

570 if res.ok: 

571 self.logger.info(f"Entity {entity.id} successfully updated!") 

572 else: 

573 res.raise_for_status() 

574 except requests.RequestException as err: 

575 msg = f"Could not update entity {entity.id}!" 

576 self.log_error(err=err, msg=msg) 

577 raise 

578 

579 # def update_existing_attribute_by_name(self, entity: ContextLDEntity 

580 # ): 

581 # pass 

582 

583 def delete_entity_by_id(self, entity_id: str, entity_type: Optional[str] = None): 

584 """ 

585 Deletes an entity by its id. For deleting mulitple entities at once, 

586 entity_batch_operation() is more efficient. 

587 Args: 

588 entity_id: 

589 ID of entity to delete. 

590 entity_type: 

591 Type of entity to delete. 

592 """ 

593 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}") 

594 headers = self.headers.copy() 

595 params = {} 

596 

597 if entity_type: 

598 params.update({"type": entity_type}) 

599 

600 try: 

601 res = self.delete(url=url, headers=headers, params=params) 

602 if res.ok: 

603 self.logger.info(f"Entity {entity_id} successfully deleted") 

604 else: 

605 res.raise_for_status() 

606 except requests.RequestException as err: 

607 msg = f"Could not delete entity {entity_id}" 

608 self.log_error(err=err, msg=msg) 

609 raise 

610 

611 def delete_attribute(self, entity_id: str, attribute_id: str): 

612 """ 

613 Deletes an attribute from an entity. 

614 Args: 

615 entity_id: 

616 ID of the entity. 

617 attribute_id: 

618 Name of the attribute to delete. 

619 Returns: 

620 

621 """ 

622 url = urljoin( 

623 self.base_url, 

624 f"{self._url_version}/entities/{entity_id}/attrs/{attribute_id}", 

625 ) 

626 headers = self.headers.copy() 

627 

628 try: 

629 res = self.delete(url=url, headers=headers) 

630 if res.ok: 

631 self.logger.info( 

632 f"Attribute {attribute_id} of Entity {entity_id} successfully deleted" 

633 ) 

634 else: 

635 res.raise_for_status() 

636 except requests.RequestException as err: 

637 msg = f"Could not delete attribute {attribute_id} of entity {entity_id}" 

638 self.log_error(err=err, msg=msg) 

639 raise 

640 

641 # SUBSCRIPTION API ENDPOINTS 

642 def get_subscription_list(self, limit: PositiveInt = inf) -> List[SubscriptionLD]: 

643 """ 

644 Returns a list of all the subscriptions present in the system. 

645 Args: 

646 limit: Limit the number of subscriptions to be retrieved 

647 Returns: 

648 list of subscriptions 

649 """ 

650 url = urljoin(self.base_url, f"{self._url_version}/subscriptions/") 

651 headers = self.headers.copy() 

652 params = {} 

653 

654 # We always use the 'count' option to check weather pagination is 

655 # required 

656 params.update({"options": "count"}) 

657 try: 

658 items = self.__pagination( 

659 limit=limit, url=url, params=params, headers=headers 

660 ) 

661 adapter = TypeAdapter(List[SubscriptionLD]) 

662 return adapter.validate_python(items) 

663 except requests.RequestException as err: 

664 msg = "Could not load subscriptions!" 

665 self.log_error(err=err, msg=msg) 

666 raise 

667 

668 def post_subscription( 

669 self, subscription: SubscriptionLD, update: bool = False 

670 ) -> str: 

671 """ 

672 Creates a new subscription. The subscription is represented by a 

673 Subscription object defined in filip.cb.models. 

674 

675 If the subscription already exists, the adding is prevented and the id 

676 of the existing subscription is returned. 

677 

678 A subscription is deemed as already existing if there exists a 

679 subscription with the exact same subject and notification fields. All 

680 optional fields are not considered. 

681 

682 Args: 

683 subscription: Subscription 

684 update: True - If the subscription already exists, update it 

685 False- If the subscription already exists, throw warning 

686 Returns: 

687 str: Id of the (created) subscription 

688 

689 """ 

690 existing_subscriptions = self.get_subscription_list() 

691 

692 sub_hash = subscription.model_dump_json( 

693 include={"subject", "notification", "type"} 

694 ) 

695 for ex_sub in existing_subscriptions: 

696 if sub_hash == ex_sub.model_dump_json( 

697 include={"subject", "notification", "type"} 

698 ): 

699 self.logger.info("Subscription already exists") 

700 if update: 

701 self.logger.info("Updated subscription") 

702 subscription.id = ex_sub.id 

703 self.update_subscription(subscription) 

704 else: 

705 self.logger.warning( 

706 f"Subscription existed already with the id" f" {ex_sub.id}" 

707 ) 

708 return ex_sub.id 

709 

710 url = urljoin(self.base_url, f"{self._url_version}/subscriptions") 

711 headers = self.headers.copy() 

712 if subscription.model_dump().get("@context", None) is not None: 

713 headers.update({"Content-Type": "application/ld+json"}) 

714 headers.update({"Link": None}) 

715 try: 

716 res = self.post( 

717 url=url, 

718 headers=headers, 

719 data=subscription.model_dump_json( 

720 exclude_unset=False, exclude_defaults=False, exclude_none=True 

721 ), 

722 ) 

723 if res.ok: 

724 self.logger.info("Subscription successfully created!") 

725 return res.headers["Location"].split("/")[-1] 

726 res.raise_for_status() 

727 except requests.RequestException as err: 

728 msg = "Could not send subscription!" 

729 self.log_error(err=err, msg=msg) 

730 raise 

731 

732 def get_subscription(self, subscription_id: str) -> SubscriptionLD: 

733 """ 

734 Retrieves a subscription from the context broker. 

735 Args: 

736 subscription_id: id of the subscription 

737 

738 Returns: 

739 

740 """ 

741 url = urljoin( 

742 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}" 

743 ) 

744 headers = self.headers.copy() 

745 try: 

746 res = self.get(url=url, headers=headers) 

747 if res.ok: 

748 self.logger.debug("Received: %s", res.json()) 

749 return SubscriptionLD(**res.json()) 

750 res.raise_for_status() 

751 except requests.RequestException as err: 

752 msg = f"Could not load subscription {subscription_id}" 

753 self.log_error(err=err, msg=msg) 

754 raise 

755 

756 def update_subscription(self, subscription: SubscriptionLD) -> None: 

757 """ 

758 Only the fields included in the request are updated in the subscription. 

759 Args: 

760 subscription: Subscription to update 

761 Returns: 

762 

763 """ 

764 url = urljoin( 

765 self.base_url, f"{self._url_version}/subscriptions/{subscription.id}" 

766 ) 

767 headers = self.headers.copy() 

768 if subscription.model_dump().get("@context", None) is not None: 

769 headers.update({"Content-Type": "application/ld+json"}) 

770 headers.update({"Link": None}) 

771 try: 

772 res = self.patch( 

773 url=url, 

774 headers=headers, 

775 data=subscription.model_dump_json( 

776 exclude={"id"}, 

777 exclude_none=True, 

778 ), 

779 ) 

780 if res.ok: 

781 self.logger.info("Subscription successfully updated!") 

782 else: 

783 res.raise_for_status() 

784 except requests.RequestException as err: 

785 msg = f"Could not update subscription {subscription.id}" 

786 self.log_error(err=err, msg=msg) 

787 raise 

788 

789 def delete_subscription(self, subscription_id: str) -> None: 

790 """ 

791 Deletes a subscription from a Context Broker 

792 Args: 

793 subscription_id: id of the subscription 

794 """ 

795 url = urljoin( 

796 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}" 

797 ) 

798 headers = self.headers.copy() 

799 try: 

800 res = self.delete(url=url, headers=headers) 

801 if res.ok: 

802 self.logger.info( 

803 f"Subscription '{subscription_id}' " f"successfully deleted!" 

804 ) 

805 else: 

806 res.raise_for_status() 

807 except requests.RequestException as err: 

808 msg = f"Could not delete subscription {subscription_id}" 

809 self.log_error(err=err, msg=msg) 

810 raise 

811 

812 def log_multi_errors(self, errors: List[Dict]) -> None: 

813 for error in errors: 

814 entity_id = error["entityId"] 

815 error_details: dict = error["error"] 

816 error_title = error_details.get("title") 

817 error_status = error_details.get("status") 

818 # error_detail = error_details['detail'] 

819 self.logger.error( 

820 "Response status: %d, Entity: %s, Reason: %s", 

821 error_status, 

822 entity_id, 

823 error_title, 

824 ) 

825 

826 def handle_multi_status_response(self, res: requests.Response): 

827 """ 

828 Handles the response of a batch_operation. If the response contains 

829 errors, they are logged. If the response contains only errors, a RuntimeError 

830 is raised. 

831 Args: 

832 res: 

833 

834 Returns: 

835 

836 """ 

837 try: 

838 res.raise_for_status() 

839 if res.text: 

840 response_data = res.json() 

841 if "errors" in response_data: 

842 errors = response_data["errors"] 

843 self.log_multi_errors(errors) 

844 if "success" in response_data: 

845 successList = response_data["success"] 

846 if len(successList) == 0: 

847 raise RuntimeError( 

848 "Batch operation resulted in errors only, see logs" 

849 ) 

850 else: 

851 self.logger.info("Empty response received.") 

852 except json.JSONDecodeError: 

853 self.logger.info( 

854 "Error decoding JSON. Response may not be in valid JSON format." 

855 ) 

856 

857 # Batch operation API 

858 def entity_batch_operation( 

859 self, 

860 *, 

861 entities: List[Union[ContextLDEntity, ContextLDEntityKeyValues]], 

862 action_type: Union[ActionTypeLD, str], 

863 options: Literal["noOverwrite", "replace", "update"] = None, 

864 ) -> None: 

865 """ 

866 This operation allows to create, update and/or delete several entities 

867 in a single batch operation. 

868 

869 This operation is split in as many individual operations as entities 

870 in the entities vector, so the actionType is executed for each one of 

871 them. Depending on the actionType, a mapping with regular non-batch 

872 operations can be done: 

873 

874 append: maps to POST /v2/entities (if the entity does not already exist) 

875 or POST /v2/entities/<id>/attrs (if the entity already exists). 

876 

877 appendStrict: maps to POST /v2/entities (if the entity does not 

878 already exist) or POST /v2/entities/<id>/attrs?options=append (if the 

879 entity already exists). 

880 

881 update: maps to PATCH /v2/entities/<id>/attrs. 

882 

883 delete: maps to DELETE /v2/entities/<id>/attrs/<attrName> on every 

884 attribute included in the entity or to DELETE /v2/entities/<id> if 

885 no attribute were included in the entity. 

886 

887 replace: maps to PUT /v2/entities/<id>/attrs. 

888 

889 Args: 

890 entities: "an array of entities, each entity specified using the " 

891 "JSON entity representation format " 

892 action_type (Update): "actionType, to specify the kind of update 

893 action to do: either append, appendStrict, update, delete, 

894 or replace. " 

895 options (str): Optional 'noOverwrite' 'replace' 'update' 

896 

897 Returns: 

898 

899 """ 

900 

901 url = urljoin( 

902 self.base_url, f"{self._url_version}/entityOperations/{action_type.value}" 

903 ) 

904 headers = self.headers.copy() 

905 headers.update({"Content-Type": "application/json"}) 

906 params = {} 

907 if options: 

908 params.update({"options": options}) 

909 update = UpdateLD(entities=entities) 

910 try: 

911 if action_type == ActionTypeLD.DELETE: 

912 id_list = [entity.id for entity in entities] 

913 res = self.post( 

914 url=url, headers=headers, params=params, data=json.dumps(id_list) 

915 ) 

916 else: 

917 res = self.post( 

918 url=url, 

919 headers=headers, 

920 params=params, 

921 data=json.dumps( 

922 update.model_dump( 

923 by_alias=True, 

924 exclude_none=True, 

925 ).get("entities") 

926 ), 

927 ) 

928 self.handle_multi_status_response(res) 

929 except RuntimeError as rerr: 

930 raise rerr 

931 except Exception as err: 

932 raise err 

933 else: 

934 self.logger.info(f"Update operation {action_type} succeeded!")