Coverage for filip/clients/ngsi_ld/cb.py: 84%

328 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-10 13:57 +0000

1""" 

2Context Broker Module for API Client 

3""" 

4 

5import re 

6import json 

7import os 

8from math import inf 

9from typing import Any, Dict, List, Union, Optional, Literal 

10from urllib.parse import urljoin 

11import requests 

12from pydantic import TypeAdapter, PositiveInt, PositiveFloat 

13from filip.clients.base_http_client import BaseHttpClient, NgsiURLVersion 

14from filip.config import settings 

15from filip.models.base import FiwareLDHeader, PaginationMethod, core_context 

16from filip.models.ngsi_v2.base import AttrsFormat 

17from filip.models.ngsi_ld.subscriptions import SubscriptionLD 

18from filip.models.ngsi_ld.context import ( 

19 DataTypeLD, 

20 ContextLDEntity, 

21 ContextLDEntityKeyValues, 

22 ContextProperty, 

23 ContextRelationship, 

24 NamedContextProperty, 

25 NamedContextRelationship, 

26 ActionTypeLD, 

27 UpdateLD, 

28) 

29from filip.models.ngsi_v2.context import Query 

30 

31 

32class ContextBrokerLDClient(BaseHttpClient): 

33 """ 

34 Implementation of NGSI-LD Context Broker functionalities, such as creating 

35 entities and subscriptions; retrieving, updating and deleting data. 

36 Further documentation: 

37 https://fiware-orion.readthedocs.io/en/master/ 

38 

39 Api specifications for LD are located here: 

40 https://www.etsi.org/deliver/etsi_gs/CIM/001_099/009/01.04.01_60/gs_cim009v010401p.pdf 

41 """ 

42 

43 def __init__( 

44 self, 

45 url: str = None, 

46 *, 

47 session: requests.Session = None, 

48 fiware_header: FiwareLDHeader = None, 

49 **kwargs, 

50 ): 

51 """ 

52 

53 Args: 

54 url: Url of context broker server 

55 session (requests.Session): 

56 fiware_header (FiwareHeader): fiware service and fiware service path 

57 **kwargs (Optional): Optional arguments that ``request`` takes. 

58 """ 

59 # set service url 

60 url = url or settings.LD_CB_URL 

61 # base_http_client overwrites empty header with FiwareHeader instead of FiwareLD 

62 init_header = fiware_header if fiware_header else FiwareLDHeader() 

63 if init_header.link_header is None: 

64 init_header.set_context(core_context) 

65 super().__init__(url=url, session=session, fiware_header=init_header, **kwargs) 

66 # set the version specific url-pattern 

67 self._url_version = NgsiURLVersion.ld_url.value 

68 # For uplink requests, the Content-Type header is essential, 

69 # Accept will be ignored 

70 # For downlink requests, the Accept header is essential, 

71 # Content-Type will be ignored 

72 

73 # default uplink content JSON 

74 self.headers.update({"Content-Type": "application/json"}) 

75 # default downlink content JSON-LD 

76 self.headers.update({"Accept": "application/ld+json"}) 

77 

78 if init_header.ngsild_tenant is not None: 

79 self.__make_tenant() 

80 

81 def __pagination( 

82 self, 

83 *, 

84 method: PaginationMethod = PaginationMethod.GET, 

85 url: str, 

86 headers: Dict, 

87 limit: Union[PositiveInt, PositiveFloat] = None, 

88 params: Dict = None, 

89 data: str = None, 

90 ) -> List[Dict]: 

91 """ 

92 NGSIv2 implements a pagination mechanism in order to help clients to 

93 retrieve large sets of resources. This mechanism works for all listing 

94 operations in the API (e.g. GET /v2/entities, GET /v2/subscriptions, 

95 POST /v2/op/query, etc.). This function helps getting datasets that are 

96 larger than the limit for the different GET operations. 

97 

98 https://fiware-orion.readthedocs.io/en/master/user/pagination/index.html 

99 

100 Args: 

101 url: Information about the url, obtained from the original function 

102 headers: The headers from the original function 

103 params: 

104 limit: 

105 

106 Returns: 

107 object: 

108 

109 """ 

110 if limit is None: 

111 limit = inf 

112 if limit > 1000: 

113 params["limit"] = 1000 # maximum items per request 

114 else: 

115 params["limit"] = limit 

116 # add count option if not present 

117 if "count" not in params: 

118 params.update({"count": "true"}) 

119 

120 if self.session: 

121 session = self.session 

122 else: 

123 session = requests.Session() 

124 with session: 

125 res = session.request( 

126 method=method, url=url, params=params, headers=headers, data=data 

127 ) 

128 if res.ok: 

129 items = res.json() 

130 # do pagination 

131 count = int(res.headers["NGSILD-Results-Count"]) 

132 

133 while len(items) < limit and len(items) < count: 

134 # Establishing the offset from where entities are retrieved 

135 params["offset"] = len(items) 

136 params["limit"] = min(1000, (limit - len(items))) 

137 res = session.request( 

138 method=method, 

139 url=url, 

140 params=params, 

141 headers=headers, 

142 data=data, 

143 ) 

144 if res.ok: 

145 items.extend(res.json()) 

146 else: 

147 res.raise_for_status() 

148 self.logger.debug("Received: %s", items) 

149 return items 

150 res.raise_for_status() 

151 

152 def get_version(self) -> Dict: 

153 """ 

154 Gets version of Orion-LD context broker 

155 Returns: 

156 Dictionary with response 

157 """ 

158 url = urljoin(self.base_url, "/version") 

159 try: 

160 res = self.get(url=url) 

161 if res.ok: 

162 return res.json() 

163 res.raise_for_status() 

164 except requests.RequestException as err: 

165 self.logger.error(err) 

166 raise 

167 

168 def __make_tenant(self): 

169 """ 

170 Create tenant if tenant 

171 is given in headers 

172 """ 

173 idhex = f"urn:ngsi-ld:{os.urandom(6).hex()}" 

174 e = ContextLDEntity(id=idhex, type="TemporaryTenant") 

175 try: 

176 self.post_entity(entity=e) 

177 self.delete_entity_by_id(idhex) 

178 except Exception as err: 

179 self.log_error(err=err, msg="Error while creating tenant") 

180 raise 

181 

182 def get_statistics(self) -> Dict: 

183 """ 

184 Gets statistics of context broker 

185 Returns: 

186 Dictionary with response 

187 """ 

188 url = urljoin(self.base_url, "statistics") 

189 try: 

190 res = self.get(url=url) 

191 if res.ok: 

192 return res.json() 

193 res.raise_for_status() 

194 except requests.RequestException as err: 

195 self.logger.error(err) 

196 raise 

197 

198 def post_entity( 

199 self, 

200 entity: Union[ContextLDEntity, ContextLDEntityKeyValues], 

201 append: bool = False, 

202 update: bool = False, 

203 ): 

204 """ 

205 Function registers an Object with the NGSI-LD Context Broker, 

206 if it already exists it can be automatically updated 

207 if the update flag bool is True. 

208 First a post request with the entity is tried, if the response code 

209 is 422 the entity is uncrossable, as it already exists there are two 

210 options, either overwrite it, if the attribute have changed 

211 (e.g. at least one new/new values) (update = True) or leave 

212 it the way it is (update=False) 

213 

214 """ 

215 url = urljoin(self.base_url, f"{self._url_version}/entities") 

216 headers = self.headers.copy() 

217 if isinstance(entity, ContextLDEntityKeyValues): 

218 entity = entity.to_entity() 

219 if entity.model_dump().get("@context", None) is not None: 

220 headers.update({"Content-Type": "application/ld+json"}) 

221 headers.update({"Link": None}) 

222 try: 

223 res = self.post( 

224 url=url, 

225 headers=headers, 

226 json=entity.model_dump( 

227 exclude_unset=False, exclude_defaults=False, exclude_none=True 

228 ), 

229 ) 

230 if res.ok: 

231 self.logger.info("Entity successfully posted!") 

232 return res.headers.get("Location") 

233 res.raise_for_status() 

234 except requests.RequestException as err: 

235 if err.response is not None and err.response.status_code == 409: 

236 if append: # 409 entity already exists 

237 return self.append_entity_attributes(entity=entity) 

238 elif update: 

239 return self.override_entities(entities=[entity]) 

240 msg = f"Could not post entity {entity.id}" 

241 self.log_error(err=err, msg=msg) 

242 raise 

243 

244 def override_entities( 

245 self, entities: List[Union[ContextLDEntity, ContextLDEntityKeyValues]] 

246 ): 

247 """ 

248 Function to create or override existing entites with the NGSI-LD Context Broker. 

249 The batch operation with Upsert will be used. 

250 """ 

251 return self.entity_batch_operation( 

252 entities=entities, action_type=ActionTypeLD.UPSERT, options="replace" 

253 ) 

254 

255 def get_entity( 

256 self, 

257 entity_id: str, 

258 entity_type: str = None, 

259 attrs: List[str] = None, 

260 options: Optional[str] = None, 

261 geometryProperty: Optional[str] = None, 

262 ) -> Union[ContextLDEntity, ContextLDEntityKeyValues, Dict[str, Any]]: 

263 """ 

264 This operation must return one entity element only, but there may be 

265 more than one entity with the same ID (e.g. entities with same ID but 

266 different types). In such case, an error message is returned, with 

267 the HTTP status code set to 409 Conflict. 

268 

269 Args: 

270 entity_id (String): Id of the entity to be retrieved 

271 entity_type (String): Entity type, to avoid ambiguity in case 

272 there are several entities with the same entity id. 

273 attrs (List of Strings): List of attribute names whose data must be 

274 included in the response. The attributes are retrieved in the 

275 order specified by this parameter. 

276 See "Filtering out attributes and metadata" section in https://fiware-orion.readthedocs.io/en/master/orion-api.html#filtering-out-attributes-and-metadata for more 

277 detail. If this parameter is not included, the attributes are 

278 retrieved in arbitrary order, and all the attributes of the 

279 entity are included in the response. 

280 Example: temperature, humidity. 

281 options (String): keyValues (simplified representation of entity) 

282 or sysAttrs (include generated attrs createdAt and modifiedAt) 

283 geometryProperty (String): Name of a GeoProperty. In the case of GeoJSON 

284 Entity representation, this parameter indicates which GeoProperty to 

285 use for the "geometry" element. By default, it shall be 'location'. 

286 Returns: 

287 ContextEntity 

288 """ 

289 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}") 

290 headers = self.headers.copy() 

291 params = {} 

292 if entity_type: 

293 params.update({"type": entity_type}) 

294 if attrs: 

295 params.update({"attrs": ",".join(attrs)}) 

296 if geometryProperty: 

297 params.update({"geometryProperty": geometryProperty}) 

298 if options: 

299 if options != "keyValues" and options != "sysAttrs": 

300 raise ValueError( 

301 f"Only available options are 'keyValues' and 'sysAttrs'" 

302 ) 

303 params.update({"options": options}) 

304 

305 try: 

306 res = self.get(url=url, params=params, headers=headers) 

307 if res.ok: 

308 self.logger.info("Entity successfully retrieved!") 

309 self.logger.debug("Received: %s", res.json()) 

310 if options == "keyValues": 

311 return ContextLDEntityKeyValues(**res.json()) 

312 else: 

313 return ContextLDEntity(**res.json()) 

314 res.raise_for_status() 

315 except requests.RequestException as err: 

316 msg = f"Could not load entity {entity_id}" 

317 self.log_error(err=err, msg=msg) 

318 raise 

319 

    # Names of the geometry types that are accepted as value of a geometry
    # argument (used as type of ``geometry`` in ``get_entity_list``).
    GeometryShape = Literal[
        "Point",
        "MultiPoint",
        "LineString",
        "MultiLineString",
        "Polygon",
        "MultiPolygon",
    ]

328 

329 def get_entity_list( 

330 self, 

331 entity_id: Optional[str] = None, 

332 id_pattern: Optional[str] = ".*", 

333 entity_type: Optional[str] = None, 

334 attrs: Optional[List[str]] = None, 

335 q: Optional[str] = None, 

336 georel: Optional[str] = None, 

337 geometry: Optional[GeometryShape] = None, 

338 coordinates: Optional[str] = None, 

339 geoproperty: Optional[str] = None, 

340 # csf: Optional[str] = None, # Context Source Filter 

341 limit: Optional[PositiveInt] = None, 

342 options: Optional[str] = None, 

343 ) -> List[Union[ContextLDEntity, ContextLDEntityKeyValues]]: 

344 """ 

345 This operation retrieves a list of entities based on different query options. 

346 By default, the operation retrieves all the entities in the context broker. 

347 Args: 

348 entity_id: 

349 Id of the entity to be retrieved 

350 id_pattern: 

351 Regular expression to match the entity id 

352 entity_type: 

353 Entity type, to avoid ambiguity in case there are several 

354 entities with the same entity id. 

355 attrs: 

356 List of attribute names whose data must be included in the response. 

357 q: 

358 Query expression, composed of attribute names, operators and values. 

359 georel: 

360 Geospatial relationship between the query geometry and the entities. 

361 geometry: 

362 Type of geometry for the query. The possible values are Point, 

363 MultiPoint, LineString, MultiLineString, Polygon, MultiPolygon. 

364 coordinates: 

365 Coordinates of the query geometry. The coordinates must be 

366 expressed as a string of comma-separated values. 

367 geoproperty: 

368 Name of a GeoProperty. In the case of GeoJSON Entity representation, 

369 this parameter indicates which GeoProperty to use for the "geometry" element. 

370 limit: 

371 Maximum number of entities to retrieve. 

372 options: 

373 Further options for the query. The available options are: 

374 - keyValues (simplified representation of entity) 

375 - sysAttrs (including createdAt and modifiedAt, etc.) 

376 - count (include number of all matched entities in response header) 

377 """ 

378 url = urljoin(self.base_url, f"{self._url_version}/entities/") 

379 headers = self.headers.copy() 

380 params = {} 

381 if entity_id: 

382 params.update({"id": entity_id}) 

383 if id_pattern: 

384 params.update({"idPattern": id_pattern}) 

385 if entity_type: 

386 params.update({"type": entity_type}) 

387 if attrs: 

388 params.update({"attrs": ",".join(attrs)}) 

389 if q: 

390 x = re.search(r"[=!<>~]{1}\'.*\'", q.replace(" ", "")) 

391 if x is not None: 

392 raise ValueError( 

393 f"String/Date/etc. value in {x.group()} must be " f"in double quote" 

394 ) 

395 params.update({"q": q}) 

396 if georel: 

397 params.update({"georel": georel}) 

398 if geometry: 

399 params.update({"geometry": geometry}) 

400 if coordinates: 

401 params.update({"coordinates": coordinates}) 

402 if geoproperty: 

403 params.update({"geoproperty": geoproperty}) 

404 # if csf: # ContextSourceRegistration not supported yet 

405 # params.update({'csf': csf}) 

406 if options: 

407 if options != "keyValues" and options != "sysAttrs": 

408 raise ValueError( 

409 f"Only available options are 'keyValues' and 'sysAttrs'" 

410 ) 

411 params.update({"options": options}) 

412 # params.update({'local': 'true'}) 

413 

414 try: 

415 # use pagination 

416 params.update({"count": "true"}) 

417 items = self.__pagination( 

418 limit=limit, url=url, params=params, headers=headers 

419 ) 

420 

421 self.logger.info("Entity successfully retrieved!") 

422 # convert raw data to pydantic models 

423 if options == "keyValues": 

424 entity_list = [ContextLDEntityKeyValues(**item) for item in items] 

425 return entity_list 

426 else: 

427 entity_list = [ContextLDEntity(**item) for item in items] 

428 return entity_list 

429 except requests.RequestException as err: 

430 msg = f"Could not load entity matching{params}" 

431 self.log_error(err=err, msg=msg) 

432 raise 

433 

434 def replace_existing_attributes_of_entity( 

435 self, 

436 entity: Union[ContextLDEntity, ContextLDEntityKeyValues], 

437 append: bool = False, 

438 ): 

439 """ 

440 The attributes previously existing in the entity are removed and 

441 replaced by the ones in the request. 

442 

443 Args: 

444 entity (ContextEntity): 

445 append (bool): 

446 options: 

447 Returns: 

448 

449 """ 

450 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity.id}/attrs") 

451 headers = self.headers.copy() 

452 if isinstance(entity, ContextLDEntityKeyValues): 

453 entity = entity.to_entity() 

454 if entity.model_dump().get("@context", None) is not None: 

455 headers.update({"Content-Type": "application/ld+json"}) 

456 headers.update({"Link": None}) 

457 try: 

458 res = self.patch( 

459 url=url, 

460 headers=headers, 

461 json=entity.model_dump(exclude={"id", "type"}, exclude_none=True), 

462 ) 

463 if res.ok: 

464 self.logger.info(f"Entity {entity.id} successfully " "updated!") 

465 else: 

466 res.raise_for_status() 

467 except requests.RequestException as err: 

468 if err.response is not None and append and err.response.status_code == 207: 

469 return self.append_entity_attributes(entity=entity) 

470 msg = f"Could not replace attribute of entity {entity.id} !" 

471 self.log_error(err=err, msg=msg) 

472 raise 

473 

474 def update_entity_attribute( 

475 self, 

476 entity_id: str, 

477 attr: Union[ 

478 ContextProperty, 

479 ContextRelationship, 

480 NamedContextProperty, 

481 NamedContextRelationship, 

482 ], 

483 attr_name: str = None, 

484 ): 

485 """ 

486 Updates a specified attribute from an entity. 

487 Args: 

488 attr: context attribute to update 

489 entity_id: Id of the entity. Example: Bcn_Welt 

490 entity_type: Entity type, to avoid ambiguity in case there are 

491 several entities with the same entity id. 

492 """ 

493 headers = self.headers.copy() 

494 if not isinstance(attr, NamedContextProperty) or not isinstance( 

495 attr, NamedContextRelationship 

496 ): 

497 assert attr_name is not None, ( 

498 "Missing name for attribute. " 

499 "attr_name must be present if" 

500 "attr is of type ContextAttribute" 

501 ) 

502 else: 

503 assert attr_name is None, ( 

504 "Invalid argument attr_name. Do not set " 

505 "attr_name if attr is of type " 

506 "NamedContextAttribute or NamedContextRelationship" 

507 ) 

508 attr_name = attr.name 

509 url = urljoin( 

510 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}" 

511 ) 

512 val = attr.value if "value" in attr.model_dump() else attr.object 

513 try: 

514 res = self.patch(url=url, headers=headers, json={"value": val}) 

515 if res.ok: 

516 self.logger.info( 

517 f"Attribute {attr_name} of {entity_id} successfully updated!" 

518 ) 

519 else: 

520 res.raise_for_status() 

521 except requests.RequestException as err: 

522 msg = f"Could not update attribute '{attr_name}' of entity {entity_id}" 

523 self.log_error(err=err, msg=msg) 

524 raise 

525 

526 def append_entity_attributes( 

527 self, 

528 entity: Union[ContextLDEntity, ContextLDEntityKeyValues], 

529 options: Optional[str] = None, 

530 ): 

531 """ 

532 Append new Entity attributes to an existing Entity within an NGSI-LD system 

533 Args: 

534 entity (ContextLDEntity): 

535 Entity to append attributes to. 

536 options (str): 

537 Options for the request. The only available value is 

538 'noOverwrite'. If set, it will raise 400, if all attributes 

539 exist already. 

540 

541 """ 

542 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity.id}/attrs") 

543 headers = self.headers.copy() 

544 if entity.model_dump().get("@context", None) is not None: 

545 headers.update({"Content-Type": "application/ld+json"}) 

546 headers.update({"Link": None}) 

547 params = {} 

548 

549 if options: 

550 if options != "noOverwrite": 

551 raise ValueError(f"The only available value is 'noOverwrite'") 

552 params.update({"options": options}) 

553 

554 try: 

555 res = self.post( 

556 url=url, 

557 headers=headers, 

558 params=params, 

559 json=entity.model_dump(exclude={"id", "type"}, exclude_none=True), 

560 ) 

561 if res.ok: 

562 self.logger.info(f"Entity {entity.id} successfully updated!") 

563 else: 

564 res.raise_for_status() 

565 except requests.RequestException as err: 

566 msg = f"Could not update entity {entity.id}!" 

567 self.log_error(err=err, msg=msg) 

568 raise 

569 

570 # def update_existing_attribute_by_name(self, entity: ContextLDEntity 

571 # ): 

572 # pass 

573 

574 def delete_entity_by_id(self, entity_id: str, entity_type: Optional[str] = None): 

575 """ 

576 Deletes an entity by its id. For deleting mulitple entities at once, 

577 entity_batch_operation() is more efficient. 

578 Args: 

579 entity_id: 

580 ID of entity to delete. 

581 entity_type: 

582 Type of entity to delete. 

583 """ 

584 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}") 

585 headers = self.headers.copy() 

586 params = {} 

587 

588 if entity_type: 

589 params.update({"type": entity_type}) 

590 

591 try: 

592 res = self.delete(url=url, headers=headers, params=params) 

593 if res.ok: 

594 self.logger.info(f"Entity {entity_id} successfully deleted") 

595 else: 

596 res.raise_for_status() 

597 except requests.RequestException as err: 

598 msg = f"Could not delete entity {entity_id}" 

599 self.log_error(err=err, msg=msg) 

600 raise 

601 

602 def delete_attribute(self, entity_id: str, attribute_id: str): 

603 """ 

604 Deletes an attribute from an entity. 

605 Args: 

606 entity_id: 

607 ID of the entity. 

608 attribute_id: 

609 Name of the attribute to delete. 

610 Returns: 

611 

612 """ 

613 url = urljoin( 

614 self.base_url, 

615 f"{self._url_version}/entities/{entity_id}/attrs/{attribute_id}", 

616 ) 

617 headers = self.headers.copy() 

618 

619 try: 

620 res = self.delete(url=url, headers=headers) 

621 if res.ok: 

622 self.logger.info( 

623 f"Attribute {attribute_id} of Entity {entity_id} successfully deleted" 

624 ) 

625 else: 

626 res.raise_for_status() 

627 except requests.RequestException as err: 

628 msg = f"Could not delete attribute {attribute_id} of entity {entity_id}" 

629 self.log_error(err=err, msg=msg) 

630 raise 

631 

632 # SUBSCRIPTION API ENDPOINTS 

633 def get_subscription_list(self, limit: PositiveInt = inf) -> List[SubscriptionLD]: 

634 """ 

635 Returns a list of all the subscriptions present in the system. 

636 Args: 

637 limit: Limit the number of subscriptions to be retrieved 

638 Returns: 

639 list of subscriptions 

640 """ 

641 url = urljoin(self.base_url, f"{self._url_version}/subscriptions/") 

642 headers = self.headers.copy() 

643 params = {} 

644 

645 # We always use the 'count' option to check weather pagination is 

646 # required 

647 params.update({"options": "count"}) 

648 try: 

649 items = self.__pagination( 

650 limit=limit, url=url, params=params, headers=headers 

651 ) 

652 adapter = TypeAdapter(List[SubscriptionLD]) 

653 return adapter.validate_python(items) 

654 except requests.RequestException as err: 

655 msg = "Could not load subscriptions!" 

656 self.log_error(err=err, msg=msg) 

657 raise 

658 

659 def post_subscription( 

660 self, subscription: SubscriptionLD, update: bool = False 

661 ) -> str: 

662 """ 

663 Creates a new subscription. The subscription is represented by a 

664 Subscription object defined in filip.cb.models. 

665 

666 If the subscription already exists, the adding is prevented and the id 

667 of the existing subscription is returned. 

668 

669 A subscription is deemed as already existing if there exists a 

670 subscription with the exact same subject and notification fields. All 

671 optional fields are not considered. 

672 

673 Args: 

674 subscription: Subscription 

675 update: True - If the subscription already exists, update it 

676 False- If the subscription already exists, throw warning 

677 Returns: 

678 str: Id of the (created) subscription 

679 

680 """ 

681 existing_subscriptions = self.get_subscription_list() 

682 

683 sub_hash = subscription.model_dump_json( 

684 include={"subject", "notification", "type"} 

685 ) 

686 for ex_sub in existing_subscriptions: 

687 if sub_hash == ex_sub.model_dump_json( 

688 include={"subject", "notification", "type"} 

689 ): 

690 self.logger.info("Subscription already exists") 

691 if update: 

692 self.logger.info("Updated subscription") 

693 subscription.id = ex_sub.id 

694 self.update_subscription(subscription) 

695 else: 

696 self.logger.warning( 

697 f"Subscription existed already with the id" f" {ex_sub.id}" 

698 ) 

699 return ex_sub.id 

700 

701 url = urljoin(self.base_url, f"{self._url_version}/subscriptions") 

702 headers = self.headers.copy() 

703 if subscription.model_dump().get("@context", None) is not None: 

704 headers.update({"Content-Type": "application/ld+json"}) 

705 headers.update({"Link": None}) 

706 try: 

707 res = self.post( 

708 url=url, 

709 headers=headers, 

710 data=subscription.model_dump_json( 

711 exclude_unset=False, exclude_defaults=False, exclude_none=True 

712 ), 

713 ) 

714 if res.ok: 

715 self.logger.info("Subscription successfully created!") 

716 return res.headers["Location"].split("/")[-1] 

717 res.raise_for_status() 

718 except requests.RequestException as err: 

719 msg = "Could not send subscription!" 

720 self.log_error(err=err, msg=msg) 

721 raise 

722 

723 def get_subscription(self, subscription_id: str) -> SubscriptionLD: 

724 """ 

725 Retrieves a subscription from the context broker. 

726 Args: 

727 subscription_id: id of the subscription 

728 

729 Returns: 

730 

731 """ 

732 url = urljoin( 

733 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}" 

734 ) 

735 headers = self.headers.copy() 

736 try: 

737 res = self.get(url=url, headers=headers) 

738 if res.ok: 

739 self.logger.debug("Received: %s", res.json()) 

740 return SubscriptionLD(**res.json()) 

741 res.raise_for_status() 

742 except requests.RequestException as err: 

743 msg = f"Could not load subscription {subscription_id}" 

744 self.log_error(err=err, msg=msg) 

745 raise 

746 

747 def update_subscription(self, subscription: SubscriptionLD) -> None: 

748 """ 

749 Only the fields included in the request are updated in the subscription. 

750 Args: 

751 subscription: Subscription to update 

752 Returns: 

753 

754 """ 

755 url = urljoin( 

756 self.base_url, f"{self._url_version}/subscriptions/{subscription.id}" 

757 ) 

758 headers = self.headers.copy() 

759 if subscription.model_dump().get("@context", None) is not None: 

760 headers.update({"Content-Type": "application/ld+json"}) 

761 headers.update({"Link": None}) 

762 try: 

763 res = self.patch( 

764 url=url, 

765 headers=headers, 

766 data=subscription.model_dump_json( 

767 exclude={"id"}, 

768 exclude_none=True, 

769 ), 

770 ) 

771 if res.ok: 

772 self.logger.info("Subscription successfully updated!") 

773 else: 

774 res.raise_for_status() 

775 except requests.RequestException as err: 

776 msg = f"Could not update subscription {subscription.id}" 

777 self.log_error(err=err, msg=msg) 

778 raise 

779 

780 def delete_subscription(self, subscription_id: str) -> None: 

781 """ 

782 Deletes a subscription from a Context Broker 

783 Args: 

784 subscription_id: id of the subscription 

785 """ 

786 url = urljoin( 

787 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}" 

788 ) 

789 headers = self.headers.copy() 

790 try: 

791 res = self.delete(url=url, headers=headers) 

792 if res.ok: 

793 self.logger.info( 

794 f"Subscription '{subscription_id}' " f"successfully deleted!" 

795 ) 

796 else: 

797 res.raise_for_status() 

798 except requests.RequestException as err: 

799 msg = f"Could not delete subscription {subscription_id}" 

800 self.log_error(err=err, msg=msg) 

801 raise 

802 

803 def log_multi_errors(self, errors: List[Dict]) -> None: 

804 for error in errors: 

805 entity_id = error["entityId"] 

806 error_details: dict = error["error"] 

807 error_title = error_details.get("title") 

808 error_status = error_details.get("status") 

809 # error_detail = error_details['detail'] 

810 self.logger.error( 

811 "Response status: %d, Entity: %s, Reason: %s", 

812 error_status, 

813 entity_id, 

814 error_title, 

815 ) 

816 

817 def handle_multi_status_response(self, res: requests.Response): 

818 """ 

819 Handles the response of a batch_operation. If the response contains 

820 errors, they are logged. If the response contains only errors, a RuntimeError 

821 is raised. 

822 Args: 

823 res: 

824 

825 Returns: 

826 

827 """ 

828 try: 

829 res.raise_for_status() 

830 if res.text: 

831 response_data = res.json() 

832 if "errors" in response_data: 

833 errors = response_data["errors"] 

834 self.log_multi_errors(errors) 

835 if "success" in response_data: 

836 successList = response_data["success"] 

837 if len(successList) == 0: 

838 raise RuntimeError( 

839 "Batch operation resulted in errors only, see logs" 

840 ) 

841 else: 

842 self.logger.info("Empty response received.") 

843 except json.JSONDecodeError: 

844 self.logger.info( 

845 "Error decoding JSON. Response may not be in valid JSON format." 

846 ) 

847 

848 # Batch operation API 

849 def entity_batch_operation( 

850 self, 

851 *, 

852 entities: List[Union[ContextLDEntity, ContextLDEntityKeyValues]], 

853 action_type: Union[ActionTypeLD, str], 

854 options: Literal["noOverwrite", "replace", "update"] = None, 

855 ) -> None: 

856 """ 

857 This operation allows to create, update and/or delete several entities 

858 in a single batch operation. 

859 

860 This operation is split in as many individual operations as entities 

861 in the entities vector, so the actionType is executed for each one of 

862 them. Depending on the actionType, a mapping with regular non-batch 

863 operations can be done: 

864 

865 append: maps to POST /v2/entities (if the entity does not already exist) 

866 or POST /v2/entities/<id>/attrs (if the entity already exists). 

867 

868 appendStrict: maps to POST /v2/entities (if the entity does not 

869 already exist) or POST /v2/entities/<id>/attrs?options=append (if the 

870 entity already exists). 

871 

872 update: maps to PATCH /v2/entities/<id>/attrs. 

873 

874 delete: maps to DELETE /v2/entities/<id>/attrs/<attrName> on every 

875 attribute included in the entity or to DELETE /v2/entities/<id> if 

876 no attribute were included in the entity. 

877 

878 replace: maps to PUT /v2/entities/<id>/attrs. 

879 

880 Args: 

881 entities: "an array of entities, each entity specified using the " 

882 "JSON entity representation format " 

883 action_type (Update): "actionType, to specify the kind of update 

884 action to do: either append, appendStrict, update, delete, 

885 or replace. " 

886 options (str): Optional 'noOverwrite' 'replace' 'update' 

887 

888 Returns: 

889 

890 """ 

891 

892 url = urljoin( 

893 self.base_url, f"{self._url_version}/entityOperations/{action_type.value}" 

894 ) 

895 

896 headers = self.headers.copy() 

897 ctx = any(e.model_dump().get("@context", None) is not None for e in entities) 

898 

899 nctx = any(e.model_dump().get("@context", None) is None for e in entities) 

900 if ctx and not nctx: 

901 headers.update({"Content-Type": "application/ld+json"}) 

902 headers.update({"Link": None}) 

903 elif not ctx and nctx: 

904 headers.update({"Content-Type": "application/json"}) 

905 else: 

906 self.logger.warning( 

907 "Detected mixed context provision in batch operation: " 

908 "Some entities have @context field while others don't. " 

909 "FiLiP use application/json and Link header by default, so that " 

910 "the entities with @context will be rejected by CB" 

911 ) 

912 

913 params = {} 

914 if options: 

915 params.update({"options": options}) 

916 update = UpdateLD(entities=entities) 

917 try: 

918 if action_type == ActionTypeLD.DELETE: 

919 id_list = [entity.id for entity in entities] 

920 res = self.post( 

921 url=url, headers=headers, params=params, data=json.dumps(id_list) 

922 ) 

923 else: 

924 res = self.post( 

925 url=url, 

926 headers=headers, 

927 params=params, 

928 data=json.dumps( 

929 update.model_dump( 

930 by_alias=True, 

931 exclude_none=True, 

932 ).get("entities") 

933 ), 

934 ) 

935 self.handle_multi_status_response(res) 

936 except RuntimeError as rerr: 

937 raise rerr 

938 except Exception as err: 

939 raise err 

940 else: 

941 self.logger.info(f"Update operation {action_type} succeeded!") 

942 

943 def validate_relationship( 

944 self, 

945 relationship: Union[ 

946 NamedContextProperty, 

947 ContextProperty, 

948 NamedContextRelationship, 

949 ContextRelationship, 

950 Dict, 

951 ], 

952 ) -> bool: 

953 """ 

954 Validates a relationship. A relationship is valid if it points to an existing 

955 entity. Otherwise, it is considered invalid 

956 

957 Args: 

958 relationship: relationship to validate,assumed to be property or relationship 

959 since there is no geoproperty with string value 

960 Returns 

961 True if the relationship is valid, False otherwise 

962 """ 

963 if hasattr(relationship, "value"): 

964 destination_id = relationship.value 

965 elif hasattr(relationship, "object"): 

966 destination_id = relationship.object 

967 elif isinstance(relationship, dict): 

968 _sentinel = object() 

969 destination_id = relationship.get("value", _sentinel) 

970 if destination_id is _sentinel: 

971 raise ValueError( 

972 "Invalid ld relationship dictionary format\n" 

973 "Expected format: {" 

974 f'"type": "{DataTypeLD.RELATIONSHIP[0]}", ' 

975 '"value" "entity_id"}' 

976 ) 

977 else: 

978 raise ValueError("Invalid relationship type.") 

979 try: 

980 destination_entity = self.get_entity(entity_id=destination_id) 

981 return destination_entity.id == destination_id 

982 except requests.RequestException as err: 

983 if err.response.status_code == 404: 

984 return False