Coverage for filip / clients / ngsi_v2 / cb.py: 81%

703 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-02 08:01 +0000

1""" 

2Context Broker Module for API Client 

3""" 

4 

5from __future__ import annotations 

6 

7import copy 

8import json 

9from copy import deepcopy 

10from enum import Enum 

11from math import inf 

12from packaging.version import parse as parse_version 

13from packaging import version 

14from pydantic import PositiveInt, PositiveFloat, AnyHttpUrl, ValidationError 

15from pydantic.type_adapter import TypeAdapter 

16from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union 

17import re 

18import requests 

19from urllib.parse import urljoin 

20import warnings 

21from requests import RequestException 

22from filip.clients.base_http_client import BaseHttpClient, NgsiURLVersion 

23from filip.config import settings 

24from filip.models.base import FiwareHeader, PaginationMethod, DataType 

25from filip.utils.simple_ql import QueryString 

26from filip.models.ngsi_v2.context import ( 

27 ActionType, 

28 Command, 

29 ContextEntity, 

30 ContextEntityKeyValues, 

31 ContextAttribute, 

32 NamedCommand, 

33 NamedContextAttribute, 

34 Query, 

35 Update, 

36 PropertyFormat, 

37 ContextEntityList, 

38 ContextEntityKeyValuesList, 

39 ContextEntityValidationList, 

40 ContextEntityKeyValuesValidationList, 

41) 

42from filip.models.ngsi_v2.base import AttrsFormat 

43from filip.models.ngsi_v2.subscriptions import Subscription, Message 

44from filip.models.ngsi_v2.registrations import Registration 

45from filip.clients.exceptions import BaseHttpClientException 

46 

47if TYPE_CHECKING: 

48 from filip.clients.ngsi_v2.iota import IoTAClient 

49 

50 

51class ContextBrokerClient(BaseHttpClient): 

52 """ 

53 Implementation of NGSI Context Broker functionalities, such as creating 

54 entities and subscriptions; retrieving, updating and deleting data. 

55 Further documentation: 

56 https://fiware-orion.readthedocs.io/en/master/ 

57 

58 Api specifications for v2 are located here: 

59 https://telefonicaid.github.io/fiware-orion/api/v2/stable/ 

60 

61 Note: 

62 We use the reference implementation for development. Therefore, some 

63 other brokers may show slightly different behavior! 

64 """ 

65 

def __init__(
    self,
    url: str = None,
    *,
    session: requests.Session = None,
    fiware_header: FiwareHeader = None,
    **kwargs,
):
    """
    Set up a client for an NGSI-v2 context broker.

    Args:
        url: Url of context broker server. When omitted (or empty) the
            value of ``settings.CB_URL`` is used instead.
        session (requests.Session): Session object that is passed on to
            the base client.
        fiware_header (FiwareHeader): fiware service and fiware service path
        **kwargs (Optional): Optional arguments that ``request`` takes.
            They are forwarded unchanged to the base client constructor.
    """
    # fall back to the configured broker url when none was given
    if not url:
        url = settings.CB_URL
    # path segment of the NGSI API version, used when building endpoint urls
    self._url_version = NgsiURLVersion.v2_url.value
    super().__init__(
        url=url,
        session=session,
        fiware_header=fiware_header,
        **kwargs,
    )
    # logs a warning if the broker is older than the configured minimum
    self._check_correct_cb_version()

89 

def __pagination(
    self,
    *,
    method: PaginationMethod = PaginationMethod.GET,
    url: str,
    headers: Dict,
    limit: Union[PositiveInt, PositiveFloat] = None,
    params: Dict = None,
    data: str = None,
) -> List[Dict]:
    """
    NGSIv2 implements a pagination mechanism in order to help clients to
    retrieve large sets of resources. This mechanism works for all listing
    operations in the API (e.g. GET /v2/entities, GET /v2/subscriptions,
    POST /v2/op/query, etc.). This function helps getting datasets that are
    larger than the limit for the different GET operations.

    https://fiware-orion.readthedocs.io/en/master/user/pagination/index.html

    Args:
        method: HTTP method used for every page request
        url: Information about the url, obtained from the original function
        headers: The headers from the original function
        limit: Maximum number of items to collect; ``None`` means no limit
        params: Query parameters of the original request. The dictionary is
            copied, so the caller's object is not modified.
        data: Request body that is sent with every page request

    Returns:
        List with the decoded JSON items of all retrieved pages

    Raises:
        requests.HTTPError: raised via ``raise_for_status`` if a page
            request is answered with an error status
    """
    if limit is None:
        limit = inf
    # Work on a private copy: this tolerates the ``params=None`` default and
    # keeps the ``limit``/``offset`` bookkeeping out of the caller's dict.
    params = dict(params) if params else {}
    params["limit"] = min(1000, limit)  # maximum items per request

    if self.session:
        session = self.session
    else:
        session = requests.Session()
    # NOTE(review): leaving the ``with`` block also closes a shared
    # ``self.session``; kept as in the original implementation.
    with session:
        res = session.request(
            method=method, url=url, params=params, headers=headers, data=data
        )
        if res.ok:
            items = res.json()
            # total number of matching items as reported by the broker
            count = int(res.headers["Fiware-Total-Count"])

            while len(items) < limit and len(items) < count:
                # Establishing the offset from where entities are retrieved
                params["offset"] = len(items)
                params["limit"] = min(1000, (limit - len(items)))
                res = session.request(
                    method=method,
                    url=url,
                    params=params,
                    headers=headers,
                    data=data,
                )
                if res.ok:
                    page = res.json()
                    if not page:
                        # The reported total is stale (fewer items exist
                        # than announced). Stop here, otherwise the loop
                        # condition would never become false.
                        break
                    items.extend(page)
                else:
                    res.raise_for_status()
            self.logger.debug("Received: %s", items)
            return items
        res.raise_for_status()

157 

158 # MANAGEMENT API 

def get_version(self) -> Dict:
    """
    Requests the ``version`` endpoint below the client's base url.

    Returns:
        Dictionary with the decoded JSON response

    Raises:
        BaseHttpClientException: if the request fails or is answered with
            an error status
    """
    endpoint = urljoin(self.base_url, "version")
    try:
        response = self.get(url=endpoint, headers=self.headers)
        if not response.ok:
            # turns the error status into an exception handled below
            response.raise_for_status()
        else:
            return response.json()
    except requests.RequestException as err:
        self.logger.error(err)
        msg = f"Fetch version fails, reason: {err.args}"
        raise BaseHttpClientException(message=msg, response=err.response) from err

175 

def _check_correct_cb_version(self) -> None:
    """
    Compares the Orion version reported by ``get_version`` with
    ``settings.MINIMUM_ORION_VERSION`` and logs a warning if the broker
    is older than that minimum. Nothing is raised for an old version.
    """
    orion_version = self.get_version()["orion"]["version"]
    running = version.parse(orion_version)
    required = version.parse(settings.MINIMUM_ORION_VERSION)
    if not running < required:
        return
    self.logger.warning(
        f"You are using orion version {orion_version}. There was a breaking change in Orion Version "
        f"{settings.MINIMUM_ORION_VERSION}, therefore functionality is not assured when using "
        f"version {orion_version}."
    )

188 

def get_resources(self) -> Dict:
    """
    Requests the root of the versioned API (base url joined with the
    NGSI version segment) and returns the decoded answer.

    Returns:
        Dict

    Raises:
        requests.RequestException: if the request fails or is answered
            with an error status
    """
    endpoint = urljoin(self.base_url, self._url_version)
    try:
        response = self.get(url=endpoint, headers=self.headers)
        if not response.ok:
            # turns the error status into an exception handled below
            response.raise_for_status()
        else:
            return response.json()
    except requests.RequestException as err:
        self.logger.error(err)
        raise RequestException(response=err.response) from err

205 

206 # STATISTICS API 

def get_statistics(self) -> Dict:
    """
    Gets statistics of context broker

    Returns:
        Dictionary with response

    Raises:
        BaseHttpClientException: if the request fails or is answered with
            an error status
    """
    url = urljoin(self.base_url, "statistics")
    try:
        res = self.get(url=url, headers=self.headers)
        if res.ok:
            return res.json()
        res.raise_for_status()
    except requests.RequestException as err:
        self.logger.error(err)
        # Connection-level failures carry no response object; fall back to
        # the exception text instead of failing with an AttributeError
        # that would hide the original error.
        if err.response is not None:
            msg = err.response.text
        else:
            msg = str(err)
        raise BaseHttpClientException(message=msg, response=err.response) from err

224 

225 # CONTEXT MANAGEMENT API ENDPOINTS 

226 # Entity Operations 

def post_entity(
    self,
    entity: Union[ContextEntity, ContextEntityKeyValues],
    update: bool = False,
    patch: bool = False,
    override_metadata: bool = True,
    key_values: bool = False,
):
    """
    Registers an entity with the NGSI Context Broker.

    First a POST request with the entity is tried. If the broker answers
    with status code 422 (the entity is unprocessable because it already
    exists) there are two options: overwrite the existing entity
    (``update=True``) or only manipulate it (``patch=True``). If neither
    is set, the error is raised.

    Args:
        entity (ContextEntity/ContextEntityKeyValues):
            Context Entity Object
        update (bool):
            If the response.status_code is 422, whether to override the
            existing entity
        patch (bool):
            If the response.status_code is 422, whether to manipulate the
            existing entity. Omitted if update `True`.
        override_metadata:
            Only applies for patch equal to `True`.
            Whether to override or append the attribute's metadata.
            `True` for overwrite or `False` for update/append
        key_values(bool):
            By default False. If set to True, "options=keyValues" will
            be included in params of post request. The payload uses
            the keyValues simplified entity representation, i.e.
            ContextEntityKeyValues.

    Returns:
        The ``Location`` header of the response if the entity was
        created, otherwise whatever ``override_entity`` or
        ``patch_entity`` returns.

    Raises:
        BaseHttpClientException: if the request fails and no fallback
            (update/patch on status 422) applies
    """
    url = urljoin(self.base_url, f"{self._url_version}/entities")
    headers = self.headers.copy()
    # the entity model has to match the requested representation
    if key_values:
        assert isinstance(entity, ContextEntityKeyValues)
        params = {"options": "keyValues"}
    else:
        assert isinstance(entity, ContextEntity)
        params = {}
    try:
        res = self.post(
            url=url,
            headers=headers,
            json=entity.model_dump(exclude_none=True),
            params=params,
        )
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info("Entity successfully posted!")
            return res.headers.get("Location")
    except requests.RequestException as err:
        response = err.response
        already_exists = response is not None and response.status_code == 422
        # update takes precedence over patch
        if already_exists and update:
            return self.override_entity(entity=entity, key_values=key_values)
        if already_exists and patch:
            return self.patch_entity(
                entity=entity,
                override_metadata=override_metadata,
                key_values=key_values,
            )
        msg = f"Could not post entity {entity.id}"
        raise BaseHttpClientException(message=msg, response=err.response) from err

300 

def get_entity_list(
    self,
    *,
    entity_ids: List[str] = None,
    entity_types: List[str] = None,
    id_pattern: str = None,
    type_pattern: str = None,
    q: Union[str, QueryString] = None,
    mq: Union[str, QueryString] = None,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    limit: PositiveInt = inf,
    attrs: List[str] = None,
    metadata: Union[str, List[str]] = None,
    order_by: str = None,
    response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
    include_invalid: bool = False,
) -> Union[
    List[Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]],
    ContextEntityValidationList,
    ContextEntityKeyValuesValidationList,
]:
    r"""
    Retrieves a list of context entities that match different criteria by
    id, type, pattern matching (either id or type) and/or those which
    match a query or geographical query (see Simple Query Language and
    Geographical Queries). A given entity has to match all the criteria
    to be retrieved (i.e., the criteria is combined in a logical AND
    way). Note that pattern matching query parameters are incompatible
    (i.e. mutually exclusive) with their corresponding exact matching
    parameters, i.e. idPattern with id and typePattern with type.

    Args:
        entity_ids: A comma-separated list of elements. Retrieve entities
            whose ID matches one of the elements in the list.
            Incompatible with idPattern,e.g. Boe_Idarium
        entity_types: comma-separated list of elements. Retrieve entities
            whose type matches one of the elements in the list.
            Incompatible with typePattern. Example: Room.
        id_pattern: A correctly formatted regular expression. Retrieve
            entities whose ID matches the regular expression. Incompatible
            with id, e.g. ngsi-ld.* or sensor.*
        type_pattern: A correctly formatted regular expression. Retrieve
            entities whose type matches the regular expression.
            Incompatible with type, e.g. room.*
        q (SimpleQuery): A query expression, composed of a list of
            statements separated by ;, i.e.,
            q=statement1;statement2;statement3. See Simple Query
            Language specification. Example: temperature>40.
        mq (SimpleQuery): A query expression for attribute metadata,
            composed of a list of statements separated by ;, i.e.,
            mq=statement1;statement2;statement3. See Simple Query
            Language specification. Example: temperature.accuracy<0.9.
        georel: Spatial relationship between matching entities and a
            reference shape. See Geographical Queries. Example: 'near'.
        geometry: Geographical area to which the query is restricted.
            See Geographical Queries. Example: point.
        coords: List of latitude-longitude pairs of coordinates separated
            by ';'. See Geographical Queries. Example: 41.390205,
            2.154007;48.8566,2.3522.
        limit: Limits the number of entities to be retrieved Example: 20
        attrs: List of attribute names whose data are to be included in
            the response (a ready-made comma-separated string is passed
            through as is). The attributes are retrieved in the order
            specified by this parameter. If this parameter is not
            included, the attributes are retrieved in arbitrary order.
            See "Filtering out attributes and metadata" section in
            https://fiware-orion.readthedocs.io/en/master/orion-api.html#filtering-out-attributes-and-metadata
            for more detail. Example: seatNumber.
        metadata: A list of metadata names to include in the response,
            either as list of strings or as one comma-separated string.
            See "Filtering out attributes and metadata" section in
            https://fiware-orion.readthedocs.io/en/master/orion-api.html#filtering-out-attributes-and-metadata
            for more detail. Example: accuracy.
        order_by: Criteria for ordering results. See "Ordering Results"
            section for details. Example: temperature,!speed.
        response_format (AttrsFormat, str): Response Format. Note: That if
            'keyValues' or 'values' are used the response model will
            change to List[ContextEntityKeyValues] and to List[Dict[str,
            Any]], respectively.
        include_invalid: Specify if the returned list should also contain
            a list of invalid entity IDs or not.

    Returns:
        A list of entities (or plain dictionaries for the 'values'
        format). With ``include_invalid=True`` and the 'normalized' or
        'keyValues' format, a validation list object holding the valid
        entities and the ids of the entities that failed validation.

    Raises:
        ValueError: for mutually exclusive arguments, an invalid regular
            expression or an unknown response format
        BaseHttpClientException: if the request to the broker fails
    """
    url = urljoin(self.base_url, f"{self._url_version}/entities/")
    headers = self.headers.copy()
    params = {}

    if entity_ids and id_pattern:
        raise ValueError("entity_ids and id_pattern are mutually exclusive")
    if entity_types and type_pattern:
        raise ValueError("entity_types and type_pattern are mutually exclusive")
    if entity_ids:
        if not isinstance(entity_ids, list):
            entity_ids = [entity_ids]
        params["id"] = ",".join(entity_ids)
    if id_pattern:
        # validate locally so that a broken expression fails early
        try:
            re.compile(id_pattern)
        except re.error as err:
            raise ValueError(f"Invalid Pattern: {err}") from err
        params["idPattern"] = id_pattern
    if entity_types:
        if not isinstance(entity_types, list):
            entity_types = [entity_types]
        params["type"] = ",".join(entity_types)
    if type_pattern:
        try:
            re.compile(type_pattern)
        except re.error as err:
            raise ValueError(f"Invalid Pattern: {err.msg}") from err
        params["typePattern"] = type_pattern
    # A plain string must not be joined: ",".join("accuracy") would yield
    # "a,c,c,u,r,a,c,y". Strings are therefore passed through unchanged.
    if attrs:
        params["attrs"] = attrs if isinstance(attrs, str) else ",".join(attrs)
    if metadata:
        params["metadata"] = (
            metadata if isinstance(metadata, str) else ",".join(metadata)
        )
    if q:
        if isinstance(q, str):
            q = QueryString.parse_str(q)
        params["q"] = str(q)
    if mq:
        params["mq"] = str(mq)
    if geometry:
        params["geometry"] = geometry
    if georel:
        params["georel"] = georel
    if coords:
        params["coords"] = coords
    if order_by:
        params["orderBy"] = order_by
    if response_format not in list(AttrsFormat):
        raise ValueError(f"Value must be in {list(AttrsFormat)}")
    # "count" makes the broker report the total number of matches, which
    # the pagination helper reads from the response headers
    response_format = ",".join(["count", response_format])
    params["options"] = response_format

    try:
        items = self.__pagination(
            method=PaginationMethod.GET,
            limit=limit,
            url=url,
            params=params,
            headers=headers,
        )
    except requests.RequestException as err:
        msg = "Could not load entities"
        raise BaseHttpClientException(message=msg, response=err.response) from err

    # pick the models that belong to the requested representation
    if AttrsFormat.NORMALIZED in response_format:
        entity_model = ContextEntity
        list_model = ContextEntityList
        validation_model = ContextEntityValidationList
    elif AttrsFormat.KEY_VALUES in response_format:
        entity_model = ContextEntityKeyValues
        list_model = ContextEntityKeyValuesList
        validation_model = ContextEntityKeyValuesValidationList
    else:
        return items  # in case of VALUES as response_format

    if not include_invalid:
        return list_model.model_validate({"entities": items}).entities

    # validate one by one so that a single broken entity does not make
    # the whole request fail; its id is reported instead
    adapter = TypeAdapter(entity_model)
    valid_entities = []
    invalid_entities = []
    for entity in items:
        try:
            valid_entities.append(adapter.validate_python(entity))
        except ValidationError:
            invalid_entities.append(entity.get("id"))
    return validation_model.model_validate(
        {
            "entities": valid_entities,
            "invalid_entities": invalid_entities,
        }
    )

492 

def get_entity(
    self,
    entity_id: str,
    entity_type: str = None,
    attrs: List[str] = None,
    metadata: List[str] = None,
    response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
) -> Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]:
    """
    Retrieves a single entity. The operation must return one entity
    element only, but there may be more than one entity with the same ID
    (e.g. entities with same ID but different types). In such case, an
    error message is returned, with the HTTP status code set to
    409 Conflict.

    Args:
        entity_id (String): Id of the entity to be retrieved
        entity_type (String): Entity type, to avoid ambiguity in case
            there are several entities with the same entity id.
        attrs (List of Strings): List of attribute names whose data must be
            included in the response. The attributes are retrieved in the
            order specified by this parameter.
            See "Filtering out attributes and metadata" section in
            https://fiware-orion.readthedocs.io/en/master/orion-api.html#filtering-out-attributes-and-metadata
            for more detail. If this parameter is not included, the
            attributes are retrieved in arbitrary order, and all the
            attributes of the entity are included in the response.
            Example: temperature, humidity.
        metadata (List of Strings): A list of metadata names to include in
            the response. See "Filtering out attributes and metadata"
            section for more detail. Example: accuracy.
        response_format (AttrsFormat, str): Representation format of
            response

    Returns:
        ContextEntity for the normalized format, ContextEntityKeyValues
        for the keyValues format, otherwise the decoded JSON response.

    Raises:
        ValueError: if ``response_format`` is not a known format
        BaseHttpClientException: if the request fails
    """
    url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
    headers = self.headers.copy()
    params = {}
    if entity_type:
        params["type"] = entity_type
    if attrs:
        params["attrs"] = ",".join(attrs)
    if metadata:
        params["metadata"] = ",".join(metadata)
    if response_format not in list(AttrsFormat):
        raise ValueError(f"Value must be in {list(AttrsFormat)}")
    params["options"] = response_format

    try:
        res = self.get(url=url, params=params, headers=headers)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info("Entity successfully retrieved!")
            self.logger.debug("Received: %s", res.json())
            # the model of the result follows the requested representation
            if response_format == AttrsFormat.NORMALIZED:
                return ContextEntity(**res.json())
            elif response_format == AttrsFormat.KEY_VALUES:
                return ContextEntityKeyValues(**res.json())
            else:
                return res.json()
    except requests.RequestException as err:
        msg = f"Could not load entity {entity_id}"
        raise BaseHttpClientException(message=msg, response=err.response) from err

554 

def get_entity_attributes(
    self,
    entity_id: str,
    entity_type: str = None,
    attrs: List[str] = None,
    metadata: List[str] = None,
    response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
) -> Dict[str, ContextAttribute]:
    """
    Retrieves the attributes of a single entity. This request is similar
    to retrieving the whole entity, however this one omits the id and
    type fields. Just like the general request of getting an entire
    entity, this operation must return only one entity element. If more
    than one entity with the same ID is found (e.g. entities with same ID
    but different type), an error message is returned, with the HTTP
    status code set to 409 Conflict.

    Args:
        entity_id (String): Id of the entity to be retrieved
        entity_type (String): Entity type, to avoid ambiguity in case
            there are several entities with the same entity id.
        attrs (List of Strings): List of attribute names whose data must be
            included in the response. The attributes are retrieved in the
            order specified by this parameter.
            See "Filtering out attributes and metadata" section in
            https://fiware-orion.readthedocs.io/en/master/orion-api.html#filtering-out-attributes-and-metadata
            for more detail. If this parameter is not included, the
            attributes are retrieved in arbitrary order, and all the
            attributes of the entity are included in the response.
            Example: temperature, humidity.
        metadata (List of Strings): A list of metadata names to include in
            the response. See "Filtering out attributes and metadata"
            section for more detail. Example: accuracy.
        response_format (AttrsFormat, str): Representation format of
            response

    Returns:
        Dict mapping attribute names to ContextAttribute objects for the
        normalized format, otherwise the decoded JSON response.

    Raises:
        ValueError: if ``response_format`` is not a known format
        BaseHttpClientException: if the request fails
    """
    url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
    headers = self.headers.copy()
    params = {}
    if entity_type:
        params["type"] = entity_type
    if attrs:
        params["attrs"] = ",".join(attrs)
    if metadata:
        params["metadata"] = ",".join(metadata)
    if response_format not in list(AttrsFormat):
        raise ValueError(f"Value must be in {list(AttrsFormat)}")
    params["options"] = response_format
    try:
        res = self.get(url=url, params=params, headers=headers)
        if not res.ok:
            res.raise_for_status()
        elif response_format == AttrsFormat.NORMALIZED:
            # only the normalized representation is parsed into models
            parsed = {}
            for key, values in res.json().items():
                parsed[key] = ContextAttribute(**values)
            return parsed
        else:
            return res.json()
    except requests.RequestException as err:
        msg = f"Could not load attributes from entity {entity_id} !"
        raise BaseHttpClientException(message=msg, response=err.response) from err

616 

def update_entity(
    self,
    entity: Union[ContextEntity, ContextEntityKeyValues, dict],
    append_strict: bool = False,
    key_values: bool = False,
):
    """
    The request payload is an object representing the attributes to
    append or update.

    Note:
        Update means overwriting the existing entity. If you want to
        manipulate you should rather use patch_entity.

    Args:
        entity (ContextEntity, ContextEntityKeyValues or dict): Entity
            whose attributes are sent. A dictionary has to contain the
            keys "id" and "type"; it is not modified by this method.
        append_strict: If `False` the entity attributes are updated (if they
            previously exist) or appended (if they don't previously exist)
            with the ones in the payload.
            If `True` all the attributes in the payload not
            previously existing in the entity are appended. In addition
            to that, in case some of the attributes in the payload
            already exist in the entity, an error is returned.
            More precisely this means a strict append procedure.
        key_values: By default False. If set to True, the payload uses
            the keyValues simplified entity representation, i.e.
            ContextEntityKeyValues.
    Returns:
        None
    """
    if key_values:
        if isinstance(entity, dict):
            # copy first, so popping id/type does not alter the caller's dict
            entity = copy.deepcopy(entity)
            _id = entity.pop("id")
            _type = entity.pop("type")
            attrs = entity
            entity = ContextEntityKeyValues(id=_id, type=_type)
        else:
            attrs = entity.model_dump(exclude={"id", "type"})
    else:
        if isinstance(entity, dict):
            # A dictionary has no ``get_attributes``; build the model
            # first instead of failing with an AttributeError.
            entity = ContextEntity(**entity)
        attrs = entity.get_attributes()
    self.update_or_append_entity_attributes(
        entity_id=entity.id,
        entity_type=entity.type,
        attrs=attrs,
        append_strict=append_strict,
        key_values=key_values,
    )

665 

def update_entity_properties(
    self, entity: ContextEntity, append_strict: bool = False
):
    """
    Appends or updates the attributes that ``entity.get_properties()``
    yields, i.e. the attributes of any type but Relationship.

    Note:
        Update means overwriting the existing entity. If you want to
        manipulate you should rather use patch_entity.

    Args:
        entity (ContextEntity): Entity providing id, type and properties
        append_strict: If `False` the entity attributes are updated (if they
            previously exist) or appended (if they don't previously exist)
            with the ones in the payload.
            If `True` all the attributes in the payload not
            previously existing in the entity are appended. In addition
            to that, in case some of the attributes in the payload
            already exist in the entity, an error is returned.
            More precisely this means a strict append procedure.

    Returns:
        None
    """
    request_args = {
        "entity_id": entity.id,
        "entity_type": entity.type,
        "attrs": entity.get_properties(),
        "append_strict": append_strict,
    }
    self.update_or_append_entity_attributes(**request_args)

697 

def update_entity_relationships(
    self, entity: ContextEntity, append_strict: bool = False
):
    """
    Appends or updates only the attributes that
    ``entity.get_relationships()`` yields, i.e. the attributes of type
    Relationship.

    Note:
        Update means overwriting the existing entity. If you want to
        manipulate you should rather use patch_entity.

    Args:
        entity (ContextEntity): Entity providing id, type and relationships
        append_strict: If `False` the entity attributes are updated (if they
            previously exist) or appended (if they don't previously exist)
            with the ones in the payload.
            If `True` all the attributes in the payload not
            previously existing in the entity are appended. In addition
            to that, in case some of the attributes in the payload
            already exist in the entity, an error is returned.
            More precisely this means a strict append procedure.

    Returns:
        None
    """
    request_args = {
        "entity_id": entity.id,
        "entity_type": entity.type,
        "attrs": entity.get_relationships(),
        "append_strict": append_strict,
    }
    self.update_or_append_entity_attributes(**request_args)

729 

def delete_entity(
    self,
    entity_id: str,
    entity_type: str = None,
    delete_devices: bool = False,
    iota_client: IoTAClient = None,
    iota_url: AnyHttpUrl = settings.IOTA_URL,
) -> None:
    """
    Remove a entity from the context broker. No payload is required
    or received.

    Args:
        entity_id:
            Id of the entity to be deleted
        entity_type:
            Entity type, to avoid ambiguity in case there are several
            entities with the same entity id.
        delete_devices:
            If True, also delete all devices that reference this
            entity (entity_id as entity_name)
        iota_client:
            Corresponding IoTA-Client used to access IoTA-Agent
        iota_url:
            URL of the corresponding IoT-Agent. This will autogenerate
            an IoTA-Client, mirroring the information of the
            ContextBrokerClient, e.g. FiwareHeader, and other headers

    Returns:
        None

    Raises:
        BaseHttpClientException: if the entity could not be deleted
    """
    url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
    headers = self.headers.copy()
    params = {"type": entity_type} if entity_type else None
    try:
        res = self.delete(url=url, params=params, headers=headers)
        if res.ok:
            self.logger.info("Entity '%s' successfully deleted!", entity_id)
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not delete entity {entity_id} !"
        raise BaseHttpClientException(message=msg, response=err.response) from err

    if not delete_devices:
        return

    # imported here and not at module level (the module only imports
    # IoTAClient under TYPE_CHECKING)
    from filip.clients.ngsi_v2 import IoTAClient

    if iota_client:
        # a copy is used, presumably so that closing it below leaves the
        # caller's client untouched
        iota_client_local = deepcopy(iota_client)
    else:
        warnings.warn(
            "No IoTA-Client object provided! "
            "Will try to generate one. "
            "This usage is not recommended."
        )

        iota_client_local = IoTAClient(
            url=iota_url,
            fiware_header=self.fiware_headers,
            headers=self.headers,
        )

    # close the local client even if listing or deleting devices fails
    try:
        for device in iota_client_local.get_device_list(entity_names=[entity_id]):
            # without an entity type every referencing device is removed,
            # otherwise only those of the matching type
            if not entity_type or device.entity_type == entity_type:
                iota_client_local.delete_device(device_id=device.device_id)
    finally:
        iota_client_local.close()

802 

def delete_entities(self, entities: List[ContextEntity]) -> None:
    """
    Remove a list of entities from the context broker in batches, using
    the ``update`` operation with the action type "delete". This method
    is more efficient than calling delete_entity() for each entity.

    Args:
        entities: List[ContextEntity]: List of entities to be deleted

    Raises:
        Exception, if one of the entities is not in the ContextBroker

    Returns:
        None
    """
    # update() delete, deletes all entities without attributes completely,
    # and removes the attributes for the other. Entities that carry
    # attributes are therefore collected a second time, reduced to id and
    # type, so that a second pass removes what is left of them.
    limit = 1000  # max number of entities that will be deleted at once
    entities_with_attributes: List[ContextEntity] = [
        ContextEntity(id=entity.id, type=entity.type)
        for entity in entities
        if any(
            key not in ContextEntity.model_fields for key in entity.model_dump()
        )
    ]

    # first pass: the entities as given
    for start in range(0, len(entities), limit):
        self.update(entities=entities[start : start + limit], action_type="delete")
    # second pass: the id/type remainders of entities that had attributes
    for start in range(0, len(entities_with_attributes), limit):
        self.update(
            entities=entities_with_attributes[start : start + limit],
            action_type="delete",
        )

846 

def update_or_append_entity_attributes(
    self,
    entity_id: str,
    attrs: Union[
        List[NamedContextAttribute], Dict[str, ContextAttribute], Dict[str, Any]
    ],
    entity_type: str = None,
    append_strict: bool = False,
    forcedUpdate: bool = False,
    key_values: bool = False,
):
    """
    Update or append attributes of an entity by sending a 'POST' request
    to the entity's `attrs` endpoint.

    Note:
        Only the attributes are sent; the entity id and type are excluded
        from the payload. Command attributes are NOT filtered out here, so
        be careful not to update attributes that are provided via context
        registration, e.g. commands.

    Args:
        entity_id: Entity id to be updated
        attrs: Attributes to update or to append. Must be a dict if
            `key_values` is True.
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id. If omitted, no type
            filter is sent.
        append_strict: If `True` the 'append' option is added to the
            request (strict append). If `False` the attributes are updated
            or appended.
        forcedUpdate: If `True` the 'forcedUpdate' option is added to the
            request.
        key_values: If `True` the 'keyValues' option is added and the
            payload is built from a ContextEntityKeyValues model.

    Returns:
        None

    Raises:
        AssertionError: If `key_values` is True and `attrs` is not a dict
        BaseHttpClientException: If the request fails
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity_id}/attrs"
    )
    request_headers = self.headers.copy()

    query = {}
    if entity_type:
        query["type"] = entity_type
    else:
        # placeholder only needed to build the local model; it is never sent
        entity_type = "dummy"

    flags = [
        flag
        for flag, enabled in (
            ("append", append_strict),
            ("forcedUpdate", forcedUpdate),
        )
        if enabled
    ]
    if key_values:
        assert isinstance(attrs, dict), "for keyValues attrs has to be a dict"
        flags.append("keyValues")
    if flags:
        query["options"] = ",".join(flags)

    if key_values:
        entity = ContextEntityKeyValues(id=entity_id, type=entity_type, **attrs)
    else:
        entity = ContextEntity(id=entity_id, type=entity_type)
        entity.add_attributes(attrs)

    try:
        response = self.post(
            url=endpoint,
            headers=request_headers,
            # id and type are addressed via URL/query and must not be in the body
            json=entity.model_dump(exclude={"id", "type"}, exclude_none=True),
            params=query,
        )
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info("Entity '%s' successfully updated!", entity.id)
    except requests.RequestException as err:
        msg = f"Could not update or append attributes of entity {entity.id} !"
        self.log_error(err=err, msg=msg)
        raise BaseHttpClientException(message=msg, response=err.response) from err

937 

def update_existing_entity_attributes(
    self,
    entity_id: str,
    attrs: Union[
        List[NamedContextAttribute], Dict[str, ContextAttribute], Dict[str, Any]
    ],
    entity_type: str = None,
    forcedUpdate: bool = False,
    override_metadata: bool = False,
    key_values: bool = False,
):
    """
    Update already existing attributes of an entity with the ones in the
    payload. This corresponds to a 'PATCH' request on the entity's
    `attrs` endpoint.

    Args:
        entity_id: Entity id to be updated
        attrs: Attributes to update. Must be a dict if `key_values` is
            True; in that case it is sent unchanged as the payload.
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id. If omitted, no type
            filter is sent.
        forcedUpdate: If `True` the 'forcedUpdate' option is added to the
            request.
        override_metadata: If `True` the 'overrideMetadata' option is
            added to the request.
        key_values: If `True` the 'keyValues' option is added to the
            request.

    Returns:
        None

    Raises:
        AssertionError: If `key_values` is True and `attrs` is not a dict
        BaseHttpClientException: If the request fails
    """
    url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
    headers = self.headers.copy()

    # Always start from a dict: options must be addable even when no
    # entity type was given (previously params was None in that case and
    # adding options raised an AttributeError).
    params = {}
    if entity_type:
        params["type"] = entity_type
    else:
        # placeholder only needed to build the local model; it is never sent
        entity_type = "dummy"

    options = []
    if override_metadata:
        options.append("overrideMetadata")
    if forcedUpdate:
        options.append("forcedUpdate")
    if key_values:
        assert isinstance(attrs, dict), "for keyValues the attrs must be dict"
        payload = attrs
        options.append("keyValues")
    else:
        entity = ContextEntity(id=entity_id, type=entity_type)
        entity.add_attributes(attrs)
        payload = entity.model_dump(exclude={"id", "type"}, exclude_none=True)
    if options:
        params["options"] = ",".join(options)

    try:
        res = self.patch(
            url=url,
            headers=headers,
            json=payload,
            params=params,
        )
        if res.ok:
            self.logger.info("Entity '%s' successfully updated!", entity_id)
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not update attributes of entity {entity_id} !"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1012 

def override_entity(
    self, entity: Union[ContextEntity, ContextEntityKeyValues], **kwargs
):
    """
    Override an existing entity with the attributes of the given one.

    This is a convenience wrapper: it forwards the entity's id, type and
    attributes to `replace_entity_attributes`.

    Args:
        entity (ContextEntity or ContextEntityKeyValues): Entity whose
            id, type and attributes are used for the replacement
        **kwargs: Additional keyword arguments passed through unchanged
            to `replace_entity_attributes`

    Returns:
        The result of `replace_entity_attributes`
    """
    replacement = {
        "entity_id": entity.id,
        "entity_type": entity.type,
        "attrs": entity.get_attributes(),
    }
    return self.replace_entity_attributes(**replacement, **kwargs)

1034 

def replace_entity_attributes(
    self,
    entity_id: str,
    attrs: Union[
        List[Union[NamedContextAttribute, Dict[str, ContextAttribute]]], Dict
    ],
    entity_type: str = None,
    forcedUpdate: bool = False,
    key_values: bool = False,
):
    """
    Replace all attributes of an entity with the ones in the request.
    This corresponds to a 'PUT' request on the entity's `attrs` endpoint.

    Args:
        entity_id: Entity id to be updated
        attrs: List of attributes to add to the entity, or a dict of
            attributes in case of key_values=True (sent unchanged).
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id. If omitted, no type
            filter is sent.
        forcedUpdate: If `True` the 'forcedUpdate' option is added to the
            request.
        key_values(bool): If `True` the 'keyValues' option is added to the
            request and `attrs` must be a dict.

    Returns:
        None

    Raises:
        AssertionError: If `key_values` is True and `attrs` is not a dict
        BaseHttpClientException: If the request fails
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity_id}/attrs"
    )
    request_headers = self.headers.copy()

    query = {}
    if entity_type:
        query["type"] = entity_type
    else:
        # placeholder only needed to build the local model; it is never sent
        entity_type = "dummy"

    flags = ["forcedUpdate"] if forcedUpdate else []

    if key_values:
        flags.append("keyValues")
        assert isinstance(attrs, dict)
        body = attrs
    else:
        entity = ContextEntity(id=entity_id, type=entity_type)
        entity.add_attributes(attrs)
        body = entity.model_dump(exclude={"id", "type"}, exclude_none=True)

    if flags:
        query["options"] = ",".join(flags)

    try:
        response = self.put(
            url=endpoint,
            headers=request_headers,
            json=body,
            params=query,
        )
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info("Entity '%s' successfully updated!", entity_id)
    except requests.RequestException as err:
        msg = f"Could not replace attribute of entity {entity_id} !"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1104 

1105 # Attribute operations 

def get_attribute(
    self,
    entity_id: str,
    attr_name: str,
    entity_type: str = None,
    metadata: Union[str, List[str]] = None,
    response_format="",
) -> ContextAttribute:
    """
    Retrieves a specified attribute from an entity.

    Args:
        entity_id: Id of the entity. Example: Bcn_Welt
        attr_name: Name of the attribute to be retrieved.
        entity_type (Optional): Type of the entity to retrieve
        metadata (Optional): Metadata names to include in the response,
            either as a list of names or as an already comma-separated
            string. See "Filtering out attributes and metadata" section in
            https://fiware-orion.readthedocs.io/en/master/orion-api.html#filtering-out-attributes-and-metadata
            for more detail.
        response_format: Not used by this method.

    Returns:
        The content of the retrieved attribute as ContextAttribute

    Raises:
        BaseHttpClientException: If the request fails
    """
    url = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
    )
    headers = self.headers.copy()
    params = {}
    if entity_type:
        params["type"] = entity_type
    if metadata:
        # A plain string is taken as-is: joining it would insert a comma
        # between every single character.
        if isinstance(metadata, str):
            params["metadata"] = metadata
        else:
            params["metadata"] = ",".join(metadata)
    try:
        res = self.get(url=url, params=params, headers=headers)
        if res.ok:
            self.logger.debug("Received: %s", res.json())
            return ContextAttribute(**res.json())
        res.raise_for_status()
    except requests.RequestException as err:
        msg = (
            f"Could not load attribute '{attr_name}' from entity" f"'{entity_id}' "
        )
        raise BaseHttpClientException(message=msg, response=err.response) from err

1152 

def update_entity_attribute(
    self,
    entity_id: str,
    attr: Union[ContextAttribute, NamedContextAttribute],
    *,
    entity_type: str = None,
    attr_name: str = None,
    override_metadata: bool = True,
    forcedUpdate: bool = False,
):
    """
    Updates a specified attribute of an entity via a 'PUT' request.

    Args:
        entity_id:
            Id of the entity. Example: Bcn_Welt
        attr:
            Context attribute to update. The attribute is sent without
            its name; the name is part of the request URL.
        entity_type:
            Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.
        attr_name:
            Name of the attribute to be updated. Must be given if `attr`
            is not a NamedContextAttribute and must NOT be given if it is.
        override_metadata:
            If `True` (default) the 'overrideMetadata' option is added to
            the request. This is for backwards compatibility reasons.
            See also:
            https://fiware-orion.readthedocs.io/en/master/user/metadata.html
        forcedUpdate:
            If `True` the 'forcedUpdate' option is added to the request.

    Raises:
        AssertionError: If `attr_name` is inconsistent with the type of
            `attr`
        BaseHttpClientException: If the request fails
    """
    request_headers = self.headers.copy()

    if isinstance(attr, NamedContextAttribute):
        assert attr_name is None, (
            "Invalid argument attr_name. Do not set "
            "attr_name if attr is of type "
            "NamedContextAttribute"
        )
        attr_name = attr.name
    else:
        assert attr_name is not None, (
            "Missing name for attribute. "
            "attr_name must be present if"
            "attr is of type ContextAttribute"
        )

    endpoint = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
    )

    query = {}
    if entity_type:
        query["type"] = entity_type
    # overrideMetadata is on by default to stay backwards compatible
    flags = [
        flag
        for flag, enabled in (
            ("overrideMetadata", override_metadata),
            ("forcedUpdate", forcedUpdate),
        )
        if enabled
    ]
    if flags:
        query["options"] = ",".join(flags)

    try:
        response = self.put(
            url=endpoint,
            headers=request_headers,
            params=query,
            json=attr.model_dump(exclude={"name"}, exclude_none=True),
        )
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info(
                "Attribute '%s' of '%s' successfully updated!",
                attr_name,
                entity_id,
            )
    except requests.RequestException as err:
        msg = (
            f"Could not update attribute '{attr_name}' of entity" f"'{entity_id}' "
        )
        raise BaseHttpClientException(message=msg, response=err.response) from err

1237 

def delete_entity_attribute(
    self, entity_id: str, attr_name: str, entity_type: str = None
) -> None:
    """
    Removes a specified attribute from an entity.

    Args:
        entity_id: Id of the entity.
        attr_name: Name of the attribute to be removed.
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.

    Raises:
        BaseHttpClientException: If the request fails
    """
    url = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
    )
    headers = self.headers.copy()
    params = {}
    if entity_type:
        params["type"] = entity_type
    try:
        # The type filter has to be sent with the request, otherwise
        # entity_type would be silently ignored.
        res = self.delete(url=url, headers=headers, params=params)
        if res.ok:
            self.logger.info(
                "Attribute '%s' of '%s' successfully deleted!",
                attr_name,
                entity_id,
            )
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not delete attribute '{attr_name}' of entity '{entity_id}'"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1273 

1274 # Attribute value operations 

def get_attribute_value(
    self, entity_id: str, attr_name: str, entity_type: str = None
) -> Any:
    """
    Retrieve only the value of an attribute of an entity.

    Args:
        entity_id: Id of the entity. Example: Bcn_Welt
        attr_name: Name of the attribute to be retrieved.
            Example: temperature.
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.

    Returns:
        The decoded JSON body of the response

    Raises:
        BaseHttpClientException: If the request fails
    """
    endpoint = urljoin(
        self.base_url,
        f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}/value",
    )
    request_headers = self.headers.copy()
    query = {"type": entity_type} if entity_type else {}
    try:
        response = self.get(url=endpoint, params=query, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
            return None
        self.logger.debug("Received: %s", response.json())
        return response.json()
    except requests.RequestException as err:
        msg = (
            f"Could not load value of attribute '{attr_name}' from "
            f"entity'{entity_id}' "
        )
        raise BaseHttpClientException(message=msg, response=err.response) from err

1312 

def update_attribute_value(
    self,
    *,
    entity_id: str,
    attr_name: str,
    value: Any,
    entity_type: str = None,
    forcedUpdate: bool = False,
):
    """
    Updates the value of a specified attribute of an entity via a 'PUT'
    request on the attribute's `value` endpoint.

    Args:
        entity_id: Id of the entity. Example: Bcn_Welt
        attr_name: Name of the attribute to be updated.
            Example: temperature.
        value: New value. Values that are neither dict nor list are sent
            with the 'Content-Type' header set to 'text/plain'.
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.
        forcedUpdate: If `True` the 'forcedUpdate' option is added to the
            request.

    Returns:
        None

    Raises:
        BaseHttpClientException: If the request fails
    """
    endpoint = urljoin(
        self.base_url,
        f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}/value",
    )
    request_headers = self.headers.copy()
    query = {}
    if entity_type:
        query["type"] = entity_type
    if forcedUpdate:
        query["options"] = "forcedUpdate"
    try:
        is_structured = isinstance(value, (dict, list))
        if not is_structured:
            # scalar values are announced as plain text
            request_headers["Content-Type"] = "text/plain"
            if isinstance(value, str):
                value = f"{value}"
        response = self.put(
            url=endpoint, headers=request_headers, json=value, params=query
        )
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info(
                "Attribute '%s' of '%s' successfully updated!",
                attr_name,
                entity_id,
            )
    except requests.RequestException as err:
        msg = (
            f"Could not update value of attribute '{attr_name}' from "
            f"entity '{entity_id}' "
        )
        raise BaseHttpClientException(message=msg, response=err.response) from err

1374 

1375 # Types Operations 

def get_entity_types(
    self, *, limit: int = None, offset: int = None, options: str = None
) -> List[Dict[str, Any]]:
    """
    Retrieve the entity types from the `types` endpoint.

    Args:
        limit: Limit the number of types to be retrieved.
        offset: Skip a number of records.
        options: Value for the 'options' query parameter.
            Allowed: count, values

    Returns:
        The decoded JSON body of the response

    Raises:
        BaseHttpClientException: If the request fails
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/types")
    request_headers = self.headers.copy()
    # only truthy arguments are forwarded as query parameters
    query = {
        key: val
        for key, val in (("limit", limit), ("offset", offset), ("options", options))
        if val
    }
    try:
        response = self.get(url=endpoint, params=query, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
            return None
        self.logger.debug("Received: %s", response.json())
        return response.json()
    except requests.RequestException as err:
        msg = "Could not load entity types!"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1407 

def get_entity_type(self, entity_type: str) -> Dict[str, Any]:
    """
    Retrieve the information about a single entity type.

    Args:
        entity_type: Entity Type. Example: Room

    Returns:
        The decoded JSON body of the response

    Raises:
        BaseHttpClientException: If the request fails
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/types/{entity_type}")
    request_headers = self.headers.copy()
    query = {}
    try:
        response = self.get(url=endpoint, params=query, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
            return None
        self.logger.debug("Received: %s", response.json())
        return response.json()
    except requests.RequestException as err:
        msg = f"Could not load entities of type'{entity_type}' "
        raise BaseHttpClientException(message=msg, response=err.response) from err

1429 

1430 # SUBSCRIPTION API ENDPOINTS 

def get_subscription_list(self, limit: PositiveInt = inf) -> List[Subscription]:
    """
    Returns a list of the subscriptions present in the system.

    Args:
        limit: Limit the number of subscriptions to be retrieved

    Returns:
        list of subscriptions

    Raises:
        BaseHttpClientException: If the request fails
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/subscriptions/")
    request_headers = self.headers.copy()
    # The 'count' option is always sent so that the pagination helper can
    # check whether pagination is required.
    query = {"options": "count"}
    try:
        raw_items = self.__pagination(
            limit=limit, url=endpoint, params=query, headers=request_headers
        )
        return TypeAdapter(List[Subscription]).validate_python(raw_items)
    except requests.RequestException as err:
        msg = "Could not load subscriptions!"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1455 

def post_subscription(
    self,
    subscription: Subscription,
    update: bool = False,
    skip_initial_notification: bool = False,
) -> str:
    """
    Creates a new subscription. The subscription is represented by a
    Subscription object defined in filip.cb.models.

    If the subscription already exists, the adding is prevented and the id
    of the existing subscription is returned.

    A subscription is deemed as already existing if there exists a
    subscription with the same subject and notification fields (status
    fields of the notification and mqtt passwords are not compared). All
    other fields are not considered.

    Args:
        subscription: Subscription
        update: True - If the subscription already exists, update it
                False- If the subscription already exists, throw warning
        skip_initial_notification: True - request the broker to skip the
            initial notification. The option is only sent to context
            brokers with version <= 3.1; it is deprecated for newer
            versions and ignored there.
            False - do not send the option

    Returns:
        str: Id of the (created) subscription

    Raises:
        BaseHttpClientException: If the request fails
    """
    # Fields that take part in the "already exists" comparison; status
    # fields and credentials are left out on both sides.
    compare_include = {"subject", "notification"}
    compare_exclude = {
        "notification": {
            "lastSuccess": True,
            "lastFailure": True,
            "lastSuccessCode": True,
            "lastFailureReason": True,
            "mqtt": {"passwd"},
            "mqttCustom": {"passwd"},
        }
    }

    existing_subscriptions = self.get_subscription_list()
    sub_dict = subscription.model_dump(
        include=compare_include, exclude=compare_exclude
    )

    for ex_sub in existing_subscriptions:
        ex_dict = ex_sub.model_dump(include=compare_include, exclude=compare_exclude)
        if not self._subscription_dicts_are_equal(sub_dict, ex_dict):
            continue
        self.logger.info("Subscription already exists")
        if update:
            self.logger.info("Updated subscription")
            subscription.id = ex_sub.id
            self.update_subscription(subscription)
        else:
            warnings.warn(f"Subscription existed already with the id {ex_sub.id}")
        return ex_sub.id

    params = {}
    if skip_initial_notification:
        orion_version = self.get_version()["orion"]["version"]
        if parse_version(orion_version) <= parse_version("3.1"):
            params.update({"options": "skipInitialNotification"})
        else:
            # NOTE(review): the warning is assumed to belong to this branch
            # (its text addresses newer broker versions) - confirm.
            warnings.warn(
                f"Skip initial notifications is a deprecated "
                f"feature of older versions <=3.1 of the context "
                f"broker. The Context Broker that you requesting has "
                f"version: {orion_version}. For newer versions we "
                f"automatically skip this option. Consider "
                f"refactoring and updating your services",
                DeprecationWarning,
            )

    # use the configured API version like all other subscription endpoints
    url = urljoin(self.base_url, f"{self._url_version}/subscriptions")
    headers = self.headers.copy()
    headers.update({"Content-Type": "application/json"})
    try:
        res = self.post(
            url=url,
            headers=headers,
            data=subscription.model_dump_json(
                exclude={
                    "id": True,
                    "notification": {
                        "lastSuccess",
                        "lastFailure",
                        "lastSuccessCode",
                        "lastFailureReason",
                    },
                },
                exclude_none=True,
            ),
            params=params,
        )
        if res.ok:
            self.logger.info("Subscription successfully created!")
            # the id is the last path segment of the Location header
            return res.headers["Location"].split("/")[-1]
        res.raise_for_status()
    except requests.RequestException as err:
        msg = "Could not send subscription!"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1574 

def get_subscription(self, subscription_id: str) -> Subscription:
    """
    Retrieves a single subscription by its id.

    Args:
        subscription_id: id of the subscription

    Returns:
        Subscription built from the JSON body of the response

    Raises:
        BaseHttpClientException: If the request fails
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
    )
    request_headers = self.headers.copy()
    try:
        response = self.get(url=endpoint, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
            return None
        self.logger.debug("Received: %s", response.json())
        return Subscription(**response.json())
    except requests.RequestException as err:
        msg = f"Could not load subscription {subscription_id}"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1597 

def update_subscription(
    self, subscription: Subscription, skip_initial_notification: bool = False
):
    """
    Updates an existing subscription via a 'PATCH' request. The
    subscription is addressed by `subscription.id`; fields that are None
    are not sent.

    Args:
        subscription: Subscription to update
        skip_initial_notification: True - request the broker to skip the
            initial notification. The option is only sent to context
            brokers with version <= 3.1; it is deprecated for newer
            versions and ignored there.
            False - do not send the option

    Returns:
        None

    Raises:
        BaseHttpClientException: If the request fails
    """
    params = {}
    if skip_initial_notification:
        orion_version = self.get_version()["orion"]["version"]
        if parse_version(orion_version) <= parse_version("3.1"):
            params.update({"options": "skipInitialNotification"})
        else:
            # NOTE(review): the warning is assumed to belong to this branch
            # (its text addresses newer broker versions) - confirm.
            warnings.warn(
                f"Skip initial notifications is a deprecated "
                f"feature of older versions <3.1 of the context "
                f"broker. The Context Broker that you requesting has "
                f"version: {orion_version}. For newer versions we "
                f"automatically skip this option. Consider "
                f"refactoring and updating your services",
                DeprecationWarning,
            )

    url = urljoin(
        self.base_url, f"{self._url_version}/subscriptions/{subscription.id}"
    )
    headers = self.headers.copy()
    headers.update({"Content-Type": "application/json"})
    try:
        res = self.patch(
            url=url,
            headers=headers,
            data=subscription.model_dump_json(
                exclude={
                    "id": True,
                    "notification": {
                        "lastSuccess",
                        "lastFailure",
                        "lastSuccessCode",
                        "lastFailureReason",
                    },
                },
                exclude_none=True,
            ),
            # without this the skipInitialNotification option computed
            # above would never reach the broker
            params=params,
        )
        if res.ok:
            self.logger.info("Subscription successfully updated!")
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not update subscription {subscription.id}"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1660 

def delete_subscription(self, subscription_id: str) -> None:
    """
    Deletes a subscription from a Context Broker

    Args:
        subscription_id: id of the subscription

    Raises:
        BaseHttpClientException: If the request fails
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
    )
    request_headers = self.headers.copy()
    try:
        response = self.delete(url=endpoint, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info(
                f"Subscription '{subscription_id}' successfully deleted!"
            )
    except requests.RequestException as err:
        msg = f"Could not delete subscription {subscription_id}"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1682 

1683 # Registration API 

def get_registration_list(self, *, limit: PositiveInt = None) -> List[Registration]:
    """
    Lists the context provider registrations present in the system.

    Args:
        limit: Limit the number of registrations to be retrieved

    Returns:
        list of registrations

    Raises:
        BaseHttpClientException: If the request fails
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/registrations/")
    request_headers = self.headers.copy()
    # The 'count' option is always sent so that the pagination helper can
    # check whether pagination is required.
    query = {"options": "count"}
    try:
        raw_items = self.__pagination(
            limit=limit, url=endpoint, params=query, headers=request_headers
        )
        return TypeAdapter(List[Registration]).validate_python(raw_items)
    except requests.RequestException as err:
        msg = "Could not load registrations!"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1709 

def post_registration(self, registration: Registration):
    """
    Creates a new context provider registration. This is typically used
    for binding context sources as providers of certain data. The
    registration is represented by cb.models.Registration

    Args:
        registration (Registration): Registration to create. Its id and
            all fields that are None are not sent.

    Returns:
        The last path segment of the 'Location' header of the response,
        i.e. the id of the created registration

    Raises:
        BaseHttpClientException: If the request fails
    """
    endpoint = urljoin(self.base_url, f"{self._url_version}/registrations")
    request_headers = self.headers.copy()
    request_headers["Content-Type"] = "application/json"
    try:
        response = self.post(
            url=endpoint,
            headers=request_headers,
            data=registration.model_dump_json(exclude={"id"}, exclude_none=True),
        )
        if not response.ok:
            response.raise_for_status()
            return None
        self.logger.info("Registration successfully created!")
        return response.headers["Location"].split("/")[-1]
    except requests.RequestException as err:
        msg = f"Could not send registration {registration.id}!"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1738 

def get_registration(self, registration_id: str) -> Registration:
    """
    Retrieves a registration from context broker by id

    Args:
        registration_id: id of the registration

    Returns:
        Registration built from the JSON body of the response

    Raises:
        BaseHttpClientException: If the request fails
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/registrations/{registration_id}"
    )
    request_headers = self.headers.copy()
    try:
        response = self.get(url=endpoint, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
            return None
        self.logger.debug("Received: %s", response.json())
        return Registration(**response.json())
    except requests.RequestException as err:
        msg = f"Could not load registration {registration_id} !"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1762 

def add_valid_relationships(
    self, entities: List[ContextEntity]
) -> List[ContextEntity]:
    """
    Check all attributes of the given entities. If an attribute passes
    `validate_relationship`, it is assumed to be a relationship and is
    re-assigned with the attribute type "relationship" (keeping its
    value).

    Args:
        entities: list of entities that need to be validated. The
            entities are modified in place.

    Returns:
        updated entities (the same objects, in the same order)
    """
    processed = []
    for entity in list(entities):
        attributes = entity.model_dump(exclude={"id", "type"})
        for name, dumped in attributes.items():
            # only dict-shaped attributes can be checked as relationships
            if not isinstance(dumped, dict):
                continue
            if not self.validate_relationship(dumped):
                continue
            relationship = ContextAttribute(
                **{
                    "type": DataType.RELATIONSHIP,
                    "value": dumped.get("value"),
                }
            )
            entity.update_attribute({name: relationship})
        processed.append(entity)
    return processed

1796 

def remove_invalid_relationships(
    self, entities: List[ContextEntity], hard_remove: bool = True
) -> List[ContextEntity]:
    """
    Removes invalid relationships from the entities. A relationship is
    treated as invalid if it does not pass `validate_relationship`.

    Args:
        entities: list of entities that need to be validated. The
            entities are modified in place.
        hard_remove: If True, invalid relationships will be deleted.
            If False, invalid relationships will be changed to Text
            attributes.

    Returns:
        updated entities (the same objects, in the same order)
    """
    processed = []
    for entity in list(entities):
        for relationship in entity.get_relationships():
            if self.validate_relationship(relationship):
                continue
            if hard_remove:
                entity.delete_attributes(attrs=[relationship])
                continue
            # keep the value, but downgrade the attribute type to "Text"
            as_text = NamedContextAttribute(
                name=relationship.name,
                type=DataType.TEXT,
                value=relationship.value,
            )
            entity.update_attribute(attrs=[as_text])
        processed.append(entity)
    return processed

1832 

def validate_relationship(
    self, relationship: Union[NamedContextAttribute, ContextAttribute, Dict]
) -> bool:
    """
    Validates a relationship. A relationship is valid if it points to an
    existing entity. Otherwise, it is considered invalid.

    Args:
        relationship: relationship to validate; either an attribute object
            whose ``value`` is the destination id, or a dictionary with a
            ``"value"`` key.

    Returns:
        True if the destination entity could be loaded and its id matches,
        False if the lookup was answered with an HTTP error (e.g. 404).

    Raises:
        ValueError: if the dictionary has no ``"value"`` key or the argument
            is of an unsupported type.
        requests.RequestException: if the lookup failed without any HTTP
            response (e.g. connection problems), so validity is unknown.
    """
    if isinstance(relationship, (NamedContextAttribute, ContextAttribute)):
        destination_id = relationship.value
    elif isinstance(relationship, dict):
        # A sentinel tells a missing "value" key apart from an explicit None.
        _sentinel = object()
        destination_id = relationship.get("value", _sentinel)
        if destination_id is _sentinel:
            raise ValueError(
                "Invalid relationship dictionary format\n"
                "Expected format: {"
                f'"type": "{DataType.RELATIONSHIP.value}", '
                '"value": "entity_id"}'
            )
    else:
        raise ValueError("Invalid relationship type.")
    try:
        destination_entity = self.get_entity(entity_id=destination_id)
    except requests.RequestException as err:
        if err.response is None:
            # No HTTP answer at all: the relationship can neither be
            # confirmed nor rejected, so surface the original error instead
            # of failing on the missing response object.
            raise
        # Any HTTP error answer (404 and others) means the destination
        # could not be resolved; report that explicitly as False.
        return False
    return destination_entity.id == destination_id

1867 

def update_registration(self, registration: Registration):
    """
    Patch an existing registration in the context broker.

    The registration is serialized without its ``id`` and without fields
    that are ``None``, so only the fields included in the request are
    updated.

    Args:
        registration: Registration to update; its ``id`` selects the target

    Raises:
        BaseHttpClientException: if the request fails
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/registrations/{registration.id}"
    )
    request_headers = self.headers.copy()
    request_headers.update({"Content-Type": "application/json"})
    try:
        payload = registration.model_dump_json(exclude={"id"}, exclude_none=True)
        response = self.patch(url=endpoint, headers=request_headers, data=payload)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info("Registration successfully updated!")
    except requests.RequestException as err:
        msg = f"Could not update registration {registration.id} !"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1895 

def delete_registration(self, registration_id: str) -> None:
    """
    Deletes a registration from the Context Broker.

    Args:
        registration_id: id of the registration

    Raises:
        BaseHttpClientException: if the request fails
    """
    endpoint = urljoin(
        self.base_url, f"{self._url_version}/registrations/{registration_id}"
    )
    request_headers = self.headers.copy()
    try:
        response = self.delete(url=endpoint, headers=request_headers)
        if response.ok:
            self.logger.info(
                "Registration '%s' successfully deleted!", registration_id
            )
        # raises for error status codes
        response.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not delete registration {registration_id} !"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1916 

1917 # Batch operation API 

def update(
    self,
    *,
    entities: List[Union[ContextEntity, ContextEntityKeyValues]],
    action_type: Union[ActionType, str],
    update_format: Optional[str] = None,
    forcedUpdate: bool = False,
    override_metadata: bool = False,
) -> None:
    """
    This operation allows to create, update and/or delete several entities
    in a single batch operation.

    This operation is split in as many individual operations as entities
    in the entities vector, so the actionType is executed for each one of
    them. Depending on the actionType, a mapping with regular non-batch
    operations can be done:

    append: maps to POST /v2/entities (if the entity does not already exist)
    or POST /v2/entities/<id>/attrs (if the entity already exists).

    appendStrict: maps to POST /v2/entities (if the entity does not
    already exist) or POST /v2/entities/<id>/attrs?options=append (if the
    entity already exists).

    update: maps to PATCH /v2/entities/<id>/attrs.

    delete: maps to DELETE /v2/entities/<id>/attrs/<attrName> on every
    attribute included in the entity or to DELETE /v2/entities/<id> if
    no attribute were included in the entity.

    replace: maps to PUT /v2/entities/<id>/attrs.

    Args:
        entities: an array of entities, each entity specified using the
            JSON entity representation format
        action_type (Update): actionType, to specify the kind of update
            action to do: either append, appendStrict, update, delete,
            or replace.
        update_format (str): Optional 'keyValues'
        forcedUpdate: Update operation have to trigger any matching
            subscription, no matter if there is an actual attribute
            update or not, instead of the default behavior, which is to
            update only if an attribute is effectively updated.
        override_metadata:
            Bool, replace the existing metadata with the one provided in
            the request

    Returns:
        None

    Raises:
        AssertionError: if ``update_format`` is set to anything other than
            'keyValues'
        BaseHttpClientException: if the request fails
    """

    url = urljoin(self.base_url, f"{self._url_version}/op/update")
    headers = self.headers.copy()
    headers.update({"Content-Type": "application/json"})
    params = {}
    options = []
    if override_metadata:
        options.append("overrideMetadata")
    if forcedUpdate:
        options.append("forcedUpdate")
    if update_format:
        # Raised explicitly (instead of using ``assert``) so the check is
        # still enforced when Python runs with -O; the exception type is
        # kept for callers that already handle AssertionError.
        if update_format != AttrsFormat.KEY_VALUES.value:
            raise AssertionError("Only 'keyValues' is allowed as update format")
        options.append("keyValues")
    if options:
        params.update({"options": ",".join(options)})
    update = Update(actionType=action_type, entities=entities)
    try:
        res = self.post(
            url=url,
            headers=headers,
            params=params,
            json=update.model_dump(by_alias=True),
        )
        if res.ok:
            self.logger.info("Update operation '%s' succeeded!", action_type)
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Update operation '{action_type}' failed!"
        raise BaseHttpClientException(message=msg, response=err.response) from err

2000 

def query(
    self,
    *,
    query: Query,
    limit: PositiveInt = None,
    order_by: str = None,
    response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
) -> List[Any]:
    """
    Run a batch query (POST ``op/query``) against the context broker.

    Args:
        query (Query): query payload, sent as JSON without ``None`` fields
        limit (PositiveInt): maximum number of items, handed to pagination
        order_by (str): ordering criteria; forwarded as the ``orderBy``
            URL parameter when given
        response_format (AttrsFormat, str): representation of the result;
            must be a member of ``AttrsFormat``

    Returns:
        The response payload is an Array containing one object per matching
        entity, or an empty array [] if no entities are found. The entities
        follow the JSON entity representation format (described in the
        section "JSON Entity Representation"). Normalized results are
        validated as ``ContextEntity`` and key-values results as
        ``ContextEntityKeyValues``; other formats are returned as received.

    Raises:
        ValueError: if ``response_format`` is not a member of ``AttrsFormat``
        BaseHttpClientException: if the request fails
    """
    url = urljoin(self.base_url, f"{self._url_version}/op/query")
    headers = self.headers.copy()
    headers.update({"Content-Type": "application/json"})
    params = {"options": "count"}

    if response_format:
        if response_format not in list(AttrsFormat):
            raise ValueError(f"Value must be in {list(AttrsFormat)}")
        params["options"] = ",".join([response_format, "count"])
    if order_by:
        # previously accepted but never sent to the broker
        params["orderBy"] = order_by
    try:
        items = self.__pagination(
            method=PaginationMethod.POST,
            url=url,
            headers=headers,
            params=params,
            data=query.model_dump_json(exclude_none=True),
            limit=limit,
        )
        if response_format == AttrsFormat.NORMALIZED:
            adapter = TypeAdapter(List[ContextEntity])
            return adapter.validate_python(items)
        if response_format == AttrsFormat.KEY_VALUES:
            adapter = TypeAdapter(List[ContextEntityKeyValues])
            return adapter.validate_python(items)
        return items
    except requests.RequestException as err:
        msg = "Query operation failed!"
        raise BaseHttpClientException(message=msg, response=err.response) from err

2050 

def notify(self, message: Message) -> None:
    """
    This operation is intended to consume a notification payload so that
    all the entity data included by such notification is persisted,
    overwriting if necessary. This operation is useful when one NGSIv2
    endpoint is subscribed to another NGSIv2 endpoint (federation
    scenarios). The request payload must be an NGSIv2 notification
    payload. The behaviour must be exactly the same as 'update'
    with 'action_type' equal to append.

    Args:
        message: Notification message

    Returns:
        None

    Raises:
        BaseHttpClientException: if the request fails
    """
    # Build the path from the client's URL version, like the other batch
    # operations (op/update, op/query), instead of a hard-coded "v2".
    url = urljoin(self.base_url, f"{self._url_version}/op/notify")
    headers = self.headers.copy()
    headers.update({"Content-Type": "application/json"})
    params = {}
    try:
        res = self.post(
            url=url,
            headers=headers,
            params=params,
            data=message.model_dump_json(by_alias=True),
        )
        if res.ok:
            self.logger.info("Notification message sent!")
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = (
            f"Sending notification message failed! \n "
            f"{message.model_dump_json(indent=2)}"
        )
        raise BaseHttpClientException(message=msg, response=err.response) from err

2088 

def post_command(
    self,
    *,
    entity_id: str,
    command: Union[Command, NamedCommand, Dict],
    entity_type: str = None,
    command_name: str = None,
) -> None:
    """
    Post a command to a context entity this corresponds to 'PATCH' of the
    specified command attribute.

    Args:
        entity_id: Entity identifier
        command: Command. With ``command_name`` a ``Command`` (or a dict
            with its fields) is expected, otherwise a ``NamedCommand`` (or
            a dict with its fields, including the name).
        entity_type: Entity type
        command_name: Name of the command in the entity

    Returns:
        None

    Raises:
        AssertionError: if ``command`` has an unsupported type
    """
    # The type checks are raised explicitly (instead of ``assert``) so they
    # are still enforced under ``python -O``; the exception type is kept.
    if command_name:
        if isinstance(command, dict):
            command = Command(**command)
        elif not isinstance(command, Command):
            raise AssertionError(
                "command must be a Command or dict when command_name is given"
            )
        # Attach the name so that both branches pass the same kind of
        # object (a NamedCommand) on; the explicit name always wins.
        named_fields = command.model_dump()
        named_fields["name"] = command_name
        command = NamedCommand(**named_fields)
    else:
        if isinstance(command, dict):
            command = NamedCommand(**command)
        elif not isinstance(command, NamedCommand):
            raise AssertionError(
                "command must be a NamedCommand or dict when no command_name "
                "is given"
            )

    self.update_existing_entity_attributes(
        entity_id=entity_id, entity_type=entity_type, attrs=[command]
    )

2123 

def does_entity_exist(self, entity_id: str, entity_type: str) -> bool:
    """
    Test if an entity with given id and type is present in the CB

    Args:
        entity_id: Entity id
        entity_type: Entity type

    Returns:
        bool; True if entity exists

    Raises:
        RequestException, if any error occurs (e.g: No Connection),
        except that the entity is not found
    """
    # Build the path from the client's URL version, like the other
    # operations of this client, instead of a hard-coded "v2".
    url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
    headers = self.headers.copy()
    params = {"type": entity_type}

    try:
        res = self.get(url=url, params=params, headers=headers)
        if res.ok:
            return True
        res.raise_for_status()
    except requests.RequestException as err:
        # Only "404 Not Found" means "does not exist"; everything else
        # (including errors without any response) is logged and re-raised.
        if err.response is None or err.response.status_code != 404:
            self.log_error(err=err, msg="Checking entity existence failed!")
            raise
    return False

2153 

def patch_entity(
    self,
    entity: Union[ContextEntity, ContextEntityKeyValues],
    key_values: bool = False,
    forcedUpdate: bool = False,
    override_metadata: bool = False,
) -> None:
    """
    Update the state in the CB so that it matches the given entity.

    All attributes of the entity are handed to
    ``update_existing_entity_attributes`` together with the entity's id
    and type. It is an extended equivalent to the HTTP method PATCH, which
    applies partial modifications to a resource.

    Args:
        entity: Entity to update
        key_values: If True, the entity is updated in key-values format.
        forcedUpdate: Update operation have to trigger any matching
            subscription, no matter if there is an actual attribute
            update or not, instead of the default behavior, which is to
            update only if an attribute is effectively updated.
        override_metadata: If True, the existing metadata of the entity
            is replaced
    Returns:
        None
    """
    self.update_existing_entity_attributes(
        entity_id=entity.id,
        entity_type=entity.type,
        attrs=entity.get_attributes(),
        key_values=key_values,
        forcedUpdate=forcedUpdate,
        override_metadata=override_metadata,
    )

2188 

@staticmethod
def compare_lists_ignore_order(list_a, list_b):
    """
    Compares two lists ignoring order.

    Items are matched pairwise with ``==``, so unhashable and mixed types
    (e.g. dictionaries) are supported and the insertion order of dictionary
    keys does not matter. Lists nested directly inside the lists are
    compared order-independently as well. Each item of one list must be
    matched by exactly one item of the other, i.e. duplicates count.

    Args:
        list_a: first list
        list_b: second list

    Returns:
        True if both lists hold the same items regardless of order
    """

    def _items_match(left, right):
        # nested lists are order-independent too; everything else uses ==
        if isinstance(left, list) and isinstance(right, list):
            return _lists_match(left, right)
        return left == right

    def _lists_match(left_items, right_items):
        # Quick check: if lengths differ, they are not equal
        if len(left_items) != len(right_items):
            return False
        # Pairwise matching is quadratic, but needs neither hashing nor a
        # total order of the items. (A string-based sort key is not a
        # reliable canonical form: equal dicts can print differently.)
        unmatched = list(right_items)
        for item in left_items:
            for position, candidate in enumerate(unmatched):
                if _items_match(item, candidate):
                    del unmatched[position]
                    break
            else:
                return False
        return True

    return _lists_match(list_a, list_b)

2212 

def _subscription_dicts_are_equal(self, first: dict, second: dict):
    """
    Compare two subscription dictionaries, including nested structures.

    Only the keys of ``first`` are compared. If the key sets differ a
    warning is issued, but the comparison still goes on. Nested
    dictionaries are compared recursively and lists are compared with
    ``compare_lists_ignore_order``. A mismatch of other values is ignored
    when both sides are "empty" (see ``_holds_content``) or when the key
    is ``timesSent``.

    Args:
        first dict: Dictionary of first subscription
        second dict: Dictionary of second subscription

    Returns:
        True if equal, else False
    """

    def _holds_content(value):
        """
        Recursively tell whether a value carries any content.
        A dict or list carries content if any of its values/items does;
        any other value is judged by its truthiness.
        """
        if isinstance(value, dict):
            return any(_holds_content(value=item) for item in value.values())
        if isinstance(value, list):
            return any(_holds_content(value=item) for item in value)
        return bool(value)

    if first.keys() != second.keys():
        warnings.warn(
            "Subscriptions contain a different set of fields. "
            "Only comparing to new fields of the new one."
        )
    for key, new_value in first.items():
        existing_value = second.get(key, None)
        if isinstance(new_value, dict) and isinstance(existing_value, dict):
            if not self._subscription_dicts_are_equal(new_value, existing_value):
                return False
            continue
        if isinstance(new_value, list) and isinstance(existing_value, list):
            if not self.compare_lists_ignore_order(new_value, existing_value):
                return False
            continue
        if new_value == existing_value:
            continue
        self.logger.debug(
            f"Not equal fields for key {key}: ({new_value}, {existing_value})"
        )
        # tolerate "empty vs. empty" and the timesSent counter
        if (
            not _holds_content(new_value) and not _holds_content(existing_value)
        ) or key == "timesSent":
            continue
        return False
    return True