Coverage for filip/clients/ngsi_ld/cb.py: 82%

341 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-05 11:07 +0000

1""" 

2Context Broker Module for API Client 

3""" 

4 

5import re 

6import json 

7import os 

8from math import inf 

9from typing import Any, Dict, List, Union, Optional, Literal 

10from urllib.parse import urljoin 

11import requests 

12from pydantic import TypeAdapter, PositiveInt, PositiveFloat 

13from filip.clients.base_http_client import BaseHttpClient, NgsiURLVersion 

14from filip.config import settings 

15from filip.models.base import FiwareLDHeader, PaginationMethod, core_context 

16from filip.models.ngsi_v2.base import AttrsFormat 

17from filip.models.ngsi_ld.subscriptions import SubscriptionLD 

18from filip.models.ngsi_ld.context import ( 

19 DataTypeLD, 

20 ContextLDEntity, 

21 ContextLDEntityKeyValues, 

22 ContextProperty, 

23 ContextRelationship, 

24 NamedContextProperty, 

25 NamedContextRelationship, 

26 ActionTypeLD, 

27 UpdateLD, 

28) 

29from filip.models.ngsi_v2.context import Query 

30 

31 

32class ContextBrokerLDClient(BaseHttpClient): 

33 """ 

34 Implementation of NGSI-LD Context Broker functionalities, such as creating 

35 entities and subscriptions; retrieving, updating and deleting data. 

36 Further documentation: 

37 https://fiware-orion.readthedocs.io/en/master/ 

38 

39 Api specifications for LD are located here: 

40 https://www.etsi.org/deliver/etsi_gs/CIM/001_099/009/01.04.01_60/gs_cim009v010401p.pdf 

41 """ 

42 

43 def __init__( 

44 self, 

45 url: str = None, 

46 *, 

47 session: requests.Session = None, 

48 fiware_header: FiwareLDHeader = None, 

49 **kwargs, 

50 ): 

51 """ 

52 

53 Args: 

54 url: Url of context broker server 

55 session (requests.Session): 

56 fiware_header (FiwareHeader): fiware service and fiware service path 

57 **kwargs (Optional): Optional arguments that ``request`` takes. 

58 """ 

59 # set service url 

60 url = url or settings.LD_CB_URL 

61 # base_http_client overwrites empty header with FiwareHeader instead of FiwareLD 

62 init_header = fiware_header if fiware_header else FiwareLDHeader() 

63 if init_header.link_header is None: 

64 init_header.set_context(core_context) 

65 super().__init__(url=url, session=session, fiware_header=init_header, **kwargs) 

66 # set the version specific url-pattern 

67 self._url_version = NgsiURLVersion.ld_url.value 

68 # For uplink requests, the Content-Type header is essential, 

69 # Accept will be ignored 

70 # For downlink requests, the Accept header is essential, 

71 # Content-Type will be ignored 

72 

73 # default uplink content JSON 

74 self.headers.update({"Content-Type": "application/json"}) 

75 # default downlink content JSON-LD 

76 self.headers.update({"Accept": "application/ld+json"}) 

77 

78 if init_header.ngsild_tenant is not None: 

79 self.__make_tenant() 

80 

81 def __pagination( 

82 self, 

83 *, 

84 method: PaginationMethod = PaginationMethod.GET, 

85 url: str, 

86 headers: Dict, 

87 limit: Union[PositiveInt, PositiveFloat] = None, 

88 params: Dict = None, 

89 data: str = None, 

90 ) -> List[Dict]: 

91 """ 

92 NGSIv2 implements a pagination mechanism in order to help clients to 

93 retrieve large sets of resources. This mechanism works for all listing 

94 operations in the API (e.g. GET /v2/entities, GET /v2/subscriptions, 

95 POST /v2/op/query, etc.). This function helps getting datasets that are 

96 larger than the limit for the different GET operations. 

97 

98 https://fiware-orion.readthedocs.io/en/master/user/pagination/index.html 

99 

100 Args: 

101 url: Information about the url, obtained from the original function 

102 headers: The headers from the original function 

103 params: 

104 limit: 

105 

106 Returns: 

107 object: 

108 

109 """ 

110 if limit is None: 

111 limit = inf 

112 if limit > 1000: 

113 params["limit"] = 1000 # maximum items per request 

114 else: 

115 params["limit"] = limit 

116 

117 if self.session: 

118 session = self.session 

119 else: 

120 session = requests.Session() 

121 with session: 

122 res = session.request( 

123 method=method, url=url, params=params, headers=headers, data=data 

124 ) 

125 if res.ok: 

126 items = res.json() 

127 # do pagination 

128 if self._url_version == NgsiURLVersion.v2_url.value: 

129 count = int(res.headers["Fiware-Total-Count"]) 

130 elif self._url_version == NgsiURLVersion.ld_url.value: 

131 count = int(res.headers["NGSILD-Results-Count"]) 

132 else: 

133 count = 0 

134 

135 while len(items) < limit and len(items) < count: 

136 # Establishing the offset from where entities are retrieved 

137 params["offset"] = len(items) 

138 params["limit"] = min(1000, (limit - len(items))) 

139 res = session.request( 

140 method=method, 

141 url=url, 

142 params=params, 

143 headers=headers, 

144 data=data, 

145 ) 

146 if res.ok: 

147 items.extend(res.json()) 

148 else: 

149 res.raise_for_status() 

150 self.logger.debug("Received: %s", items) 

151 return items 

152 res.raise_for_status() 

153 

154 def get_version(self) -> Dict: 

155 """ 

156 Gets version of Orion-LD context broker 

157 Returns: 

158 Dictionary with response 

159 """ 

160 url = urljoin(self.base_url, "/version") 

161 try: 

162 res = self.get(url=url) 

163 if res.ok: 

164 return res.json() 

165 res.raise_for_status() 

166 except requests.RequestException as err: 

167 self.logger.error(err) 

168 raise 

169 

170 def __make_tenant(self): 

171 """ 

172 Create tenant if tenant 

173 is given in headers 

174 """ 

175 idhex = f"urn:ngsi-ld:{os.urandom(6).hex()}" 

176 e = ContextLDEntity(id=idhex, type="TemporaryTenant") 

177 try: 

178 self.post_entity(entity=e) 

179 self.delete_entity_by_id(idhex) 

180 except Exception as err: 

181 self.log_error(err=err, msg="Error while creating tenant") 

182 raise 

183 

184 def get_statistics(self) -> Dict: 

185 """ 

186 Gets statistics of context broker 

187 Returns: 

188 Dictionary with response 

189 """ 

190 url = urljoin(self.base_url, "statistics") 

191 try: 

192 res = self.get(url=url) 

193 if res.ok: 

194 return res.json() 

195 res.raise_for_status() 

196 except requests.RequestException as err: 

197 self.logger.error(err) 

198 raise 

199 

200 def post_entity( 

201 self, 

202 entity: Union[ContextLDEntity, ContextLDEntityKeyValues], 

203 append: bool = False, 

204 update: bool = False, 

205 ): 

206 """ 

207 Function registers an Object with the NGSI-LD Context Broker, 

208 if it already exists it can be automatically updated 

209 if the update flag bool is True. 

210 First a post request with the entity is tried, if the response code 

211 is 422 the entity is uncrossable, as it already exists there are two 

212 options, either overwrite it, if the attribute have changed 

213 (e.g. at least one new/new values) (update = True) or leave 

214 it the way it is (update=False) 

215 

216 """ 

217 url = urljoin(self.base_url, f"{self._url_version}/entities") 

218 headers = self.headers.copy() 

219 if isinstance(entity, ContextLDEntityKeyValues): 

220 entity = entity.to_entity() 

221 if entity.model_dump().get("@context", None) is not None: 

222 headers.update({"Content-Type": "application/ld+json"}) 

223 headers.update({"Link": None}) 

224 try: 

225 res = self.post( 

226 url=url, 

227 headers=headers, 

228 json=entity.model_dump( 

229 exclude_unset=False, exclude_defaults=False, exclude_none=True 

230 ), 

231 ) 

232 if res.ok: 

233 self.logger.info("Entity successfully posted!") 

234 return res.headers.get("Location") 

235 res.raise_for_status() 

236 except requests.RequestException as err: 

237 if err.response is not None and err.response.status_code == 409: 

238 if append: # 409 entity already exists 

239 return self.append_entity_attributes(entity=entity) 

240 elif update: 

241 return self.override_entities(entities=[entity]) 

242 msg = f"Could not post entity {entity.id}" 

243 self.log_error(err=err, msg=msg) 

244 raise 

245 

246 def override_entities( 

247 self, entities: List[Union[ContextLDEntity, ContextLDEntityKeyValues]] 

248 ): 

249 """ 

250 Function to create or override existing entites with the NGSI-LD Context Broker. 

251 The batch operation with Upsert will be used. 

252 """ 

253 return self.entity_batch_operation( 

254 entities=entities, action_type=ActionTypeLD.UPSERT, options="replace" 

255 ) 

256 

257 def get_entity( 

258 self, 

259 entity_id: str, 

260 entity_type: str = None, 

261 attrs: List[str] = None, 

262 options: Optional[str] = None, 

263 geometryProperty: Optional[str] = None, 

264 ) -> Union[ContextLDEntity, ContextLDEntityKeyValues, Dict[str, Any]]: 

265 """ 

266 This operation must return one entity element only, but there may be 

267 more than one entity with the same ID (e.g. entities with same ID but 

268 different types). In such case, an error message is returned, with 

269 the HTTP status code set to 409 Conflict. 

270 

271 Args: 

272 entity_id (String): Id of the entity to be retrieved 

273 entity_type (String): Entity type, to avoid ambiguity in case 

274 there are several entities with the same entity id. 

275 attrs (List of Strings): List of attribute names whose data must be 

276 included in the response. The attributes are retrieved in the 

277 order specified by this parameter. 

278 See "Filtering out attributes and metadata" section for more 

279 detail. If this parameter is not included, the attributes are 

280 retrieved in arbitrary order, and all the attributes of the 

281 entity are included in the response. 

282 Example: temperature, humidity. 

283 options (String): keyValues (simplified representation of entity) 

284 or sysAttrs (include generated attrs createdAt and modifiedAt) 

285 geometryProperty (String): Name of a GeoProperty. In the case of GeoJSON 

286 Entity representation, this parameter indicates which GeoProperty to 

287 use for the "geometry" element. By default, it shall be 'location'. 

288 Returns: 

289 ContextEntity 

290 """ 

291 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}") 

292 headers = self.headers.copy() 

293 params = {} 

294 if entity_type: 

295 params.update({"type": entity_type}) 

296 if attrs: 

297 params.update({"attrs": ",".join(attrs)}) 

298 if geometryProperty: 

299 params.update({"geometryProperty": geometryProperty}) 

300 if options: 

301 if options != "keyValues" and options != "sysAttrs": 

302 raise ValueError( 

303 f"Only available options are 'keyValues' and 'sysAttrs'" 

304 ) 

305 params.update({"options": options}) 

306 

307 try: 

308 res = self.get(url=url, params=params, headers=headers) 

309 if res.ok: 

310 self.logger.info("Entity successfully retrieved!") 

311 self.logger.debug("Received: %s", res.json()) 

312 if options == "keyValues": 

313 return ContextLDEntityKeyValues(**res.json()) 

314 else: 

315 return ContextLDEntity(**res.json()) 

316 res.raise_for_status() 

317 except requests.RequestException as err: 

318 msg = f"Could not load entity {entity_id}" 

319 self.log_error(err=err, msg=msg) 

320 raise 

321 

322 GeometryShape = Literal[ 

323 "Point", 

324 "MultiPoint", 

325 "LineString", 

326 "MultiLineString", 

327 "Polygon", 

328 "MultiPolygon", 

329 ] 

330 

331 def get_entity_list( 

332 self, 

333 entity_id: Optional[str] = None, 

334 id_pattern: Optional[str] = ".*", 

335 entity_type: Optional[str] = None, 

336 attrs: Optional[List[str]] = None, 

337 q: Optional[str] = None, 

338 georel: Optional[str] = None, 

339 geometry: Optional[GeometryShape] = None, 

340 coordinates: Optional[str] = None, 

341 geoproperty: Optional[str] = None, 

342 # csf: Optional[str] = None, # Context Source Filter 

343 limit: Optional[PositiveInt] = None, 

344 options: Optional[str] = None, 

345 ) -> List[Union[ContextLDEntity, ContextLDEntityKeyValues]]: 

346 """ 

347 This operation retrieves a list of entities based on different query options. 

348 By default, the operation retrieves all the entities in the context broker. 

349 Args: 

350 entity_id: 

351 Id of the entity to be retrieved 

352 id_pattern: 

353 Regular expression to match the entity id 

354 entity_type: 

355 Entity type, to avoid ambiguity in case there are several 

356 entities with the same entity id. 

357 attrs: 

358 List of attribute names whose data must be included in the response. 

359 q: 

360 Query expression, composed of attribute names, operators and values. 

361 georel: 

362 Geospatial relationship between the query geometry and the entities. 

363 geometry: 

364 Type of geometry for the query. The possible values are Point, 

365 MultiPoint, LineString, MultiLineString, Polygon, MultiPolygon. 

366 coordinates: 

367 Coordinates of the query geometry. The coordinates must be 

368 expressed as a string of comma-separated values. 

369 geoproperty: 

370 Name of a GeoProperty. In the case of GeoJSON Entity representation, 

371 this parameter indicates which GeoProperty to use for the "geometry" element. 

372 limit: 

373 Maximum number of entities to retrieve. 

374 options: 

375 Further options for the query. The available options are: 

376 - keyValues (simplified representation of entity) 

377 - sysAttrs (including createdAt and modifiedAt, etc.) 

378 - count (include number of all matched entities in response header) 

379 """ 

380 url = urljoin(self.base_url, f"{self._url_version}/entities/") 

381 headers = self.headers.copy() 

382 params = {} 

383 if entity_id: 

384 params.update({"id": entity_id}) 

385 if id_pattern: 

386 params.update({"idPattern": id_pattern}) 

387 if entity_type: 

388 params.update({"type": entity_type}) 

389 if attrs: 

390 params.update({"attrs": ",".join(attrs)}) 

391 if q: 

392 x = re.search(r"[=!<>~]{1}\'.*\'", q.replace(" ", "")) 

393 if x is not None: 

394 raise ValueError( 

395 f"String/Date/etc. value in {x.group()} must be " f"in double quote" 

396 ) 

397 params.update({"q": q}) 

398 if georel: 

399 params.update({"georel": georel}) 

400 if geometry: 

401 params.update({"geometry": geometry}) 

402 if coordinates: 

403 params.update({"coordinates": coordinates}) 

404 if geoproperty: 

405 params.update({"geoproperty": geoproperty}) 

406 # if csf: # ContextSourceRegistration not supported yet 

407 # params.update({'csf': csf}) 

408 if limit: 

409 if limit > 1000: 

410 raise ValueError("limit must be an integer value <= 1000") 

411 params.update({"limit": limit}) 

412 if options: 

413 if options != "keyValues" and options != "sysAttrs": 

414 raise ValueError( 

415 f"Only available options are 'keyValues' and 'sysAttrs'" 

416 ) 

417 params.update({"options": options}) 

418 # params.update({'local': 'true'}) 

419 

420 try: 

421 res = self.get(url=url, params=params, headers=headers) 

422 if res.ok: 

423 self.logger.info("Entity successfully retrieved!") 

424 entity_list: List[Union[ContextLDEntity, ContextLDEntityKeyValues]] = [] 

425 if options == "keyValues": 

426 entity_list = [ 

427 ContextLDEntityKeyValues(**item) for item in res.json() 

428 ] 

429 return entity_list 

430 else: 

431 entity_list = [ContextLDEntity(**item) for item in res.json()] 

432 return entity_list 

433 res.raise_for_status() 

434 except requests.RequestException as err: 

435 msg = f"Could not load entity matching{params}" 

436 self.log_error(err=err, msg=msg) 

437 raise 

438 

439 def replace_existing_attributes_of_entity( 

440 self, 

441 entity: Union[ContextLDEntity, ContextLDEntityKeyValues], 

442 append: bool = False, 

443 ): 

444 """ 

445 The attributes previously existing in the entity are removed and 

446 replaced by the ones in the request. 

447 

448 Args: 

449 entity (ContextEntity): 

450 append (bool): 

451 options: 

452 Returns: 

453 

454 """ 

455 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity.id}/attrs") 

456 headers = self.headers.copy() 

457 if isinstance(entity, ContextLDEntityKeyValues): 

458 entity = entity.to_entity() 

459 if entity.model_dump().get("@context", None) is not None: 

460 headers.update({"Content-Type": "application/ld+json"}) 

461 headers.update({"Link": None}) 

462 try: 

463 res = self.patch( 

464 url=url, 

465 headers=headers, 

466 json=entity.model_dump(exclude={"id", "type"}, exclude_none=True), 

467 ) 

468 if res.ok: 

469 self.logger.info(f"Entity {entity.id} successfully " "updated!") 

470 else: 

471 res.raise_for_status() 

472 except requests.RequestException as err: 

473 if err.response is not None and append and err.response.status_code == 207: 

474 return self.append_entity_attributes(entity=entity) 

475 msg = f"Could not replace attribute of entity {entity.id} !" 

476 self.log_error(err=err, msg=msg) 

477 raise 

478 

479 def update_entity_attribute( 

480 self, 

481 entity_id: str, 

482 attr: Union[ 

483 ContextProperty, 

484 ContextRelationship, 

485 NamedContextProperty, 

486 NamedContextRelationship, 

487 ], 

488 attr_name: str = None, 

489 ): 

490 """ 

491 Updates a specified attribute from an entity. 

492 Args: 

493 attr: context attribute to update 

494 entity_id: Id of the entity. Example: Bcn_Welt 

495 entity_type: Entity type, to avoid ambiguity in case there are 

496 several entities with the same entity id. 

497 """ 

498 headers = self.headers.copy() 

499 if not isinstance(attr, NamedContextProperty) or not isinstance( 

500 attr, NamedContextRelationship 

501 ): 

502 assert attr_name is not None, ( 

503 "Missing name for attribute. " 

504 "attr_name must be present if" 

505 "attr is of type ContextAttribute" 

506 ) 

507 else: 

508 assert attr_name is None, ( 

509 "Invalid argument attr_name. Do not set " 

510 "attr_name if attr is of type " 

511 "NamedContextAttribute or NamedContextRelationship" 

512 ) 

513 

514 url = urljoin( 

515 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}" 

516 ) 

517 

518 jsonnn = {} 

519 if isinstance(attr, list) or isinstance(attr, NamedContextProperty): 

520 jsonnn = attr.model_dump(exclude={"name"}, exclude_none=True) 

521 else: 

522 prop = attr.model_dump() 

523 for key, value in prop.items(): 

524 if value and value != "Property": 

525 jsonnn[key] = value 

526 

527 try: 

528 res = self.patch(url=url, headers=headers, json=jsonnn) 

529 if res.ok: 

530 self.logger.info( 

531 f"Attribute {attr_name} of {entity_id} successfully updated!" 

532 ) 

533 else: 

534 res.raise_for_status() 

535 except requests.RequestException as err: 

536 msg = f"Could not update attribute '{attr_name}' of entity {entity_id}" 

537 self.log_error(err=err, msg=msg) 

538 raise 

539 

540 def append_entity_attributes( 

541 self, 

542 entity: Union[ContextLDEntity, ContextLDEntityKeyValues], 

543 options: Optional[str] = None, 

544 ): 

545 """ 

546 Append new Entity attributes to an existing Entity within an NGSI-LD system 

547 Args: 

548 entity (ContextLDEntity): 

549 Entity to append attributes to. 

550 options (str): 

551 Options for the request. The only available value is 

552 'noOverwrite'. If set, it will raise 400, if all attributes 

553 exist already. 

554 

555 """ 

556 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity.id}/attrs") 

557 headers = self.headers.copy() 

558 if entity.model_dump().get("@context", None) is not None: 

559 headers.update({"Content-Type": "application/ld+json"}) 

560 headers.update({"Link": None}) 

561 params = {} 

562 

563 if options: 

564 if options != "noOverwrite": 

565 raise ValueError(f"The only available value is 'noOverwrite'") 

566 params.update({"options": options}) 

567 

568 try: 

569 res = self.post( 

570 url=url, 

571 headers=headers, 

572 params=params, 

573 json=entity.model_dump(exclude={"id", "type"}, exclude_none=True), 

574 ) 

575 if res.ok: 

576 self.logger.info(f"Entity {entity.id} successfully updated!") 

577 else: 

578 res.raise_for_status() 

579 except requests.RequestException as err: 

580 msg = f"Could not update entity {entity.id}!" 

581 self.log_error(err=err, msg=msg) 

582 raise 

583 

584 # def update_existing_attribute_by_name(self, entity: ContextLDEntity 

585 # ): 

586 # pass 

587 

588 def delete_entity_by_id(self, entity_id: str, entity_type: Optional[str] = None): 

589 """ 

590 Deletes an entity by its id. For deleting mulitple entities at once, 

591 entity_batch_operation() is more efficient. 

592 Args: 

593 entity_id: 

594 ID of entity to delete. 

595 entity_type: 

596 Type of entity to delete. 

597 """ 

598 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}") 

599 headers = self.headers.copy() 

600 params = {} 

601 

602 if entity_type: 

603 params.update({"type": entity_type}) 

604 

605 try: 

606 res = self.delete(url=url, headers=headers, params=params) 

607 if res.ok: 

608 self.logger.info(f"Entity {entity_id} successfully deleted") 

609 else: 

610 res.raise_for_status() 

611 except requests.RequestException as err: 

612 msg = f"Could not delete entity {entity_id}" 

613 self.log_error(err=err, msg=msg) 

614 raise 

615 

616 def delete_attribute(self, entity_id: str, attribute_id: str): 

617 """ 

618 Deletes an attribute from an entity. 

619 Args: 

620 entity_id: 

621 ID of the entity. 

622 attribute_id: 

623 Name of the attribute to delete. 

624 Returns: 

625 

626 """ 

627 url = urljoin( 

628 self.base_url, 

629 f"{self._url_version}/entities/{entity_id}/attrs/{attribute_id}", 

630 ) 

631 headers = self.headers.copy() 

632 

633 try: 

634 res = self.delete(url=url, headers=headers) 

635 if res.ok: 

636 self.logger.info( 

637 f"Attribute {attribute_id} of Entity {entity_id} successfully deleted" 

638 ) 

639 else: 

640 res.raise_for_status() 

641 except requests.RequestException as err: 

642 msg = f"Could not delete attribute {attribute_id} of entity {entity_id}" 

643 self.log_error(err=err, msg=msg) 

644 raise 

645 

646 # SUBSCRIPTION API ENDPOINTS 

647 def get_subscription_list(self, limit: PositiveInt = inf) -> List[SubscriptionLD]: 

648 """ 

649 Returns a list of all the subscriptions present in the system. 

650 Args: 

651 limit: Limit the number of subscriptions to be retrieved 

652 Returns: 

653 list of subscriptions 

654 """ 

655 url = urljoin(self.base_url, f"{self._url_version}/subscriptions/") 

656 headers = self.headers.copy() 

657 params = {} 

658 

659 # We always use the 'count' option to check weather pagination is 

660 # required 

661 params.update({"options": "count"}) 

662 try: 

663 items = self.__pagination( 

664 limit=limit, url=url, params=params, headers=headers 

665 ) 

666 adapter = TypeAdapter(List[SubscriptionLD]) 

667 return adapter.validate_python(items) 

668 except requests.RequestException as err: 

669 msg = "Could not load subscriptions!" 

670 self.log_error(err=err, msg=msg) 

671 raise 

672 

673 def post_subscription( 

674 self, subscription: SubscriptionLD, update: bool = False 

675 ) -> str: 

676 """ 

677 Creates a new subscription. The subscription is represented by a 

678 Subscription object defined in filip.cb.models. 

679 

680 If the subscription already exists, the adding is prevented and the id 

681 of the existing subscription is returned. 

682 

683 A subscription is deemed as already existing if there exists a 

684 subscription with the exact same subject and notification fields. All 

685 optional fields are not considered. 

686 

687 Args: 

688 subscription: Subscription 

689 update: True - If the subscription already exists, update it 

690 False- If the subscription already exists, throw warning 

691 Returns: 

692 str: Id of the (created) subscription 

693 

694 """ 

695 existing_subscriptions = self.get_subscription_list() 

696 

697 sub_hash = subscription.model_dump_json( 

698 include={"subject", "notification", "type"} 

699 ) 

700 for ex_sub in existing_subscriptions: 

701 if sub_hash == ex_sub.model_dump_json( 

702 include={"subject", "notification", "type"} 

703 ): 

704 self.logger.info("Subscription already exists") 

705 if update: 

706 self.logger.info("Updated subscription") 

707 subscription.id = ex_sub.id 

708 self.update_subscription(subscription) 

709 else: 

710 self.logger.warning( 

711 f"Subscription existed already with the id" f" {ex_sub.id}" 

712 ) 

713 return ex_sub.id 

714 

715 url = urljoin(self.base_url, f"{self._url_version}/subscriptions") 

716 headers = self.headers.copy() 

717 if subscription.model_dump().get("@context", None) is not None: 

718 headers.update({"Content-Type": "application/ld+json"}) 

719 headers.update({"Link": None}) 

720 try: 

721 res = self.post( 

722 url=url, 

723 headers=headers, 

724 data=subscription.model_dump_json( 

725 exclude_unset=False, exclude_defaults=False, exclude_none=True 

726 ), 

727 ) 

728 if res.ok: 

729 self.logger.info("Subscription successfully created!") 

730 return res.headers["Location"].split("/")[-1] 

731 res.raise_for_status() 

732 except requests.RequestException as err: 

733 msg = "Could not send subscription!" 

734 self.log_error(err=err, msg=msg) 

735 raise 

736 

737 def get_subscription(self, subscription_id: str) -> SubscriptionLD: 

738 """ 

739 Retrieves a subscription from the context broker. 

740 Args: 

741 subscription_id: id of the subscription 

742 

743 Returns: 

744 

745 """ 

746 url = urljoin( 

747 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}" 

748 ) 

749 headers = self.headers.copy() 

750 try: 

751 res = self.get(url=url, headers=headers) 

752 if res.ok: 

753 self.logger.debug("Received: %s", res.json()) 

754 return SubscriptionLD(**res.json()) 

755 res.raise_for_status() 

756 except requests.RequestException as err: 

757 msg = f"Could not load subscription {subscription_id}" 

758 self.log_error(err=err, msg=msg) 

759 raise 

760 

761 def update_subscription(self, subscription: SubscriptionLD) -> None: 

762 """ 

763 Only the fields included in the request are updated in the subscription. 

764 Args: 

765 subscription: Subscription to update 

766 Returns: 

767 

768 """ 

769 url = urljoin( 

770 self.base_url, f"{self._url_version}/subscriptions/{subscription.id}" 

771 ) 

772 headers = self.headers.copy() 

773 if subscription.model_dump().get("@context", None) is not None: 

774 headers.update({"Content-Type": "application/ld+json"}) 

775 headers.update({"Link": None}) 

776 try: 

777 res = self.patch( 

778 url=url, 

779 headers=headers, 

780 data=subscription.model_dump_json( 

781 exclude={"id"}, 

782 exclude_none=True, 

783 ), 

784 ) 

785 if res.ok: 

786 self.logger.info("Subscription successfully updated!") 

787 else: 

788 res.raise_for_status() 

789 except requests.RequestException as err: 

790 msg = f"Could not update subscription {subscription.id}" 

791 self.log_error(err=err, msg=msg) 

792 raise 

793 

794 def delete_subscription(self, subscription_id: str) -> None: 

795 """ 

796 Deletes a subscription from a Context Broker 

797 Args: 

798 subscription_id: id of the subscription 

799 """ 

800 url = urljoin( 

801 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}" 

802 ) 

803 headers = self.headers.copy() 

804 try: 

805 res = self.delete(url=url, headers=headers) 

806 if res.ok: 

807 self.logger.info( 

808 f"Subscription '{subscription_id}' " f"successfully deleted!" 

809 ) 

810 else: 

811 res.raise_for_status() 

812 except requests.RequestException as err: 

813 msg = f"Could not delete subscription {subscription_id}" 

814 self.log_error(err=err, msg=msg) 

815 raise 

816 

817 def log_multi_errors(self, errors: List[Dict]) -> None: 

818 for error in errors: 

819 entity_id = error["entityId"] 

820 error_details: dict = error["error"] 

821 error_title = error_details.get("title") 

822 error_status = error_details.get("status") 

823 # error_detail = error_details['detail'] 

824 self.logger.error( 

825 "Response status: %d, Entity: %s, Reason: %s", 

826 error_status, 

827 entity_id, 

828 error_title, 

829 ) 

830 

831 def handle_multi_status_response(self, res: requests.Response): 

832 """ 

833 Handles the response of a batch_operation. If the response contains 

834 errors, they are logged. If the response contains only errors, a RuntimeError 

835 is raised. 

836 Args: 

837 res: 

838 

839 Returns: 

840 

841 """ 

842 try: 

843 res.raise_for_status() 

844 if res.text: 

845 response_data = res.json() 

846 if "errors" in response_data: 

847 errors = response_data["errors"] 

848 self.log_multi_errors(errors) 

849 if "success" in response_data: 

850 successList = response_data["success"] 

851 if len(successList) == 0: 

852 raise RuntimeError( 

853 "Batch operation resulted in errors only, see logs" 

854 ) 

855 else: 

856 self.logger.info("Empty response received.") 

857 except json.JSONDecodeError: 

858 self.logger.info( 

859 "Error decoding JSON. Response may not be in valid JSON format." 

860 ) 

861 

862 # Batch operation API 

863 def entity_batch_operation( 

864 self, 

865 *, 

866 entities: List[Union[ContextLDEntity, ContextLDEntityKeyValues]], 

867 action_type: Union[ActionTypeLD, str], 

868 options: Literal["noOverwrite", "replace", "update"] = None, 

869 ) -> None: 

870 """ 

871 This operation allows to create, update and/or delete several entities 

872 in a single batch operation. 

873 

874 This operation is split in as many individual operations as entities 

875 in the entities vector, so the actionType is executed for each one of 

876 them. Depending on the actionType, a mapping with regular non-batch 

877 operations can be done: 

878 

879 append: maps to POST /v2/entities (if the entity does not already exist) 

880 or POST /v2/entities/<id>/attrs (if the entity already exists). 

881 

882 appendStrict: maps to POST /v2/entities (if the entity does not 

883 already exist) or POST /v2/entities/<id>/attrs?options=append (if the 

884 entity already exists). 

885 

886 update: maps to PATCH /v2/entities/<id>/attrs. 

887 

888 delete: maps to DELETE /v2/entities/<id>/attrs/<attrName> on every 

889 attribute included in the entity or to DELETE /v2/entities/<id> if 

890 no attribute were included in the entity. 

891 

892 replace: maps to PUT /v2/entities/<id>/attrs. 

893 

894 Args: 

895 entities: "an array of entities, each entity specified using the " 

896 "JSON entity representation format " 

897 action_type (Update): "actionType, to specify the kind of update 

898 action to do: either append, appendStrict, update, delete, 

899 or replace. " 

900 options (str): Optional 'noOverwrite' 'replace' 'update' 

901 

902 Returns: 

903 

904 """ 

905 

906 url = urljoin( 

907 self.base_url, f"{self._url_version}/entityOperations/{action_type.value}" 

908 ) 

909 

910 headers = self.headers.copy() 

911 ctx = any(e.model_dump().get("@context", None) is not None for e in entities) 

912 

913 nctx = any(e.model_dump().get("@context", None) is None for e in entities) 

914 if ctx and not nctx: 

915 headers.update({"Content-Type": "application/ld+json"}) 

916 headers.update({"Link": None}) 

917 elif not ctx and nctx: 

918 headers.update({"Content-Type": "application/json"}) 

919 else: 

920 self.logger.warning( 

921 "Detected mixed context provision in batch operation: " 

922 "Some entities have @context field while others don't. " 

923 "FiLiP use application/json and Link header by default, so that " 

924 "the entities with @context will be rejected by CB" 

925 ) 

926 

927 params = {} 

928 if options: 

929 params.update({"options": options}) 

930 update = UpdateLD(entities=entities) 

931 try: 

932 if action_type == ActionTypeLD.DELETE: 

933 id_list = [entity.id for entity in entities] 

934 res = self.post( 

935 url=url, headers=headers, params=params, data=json.dumps(id_list) 

936 ) 

937 else: 

938 res = self.post( 

939 url=url, 

940 headers=headers, 

941 params=params, 

942 data=json.dumps( 

943 update.model_dump( 

944 by_alias=True, 

945 exclude_none=True, 

946 ).get("entities") 

947 ), 

948 ) 

949 self.handle_multi_status_response(res) 

950 except RuntimeError as rerr: 

951 raise rerr 

952 except Exception as err: 

953 raise err 

954 else: 

955 self.logger.info(f"Update operation {action_type} succeeded!") 

956 

957 def validate_relationship( 

958 self, 

959 relationship: Union[ 

960 NamedContextProperty, 

961 ContextProperty, 

962 NamedContextRelationship, 

963 ContextRelationship, 

964 Dict, 

965 ], 

966 ) -> bool: 

967 """ 

968 Validates a relationship. A relationship is valid if it points to an existing 

969 entity. Otherwise, it is considered invalid 

970 

971 Args: 

972 relationship: relationship to validate,assumed to be property or relationship 

973 since there is no geoproperty with string value 

974 Returns 

975 True if the relationship is valid, False otherwise 

976 """ 

977 if hasattr(relationship, "value"): 

978 destination_id = relationship.value 

979 elif hasattr(relationship, "object"): 

980 destination_id = relationship.object 

981 elif isinstance(relationship, dict): 

982 _sentinel = object() 

983 destination_id = relationship.get("value", _sentinel) 

984 if destination_id is _sentinel: 

985 raise ValueError( 

986 "Invalid ld relationship dictionary format\n" 

987 "Expected format: {" 

988 f'"type": "{DataTypeLD.RELATIONSHIP[0]}", ' 

989 '"value" "entity_id"}' 

990 ) 

991 else: 

992 raise ValueError("Invalid relationship type.") 

993 try: 

994 destination_entity = self.get_entity(entity_id=destination_id) 

995 return destination_entity.id == destination_id 

996 except requests.RequestException as err: 

997 if err.response.status_code == 404: 

998 return False