Coverage for filip/clients/ngsi_v2/cb.py: 80%

686 statements  

« prev     ^ index     » next       coverage.py v7.10.5, created at 2025-08-26 14:36 +0000

1""" 

2Context Broker Module for API Client 

3""" 

4 

5from __future__ import annotations 

6 

7import copy 

8from copy import deepcopy 

9from enum import Enum 

10from math import inf 

11from packaging.version import parse as parse_version 

12from packaging import version 

13from pydantic import PositiveInt, PositiveFloat, AnyHttpUrl, ValidationError 

14from pydantic.type_adapter import TypeAdapter 

15from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union 

16import re 

17import requests 

18from urllib.parse import urljoin 

19import warnings 

20from requests import RequestException 

21from filip.clients.base_http_client import BaseHttpClient, NgsiURLVersion 

22from filip.config import settings 

23from filip.models.base import FiwareHeader, PaginationMethod, DataType 

24from filip.utils.simple_ql import QueryString 

25from filip.models.ngsi_v2.context import ( 

26 ActionType, 

27 Command, 

28 ContextEntity, 

29 ContextEntityKeyValues, 

30 ContextAttribute, 

31 NamedCommand, 

32 NamedContextAttribute, 

33 Query, 

34 Update, 

35 PropertyFormat, 

36 ContextEntityList, 

37 ContextEntityKeyValuesList, 

38 ContextEntityValidationList, 

39 ContextEntityKeyValuesValidationList, 

40) 

41from filip.models.ngsi_v2.base import AttrsFormat 

42from filip.models.ngsi_v2.subscriptions import Subscription, Message 

43from filip.models.ngsi_v2.registrations import Registration 

44from filip.clients.exceptions import BaseHttpClientException 

45 

46if TYPE_CHECKING: 

47 from filip.clients.ngsi_v2.iota import IoTAClient 

48 

49 

50class ContextBrokerClient(BaseHttpClient): 

51 """ 

52 Implementation of NGSI Context Broker functionalities, such as creating 

53 entities and subscriptions; retrieving, updating and deleting data. 

54 Further documentation: 

55 https://fiware-orion.readthedocs.io/en/master/ 

56 

57 Api specifications for v2 are located here: 

58 https://telefonicaid.github.io/fiware-orion/api/v2/stable/ 

59 

60 Note: 

61 We use the reference implementation for development. Therefore, some 

62 other brokers may show slightly different behavior! 

63 """ 

64 

def __init__(
    self,
    url: str = None,
    *,
    session: requests.Session = None,
    fiware_header: FiwareHeader = None,
    **kwargs,
):
    """
    Create a client for the NGSIv2 API of a context broker.

    Args:
        url: Url of context broker server. When omitted (or empty),
            ``settings.CB_URL`` is used instead.
        session (requests.Session): Session handed on to the base client.
        fiware_header (FiwareHeader): fiware service and fiware service path
        **kwargs (Optional): Optional arguments that ``request`` takes.
    """
    # API version prefix used by every endpoint url built in this client
    self._url_version = NgsiURLVersion.v2_url.value

    # fall back to the configured broker url
    if not url:
        url = settings.CB_URL

    super().__init__(
        url=url,
        session=session,
        fiware_header=fiware_header,
        **kwargs,
    )

    # queries the broker version once and warns if it is too old
    self._check_correct_cb_version()

88 

def __pagination(
    self,
    *,
    method: PaginationMethod = PaginationMethod.GET,
    url: str,
    headers: Dict,
    limit: Union[PositiveInt, PositiveFloat] = None,
    params: Dict = None,
    data: str = None,
) -> List[Dict]:
    """
    NGSIv2 implements a pagination mechanism in order to help clients to
    retrieve large sets of resources. This mechanism works for all listing
    operations in the API (e.g. GET /v2/entities, GET /v2/subscriptions,
    POST /v2/op/query, etc.). This function helps getting datasets that are
    larger than the limit for the different GET operations.

    https://fiware-orion.readthedocs.io/en/master/user/pagination/index.html

    Args:
        method: HTTP method used for every page request
        url: Information about the url, obtained from the original function
        headers: The headers from the original function
        limit: Maximum number of items to collect. ``None`` means no
            upper bound (all items reported by the broker are fetched).
        params: Query parameters of the original request. The dict is
            copied; ``limit`` and ``offset`` are set on the copy only.
        data: Request body sent unchanged with every page request

    Returns:
        List with the JSON items of all retrieved pages

    Raises:
        requests.RequestException: via ``raise_for_status`` if a page
            request is answered with an error status
        KeyError: if the first response has no ``Fiware-Total-Count``
            header
    """
    if limit is None:
        limit = inf

    # ``params`` defaults to None and must not be indexed directly; the copy
    # also keeps the paging keys out of the caller's dict.
    params = dict(params) if params else {}
    params["limit"] = 1000 if limit > 1000 else limit  # max. 1000 per request

    session = self.session if self.session else requests.Session()
    with session:
        res = session.request(
            method=method, url=url, params=params, headers=headers, data=data
        )
        if res.ok:
            items = res.json()
            # total number of matching items as reported by the broker
            count = int(res.headers["Fiware-Total-Count"])

            while len(items) < limit and len(items) < count:
                # continue right after the items collected so far
                params["offset"] = len(items)
                params["limit"] = min(1000, (limit - len(items)))
                res = session.request(
                    method=method,
                    url=url,
                    params=params,
                    headers=headers,
                    data=data,
                )
                if res.ok:
                    items.extend(res.json())
                else:
                    res.raise_for_status()
            self.logger.debug("Received: %s", items)
            return items
        res.raise_for_status()

156 

157 # MANAGEMENT API 

def get_version(self) -> Dict:
    """
    Request the ``version`` resource below the client's base url.

    Returns:
        Dictionary with the parsed JSON response

    Raises:
        BaseHttpClientException: if the request fails
    """
    endpoint = urljoin(self.base_url, "version")
    try:
        response = self.get(url=endpoint, headers=self.headers)
        if not response.ok:
            response.raise_for_status()
        else:
            return response.json()
    except requests.RequestException as err:
        self.logger.error(err)
        msg = f"Fetch version fails, reason: {err.args}"
        raise BaseHttpClientException(message=msg, response=err.response) from err

174 

def _check_correct_cb_version(self) -> None:
    """
    Compare the Orion version reported by ``get_version`` with
    ``settings.MINIMUM_ORION_VERSION`` and log a warning when the broker
    is older than that minimum. Nothing is raised for an old version.
    """
    orion_version = self.get_version()["orion"]["version"]
    running = version.parse(orion_version)
    required = version.parse(settings.MINIMUM_ORION_VERSION)
    if not running < required:
        return
    self.logger.warning(
        f"You are using orion version {orion_version}. There was a breaking change in Orion Version "
        f"{settings.MINIMUM_ORION_VERSION}, therefore functionality is not assured when using "
        f"version {orion_version}."
    )

187 

def get_resources(self) -> Dict:
    """
    Request the entry point of the API version used by this client
    (base url joined with the version prefix).

    Returns:
        Dict with the parsed JSON response

    Raises:
        requests.RequestException: if the request fails
    """
    endpoint = urljoin(self.base_url, self._url_version)
    try:
        response = self.get(url=endpoint, headers=self.headers)
        if not response.ok:
            response.raise_for_status()
        else:
            return response.json()
    except requests.RequestException as err:
        self.logger.error(err)
        raise RequestException(response=err.response) from err

204 

205 # STATISTICS API 

def get_statistics(self) -> Dict:
    """
    Gets statistics of context broker

    Returns:
        Dictionary with response

    Raises:
        BaseHttpClientException: if the request fails. The message is the
            response body when a response was received, otherwise the text
            of the underlying request error.
    """
    url = urljoin(self.base_url, "statistics")
    try:
        res = self.get(url=url, headers=self.headers)
        if res.ok:
            return res.json()
        res.raise_for_status()
    except requests.RequestException as err:
        self.logger.error(err)
        # Connection-level failures carry no response object; reading
        # ``.text`` from None would hide the real error behind an
        # AttributeError.
        response = err.response
        message = response.text if response is not None else str(err)
        raise BaseHttpClientException(message=message, response=response) from err

223 

224 # CONTEXT MANAGEMENT API ENDPOINTS 

225 # Entity Operations 

def post_entity(
    self,
    entity: Union[ContextEntity, ContextEntityKeyValues],
    update: bool = False,
    patch: bool = False,
    override_metadata: bool = True,
    key_values: bool = False,
):
    """
    Create an entity in the context broker.

    The entity is sent with a POST request first. If the broker answers
    with status code 422 (unprocessable, e.g. the entity already exists),
    the call can fall back to ``override_entity`` (``update=True``) or to
    ``patch_entity`` (``patch=True``). Without either flag the error is
    raised.

    Args:
        entity (ContextEntity/ContextEntityKeyValues):
            Context Entity Object
        update (bool):
            If the response.status_code is 422, whether to override the
            existing entity
        patch (bool):
            If the response.status_code is 422, whether to manipulate the
            existing entity. Omitted if update `True`.
        override_metadata:
            Only applies for patch equal to `True`.
            Whether to override or append the attribute's metadata.
            `True` for overwrite or `False` for update/append
        key_values(bool):
            By default False. If set to True, "options=keyValues" will
            be included in params of post request. The payload uses
            the keyValues simplified entity representation, i.e.
            ContextEntityKeyValues.

    Returns:
        The ``Location`` header of the response on success, otherwise
        whatever the fallback (``override_entity`` / ``patch_entity``)
        returns.

    Raises:
        BaseHttpClientException: if the request fails and no fallback
            applies
    """
    url = urljoin(self.base_url, f"{self._url_version}/entities")
    headers = self.headers.copy()
    params = {}

    # the representation flag must match the model that was handed in
    if key_values:
        assert isinstance(entity, ContextEntityKeyValues)
        params["options"] = "keyValues"
    else:
        assert isinstance(entity, ContextEntity)

    try:
        res = self.post(
            url=url,
            headers=headers,
            json=entity.model_dump(exclude_none=True),
            params=params,
        )
        if res.ok:
            self.logger.info("Entity successfully posted!")
            return res.headers.get("Location")
        res.raise_for_status()
    except requests.RequestException as err:
        unprocessable = (
            err.response is not None and err.response.status_code == 422
        )
        # ``update`` takes precedence over ``patch``
        if unprocessable and update:
            return self.override_entity(entity=entity, key_values=key_values)
        if unprocessable and patch:
            return self.patch_entity(
                entity=entity,
                override_metadata=override_metadata,
                key_values=key_values,
            )
        msg = f"Could not post entity {entity.id}"
        raise BaseHttpClientException(message=msg, response=err.response) from err

299 

def get_entity_list(
    self,
    *,
    entity_ids: List[str] = None,
    entity_types: List[str] = None,
    id_pattern: str = None,
    type_pattern: str = None,
    q: Union[str, QueryString] = None,
    mq: Union[str, QueryString] = None,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    limit: PositiveInt = inf,
    attrs: List[str] = None,
    metadata: Union[str, List[str]] = None,
    order_by: str = None,
    response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
    include_invalid: bool = False,
) -> Union[
    List[Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]],
    ContextEntityValidationList,
    ContextEntityKeyValuesValidationList,
]:
    r"""
    Retrieves a list of context entities that match different criteria by
    id, type, pattern matching (either id or type) and/or those which
    match a query or geographical query (see Simple Query Language and
    Geographical Queries). A given entity has to match all the criteria
    to be retrieved (i.e., the criteria is combined in a logical AND
    way). Note that pattern matching query parameters are incompatible
    (i.e. mutually exclusive) with their corresponding exact matching
    parameters, i.e. idPattern with id and typePattern with type.

    Args:
        entity_ids: List of entity ids (a single string is accepted as
            well). Retrieve entities whose ID matches one of the
            elements. Incompatible with id_pattern, e.g. Boe_Idarium
        entity_types: List of entity types (a single string is accepted
            as well). Retrieve entities whose type matches one of the
            elements. Incompatible with type_pattern. Example: Room.
        id_pattern: A correctly formatted regular expression. Retrieve
            entities whose ID matches the regular expression. Incompatible
            with entity_ids, e.g. ngsi-ld.* or sensor.*
        type_pattern: A correctly formatted regular expression. Retrieve
            entities whose type matches the regular expression.
            Incompatible with entity_types, e.g. room.*
        q (SimpleQuery): A query expression, composed of a list of
            statements separated by ;, i.e.,
            q=statement1;statement2;statement3. See Simple Query
            Language specification. Example: temperature>40.
            A string is parsed with ``QueryString.parse_str`` first.
        mq (SimpleQuery): A query expression for attribute metadata,
            composed of a list of statements separated by ;, i.e.,
            mq=statement1;statement2;statement3. See Simple Query
            Language specification. Example: temperature.accuracy<0.9.
        georel: Spatial relationship between matching entities and a
            reference shape. See Geographical Queries. Example: 'near'.
        geometry: Geographical area to which the query is restricted.
            See Geographical Queries. Example: point.
        coords: List of latitude-longitude pairs of coordinates separated
            by ';'. See Geographical Queries. Example: 41.390205,
            2.154007;48.8566,2.3522.
        limit: Limits the number of entities to be retrieved Example: 20
        attrs: List of attribute names whose data are to be included in
            the response (a ready-made comma-separated string is
            accepted as well). The attributes are retrieved in the order
            specified by this parameter. If this parameter is not
            included, the attributes are retrieved in arbitrary order.
            See "Filtering out attributes and metadata" section for more
            detail. Example: seatNumber.
        metadata: Metadata names to include in the response, either as a
            list of names or as a ready-made comma-separated string.
            See "Filtering out attributes and metadata" section for more
            detail. Example: accuracy.
        order_by: Criteria for ordering results. See "Ordering Results"
            section for details. Example: temperature,!speed.
        response_format (AttrsFormat, str): Response Format. Note: That if
            'keyValues' or 'values' are used the response model will
            change to List[ContextEntityKeyValues] and to List[Dict[str,
            Any]], respectively.
        include_invalid: Specify if the returned list should also contain
            a list of invalid entity IDs or not.

    Returns:
        Without ``include_invalid``: the list of entities in the model
        matching ``response_format``. With ``include_invalid``: a
        validation list holding the valid entities plus the ids of the
        entities that failed validation. For the 'values' format the raw
        items are returned in both cases.

    Raises:
        ValueError: for mutually exclusive arguments, an invalid regular
            expression or an unknown response format
        BaseHttpClientException: if the request fails
    """

    def _comma_separated(value) -> str:
        # A plain string is passed through; joining it would split it
        # into single characters.
        return value if isinstance(value, str) else ",".join(value)

    url = urljoin(self.base_url, f"{self._url_version}/entities/")
    headers = self.headers.copy()
    params = {}

    if entity_ids and id_pattern:
        raise ValueError("entity_ids and id_pattern are mutually exclusive")
    if entity_types and type_pattern:
        raise ValueError("entity_types and type_pattern are mutually exclusive")
    if entity_ids:
        params["id"] = _comma_separated(entity_ids)
    if id_pattern:
        try:
            re.compile(id_pattern)
        except re.error as err:
            raise ValueError(f"Invalid Pattern: {err}") from err
        params["idPattern"] = id_pattern
    if entity_types:
        params["type"] = _comma_separated(entity_types)
    if type_pattern:
        try:
            re.compile(type_pattern)
        except re.error as err:
            raise ValueError(f"Invalid Pattern: {err.msg}") from err
        params["typePattern"] = type_pattern
    if attrs:
        params["attrs"] = _comma_separated(attrs)
    if metadata:
        params["metadata"] = _comma_separated(metadata)
    if q:
        if isinstance(q, str):
            q = QueryString.parse_str(q)
        params["q"] = str(q)
    if mq:
        params["mq"] = str(mq)
    if geometry:
        params["geometry"] = geometry
    if georel:
        params["georel"] = georel
    if coords:
        params["coords"] = coords
    if order_by:
        params["orderBy"] = order_by
    if response_format not in list(AttrsFormat):
        raise ValueError(f"Value must be in {list(AttrsFormat)}")
    # "count" makes the broker report the total, which pagination relies on
    params["options"] = ",".join(["count", response_format])

    # models used to parse the items; None for the 'values' format
    if response_format == AttrsFormat.NORMALIZED:
        entity_model = ContextEntity
        list_model = ContextEntityList
        validation_model = ContextEntityValidationList
    elif response_format == AttrsFormat.KEY_VALUES:
        entity_model = ContextEntityKeyValues
        list_model = ContextEntityKeyValuesList
        validation_model = ContextEntityKeyValuesValidationList
    else:
        entity_model = list_model = validation_model = None

    try:
        items = self.__pagination(
            method=PaginationMethod.GET,
            limit=limit,
            url=url,
            params=params,
            headers=headers,
        )
    except requests.RequestException as err:
        msg = "Could not load entities"
        raise BaseHttpClientException(message=msg, response=err.response) from err

    if entity_model is None:
        return items  # in case of VALUES as response_format

    if not include_invalid:
        return list_model.model_validate({"entities": items}).entities

    # validate one by one so that a broken entity does not fail the call
    adapter = TypeAdapter(entity_model)
    valid_entities = []
    invalid_entities = []
    for entity in items:
        try:
            valid_entities.append(adapter.validate_python(entity))
        except ValidationError:
            invalid_entities.append(entity.get("id"))

    return validation_model.model_validate(
        {
            "entities": valid_entities,
            "invalid_entities": invalid_entities,
        }
    )

491 

def get_entity(
    self,
    entity_id: str,
    entity_type: str = None,
    attrs: List[str] = None,
    metadata: List[str] = None,
    response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
) -> Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]:
    """
    This operation must return one entity element only, but there may be
    more than one entity with the same ID (e.g. entities with same ID but
    different types). In such case, an error message is returned, with
    the HTTP status code set to 409 Conflict.

    Args:
        entity_id (String): Id of the entity to be retrieved
        entity_type (String): Entity type, to avoid ambiguity in case
            there are several entities with the same entity id.
        attrs (List of Strings): List of attribute names whose data must be
            included in the response. The attributes are retrieved in the
            order specified by this parameter.
            See "Filtering out attributes and metadata" section for more
            detail. If this parameter is not included, the attributes are
            retrieved in arbitrary order, and all the attributes of the
            entity are included in the response.
            Example: temperature, humidity.
        metadata (List of Strings): A list of metadata names to include in
            the response. See "Filtering out attributes and metadata"
            section for more detail. Example: accuracy.
        response_format (AttrsFormat, str): Representation format of
            response

    Returns:
        ContextEntity for the normalized format, ContextEntityKeyValues
        for the keyValues format, otherwise the parsed JSON response.

    Raises:
        ValueError: if ``response_format`` is not a member of AttrsFormat
        BaseHttpClientException: if the request fails
    """
    url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
    headers = self.headers.copy()
    params = {}
    if entity_type:
        params.update({"type": entity_type})
    if attrs:
        params.update({"attrs": ",".join(attrs)})
    if metadata:
        params.update({"metadata": ",".join(metadata)})
    if response_format not in list(AttrsFormat):
        raise ValueError(f"Value must be in {list(AttrsFormat)}")
    params.update({"options": response_format})

    try:
        res = self.get(url=url, params=params, headers=headers)
        if res.ok:
            self.logger.info("Entity successfully retrieved!")
            # decode the body once and reuse it for logging and parsing
            payload = res.json()
            self.logger.debug("Received: %s", payload)
            if response_format == AttrsFormat.NORMALIZED:
                return ContextEntity(**payload)
            if response_format == AttrsFormat.KEY_VALUES:
                return ContextEntityKeyValues(**payload)
            return payload
        res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not load entity {entity_id}"
        raise BaseHttpClientException(message=msg, response=err.response) from err

553 

def get_entity_attributes(
    self,
    entity_id: str,
    entity_type: str = None,
    attrs: List[str] = None,
    metadata: List[str] = None,
    response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
) -> Dict[str, ContextAttribute]:
    """
    Retrieve only the attributes of an entity, i.e. the same request as
    for the whole entity but against the ``/attrs`` resource, which omits
    the id and type fields. Like the request for an entire entity, it
    must resolve to exactly one entity; if several entities share the ID
    (e.g. same ID but different type), an error is returned with the HTTP
    status code set to 409 Conflict.

    Args:
        entity_id (String): Id of the entity to be retrieved
        entity_type (String): Entity type, to avoid ambiguity in case
            there are several entities with the same entity id.
        attrs (List of Strings): List of attribute names whose data must be
            included in the response. The attributes are retrieved in the
            order specified by this parameter.
            See "Filtering out attributes and metadata" section for more
            detail. If this parameter is not included, the attributes are
            retrieved in arbitrary order, and all the attributes of the
            entity are included in the response. Example: temperature,
            humidity.
        metadata (List of Strings): A list of metadata names to include in
            the response. See "Filtering out attributes and metadata"
            section for more detail. Example: accuracy.
        response_format (AttrsFormat, str): Representation format of
            response

    Returns:
        Dict mapping attribute names to ContextAttribute objects for the
        normalized format; the parsed JSON response for other formats.

    Raises:
        ValueError: if ``response_format`` is not a member of AttrsFormat
        BaseHttpClientException: if the request fails
    """
    url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
    headers = self.headers.copy()

    query = {}
    if entity_type:
        query["type"] = entity_type
    if attrs:
        query["attrs"] = ",".join(attrs)
    if metadata:
        query["metadata"] = ",".join(metadata)
    if response_format not in list(AttrsFormat):
        raise ValueError(f"Value must be in {list(AttrsFormat)}")
    query["options"] = response_format

    try:
        res = self.get(url=url, params=query, headers=headers)
        if not res.ok:
            res.raise_for_status()
        elif response_format == AttrsFormat.NORMALIZED:
            # wrap every raw attribute into its model
            parsed = {}
            for name, raw in res.json().items():
                parsed[name] = ContextAttribute(**raw)
            return parsed
        else:
            return res.json()
    except requests.RequestException as err:
        msg = f"Could not load attributes from entity {entity_id} !"
        raise BaseHttpClientException(message=msg, response=err.response) from err

615 

def update_entity(
    self,
    entity: Union[ContextEntity, ContextEntityKeyValues, dict],
    append_strict: bool = False,
    key_values: bool = False,
):
    """
    The request payload is an object representing the attributes to
    append or update.

    Note:
        Update means overwriting the existing entity. If you want to
        manipulate you should rather use patch_entity.

    Args:
        entity (ContextEntity, ContextEntityKeyValues or dict): Entity
            whose attributes are sent. A dict needs the keys ``id`` and
            ``type`` when ``key_values`` is set; without ``key_values``
            it is parsed into a ContextEntity first. The dict itself is
            never modified.
        append_strict: If `False` the entity attributes are updated (if they
            previously exist) or appended (if they don't previously exist)
            with the ones in the payload.
            If `True` all the attributes in the payload not
            previously existing in the entity are appended. In addition
            to that, in case some of the attributes in the payload
            already exist in the entity, an error is returned.
            More precisely this means a strict append procedure.
        key_values: By default False. If set to True, the payload uses
            the keyValues simplified entity representation, i.e.
            ContextEntityKeyValues.
    Returns:
        None
    """
    if key_values:
        if isinstance(entity, dict):
            # split a copy into id/type and the remaining attributes
            attrs = copy.deepcopy(entity)
            _id = attrs.pop("id")
            _type = attrs.pop("type")
            entity = ContextEntityKeyValues(id=_id, type=_type)
        else:
            attrs = entity.model_dump(exclude={"id", "type"})
    else:
        if isinstance(entity, dict):
            # a plain dict has no get_attributes(); parse it first
            entity = ContextEntity(**entity)
        attrs = entity.get_attributes()
    self.update_or_append_entity_attributes(
        entity_id=entity.id,
        entity_type=entity.type,
        attrs=attrs,
        append_strict=append_strict,
        key_values=key_values,
    )

664 

def update_entity_properties(
    self, entity: ContextEntity, append_strict: bool = False
):
    """
    Append or update only those attributes of the entity that
    ``entity.get_properties()`` returns (attributes of any type but
    Relationship).

    Note:
        Update means overwriting the existing entity. If you want to
        manipulate you should rather use patch_entity.

    Args:
        entity (ContextEntity): Entity providing id, type and properties
        append_strict: If `False` the entity attributes are updated (if they
            previously exist) or appended (if they don't previously exist)
            with the ones in the payload.
            If `True` all the attributes in the payload not
            previously existing in the entity are appended. In addition
            to that, in case some of the attributes in the payload
            already exist in the entity, an error is returned.
            More precisely this means a strict append procedure.

    Returns:
        None
    """
    request = {
        "entity_id": entity.id,
        "entity_type": entity.type,
        "attrs": entity.get_properties(),
        "append_strict": append_strict,
    }
    self.update_or_append_entity_attributes(**request)

696 

def update_entity_relationships(
    self, entity: ContextEntity, append_strict: bool = False
):
    """
    Append or update only those attributes of the entity that
    ``entity.get_relationships()`` returns (attributes of type
    Relationship).

    Note:
        Update means overwriting the existing entity. If you want to
        manipulate you should rather use patch_entity.

    Args:
        entity (ContextEntity): Entity providing id, type and relationships
        append_strict: If `False` the entity attributes are updated (if they
            previously exist) or appended (if they don't previously exist)
            with the ones in the payload.
            If `True` all the attributes in the payload not
            previously existing in the entity are appended. In addition
            to that, in case some of the attributes in the payload
            already exist in the entity, an error is returned.
            More precisely this means a strict append procedure.

    Returns:
        None
    """
    request = {
        "entity_id": entity.id,
        "entity_type": entity.type,
        "attrs": entity.get_relationships(),
        "append_strict": append_strict,
    }
    self.update_or_append_entity_attributes(**request)

728 

def delete_entity(
    self,
    entity_id: str,
    entity_type: str = None,
    delete_devices: bool = False,
    iota_client: IoTAClient = None,
    iota_url: AnyHttpUrl = settings.IOTA_URL,
) -> None:
    """
    Remove a entity from the context broker. No payload is required
    or received.

    Args:
        entity_id:
            Id of the entity to be deleted
        entity_type:
            Entity type, to avoid ambiguity in case there are several
            entities with the same entity id.
        delete_devices:
            If True, also delete all devices that reference this
            entity (entity_id as entity_name)
        iota_client:
            Corresponding IoTA-Client used to access IoTA-Agent. The
            method works on a deep copy of it.
        iota_url:
            URL of the corresponding IoT-Agent. This will autogenerate
            an IoTA-Client, mirroring the information of the
            ContextBrokerClient, e.g. FiwareHeader, and other headers

    Returns:
        None

    Raises:
        BaseHttpClientException: if the delete request fails
    """
    url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}")
    headers = self.headers.copy()
    params = {"type": entity_type} if entity_type else None
    try:
        res = self.delete(url=url, params=params, headers=headers)
        if res.ok:
            self.logger.info("Entity '%s' successfully deleted!", entity_id)
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not delete entity {entity_id} !"
        raise BaseHttpClientException(message=msg, response=err.response) from err

    if not delete_devices:
        return

    # imported here rather than at module level; the module-level import
    # of IoTAClient is only done under TYPE_CHECKING
    from filip.clients.ngsi_v2 import IoTAClient

    if iota_client:
        iota_client_local = deepcopy(iota_client)
    else:
        warnings.warn(
            "No IoTA-Client object provided! "
            "Will try to generate one. "
            "This usage is not recommended."
        )

        iota_client_local = IoTAClient(
            url=iota_url,
            fiware_header=self.fiware_headers,
            headers=self.headers,
        )

    try:
        for device in iota_client_local.get_device_list(entity_names=[entity_id]):
            # without an entity type every referencing device is removed
            if not entity_type or device.entity_type == entity_type:
                iota_client_local.delete_device(device_id=device.device_id)
    finally:
        # close the local client even if listing or deleting devices fails
        iota_client_local.close()

801 

def delete_entities(self, entities: List[ContextEntity]) -> None:
    """
    Remove a list of entities from the context broker in batches via
    ``update`` with the delete action. This method is more efficient than
    calling delete_entity() for each entity.

    Args:
        entities: List[ContextEntity]: List of entities to be deleted

    Raises:
        Exception, if one of the entities is not in the ContextBroker

    Returns:
        None
    """
    # A delete update removes entities without attributes completely, but
    # only strips the attributes from the others. Entities carrying
    # attributes are therefore sent a second time, reduced to id and type.
    batch_size = 1000  # max number of entities that will be deleted at once

    id_type_only: List[ContextEntity] = [
        ContextEntity(id=entity.id, type=entity.type)
        for entity in entities
        if any(
            key not in ContextEntity.model_fields for key in entity.model_dump()
        )
    ]

    # first pass: every entity exactly as handed in
    for start in range(0, len(entities), batch_size):
        self.update(
            entities=entities[start : start + batch_size], action_type="delete"
        )

    # second pass: what is left of the entities that had attributes
    for start in range(0, len(id_type_only), batch_size):
        self.update(
            entities=id_type_only[start : start + batch_size],
            action_type="delete",
        )

845 

def update_or_append_entity_attributes(
    self,
    entity_id: str,
    attrs: Union[
        List[NamedContextAttribute], Dict[str, ContextAttribute], Dict[str, Any]
    ],
    entity_type: str = None,
    append_strict: bool = False,
    forcedUpdate: bool = False,
    key_values: bool = False,
):
    """
    Update or append attributes of an entity. The attributes are sent in a
    'POST' request to the `attrs` resource of the entity.

    Note:
        Be careful not to update attributes that are provided via context
        registration, e.g. commands. This method only strips `id` and
        `type` from the payload; the step that used to exclude commands
        is currently disabled.

    Args:
        entity_id: Entity id to be updated
        attrs: Attributes to update or to append. Must be a dict if
            `key_values` is set.
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.
        append_strict: If `False` the attributes are updated (if they
            previously exist) or appended (if they don't previously
            exist). If `True` the option `append` is sent, i.e. a strict
            append: attributes that already exist lead to an error.
        forcedUpdate: Adds the option `forcedUpdate`, so that matching
            subscriptions are triggered no matter if there is an actual
            attribute update or not.
        key_values: By default False. If set to True, the payload uses
            the keyValues simplified entity representation, i.e.
            ContextEntityKeyValues.

    Returns:
        None

    Raises:
        AssertionError: If `key_values` is set and `attrs` is not a dict.
        BaseHttpClientException: If the request fails.
    """
    query = {"type": entity_type} if entity_type else {}
    # The entity models require a type; the placeholder never reaches the
    # broker because 'type' is excluded from the payload below.
    model_type = entity_type or "dummy"

    if key_values:
        assert isinstance(attrs, dict), "for keyValues attrs has to be a dict"

    enabled_options = [
        option
        for switch, option in (
            (append_strict, "append"),
            (forcedUpdate, "forcedUpdate"),
            (key_values, "keyValues"),
        )
        if switch
    ]
    if enabled_options:
        query["options"] = ",".join(enabled_options)

    if key_values:
        entity = ContextEntityKeyValues(id=entity_id, type=model_type, **attrs)
    else:
        entity = ContextEntity(id=entity_id, type=model_type)
        entity.add_attributes(attrs)

    target = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
    try:
        res = self.post(
            url=target,
            headers=self.headers.copy(),
            json=entity.model_dump(exclude={"id", "type"}, exclude_none=True),
            params=query,
        )
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info("Entity '%s' successfully updated!", entity.id)
    except requests.RequestException as err:
        msg = f"Could not update or append attributes of entity {entity.id} !"
        self.log_error(err=err, msg=msg)
        raise BaseHttpClientException(message=msg, response=err.response) from err

936 

def update_existing_entity_attributes(
    self,
    entity_id: str,
    attrs: Union[
        List[NamedContextAttribute], Dict[str, ContextAttribute], Dict[str, Any]
    ],
    entity_type: str = None,
    forcedUpdate: bool = False,
    override_metadata: bool = False,
    key_values: bool = False,
):
    """
    The entity attributes are updated with the ones in the payload.
    In addition to that, if one or more attributes in the payload doesn't
    exist in the entity, an error is returned. This corresponds to a
    'PATCH' request.

    Args:
        entity_id: Entity id to be updated
        attrs: Attributes to update. Must be a dict if `key_values` is
            set; it is then sent unchanged as the payload.
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.
        forcedUpdate: Adds the option `forcedUpdate`, so that matching
            subscriptions are triggered no matter if there is an actual
            attribute update or not.
        override_metadata: Adds the option `overrideMetadata`, i.e. the
            existing metadata is replaced with the one provided in the
            request.
        key_values: By default False. If set to True, the payload uses
            the keyValues simplified entity representation, i.e.
            ContextEntityKeyValues.

    Returns:
        None

    Raises:
        AssertionError: If `key_values` is set and `attrs` is not a dict.
        BaseHttpClientException: If the request fails.
    """
    url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
    headers = self.headers.copy()

    # Always start from a dict: options must be addable even when no
    # entity_type was given (a `None` here used to break `params.update`).
    params = {}
    if entity_type:
        params["type"] = entity_type

    options = []
    if override_metadata:
        options.append("overrideMetadata")
    if forcedUpdate:
        options.append("forcedUpdate")
    if key_values:
        assert isinstance(attrs, dict), "for keyValues the attrs must be dict"
        payload = attrs
        options.append("keyValues")
    else:
        # The model requires a type; the placeholder is excluded from the
        # payload and therefore never sent.
        entity = ContextEntity(id=entity_id, type=entity_type or "dummy")
        entity.add_attributes(attrs)
        payload = entity.model_dump(exclude={"id", "type"}, exclude_none=True)
    if options:
        params["options"] = ",".join(options)

    try:
        res = self.patch(
            url=url,
            headers=headers,
            json=payload,
            params=params,
        )
        if res.ok:
            self.logger.info("Entity '%s' successfully updated!", entity_id)
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = "Could not update attributes of entity"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1011 

def override_entity(
    self, entity: Union[ContextEntity, ContextEntityKeyValues], **kwargs
):
    """
    Override an existing entity with the attributes of the given entity
    object. The id, type and attributes of `entity` are handed over to
    `replace_entity_attributes`.

    Note:
        If you only want to manipulate an entity you should rather use
        patch_entity.

    Args:
        entity (ContextEntity or ContextEntityKeyValues): entity whose
            attributes replace the existing ones
        **kwargs: passed on unchanged to `replace_entity_attributes`

    Returns:
        The return value of `replace_entity_attributes`
    """
    replacement = {
        "entity_id": entity.id,
        "entity_type": entity.type,
        "attrs": entity.get_attributes(),
    }
    return self.replace_entity_attributes(**replacement, **kwargs)

1033 

def replace_entity_attributes(
    self,
    entity_id: str,
    attrs: Union[
        List[Union[NamedContextAttribute, Dict[str, ContextAttribute]]], Dict
    ],
    entity_type: str = None,
    forcedUpdate: bool = False,
    key_values: bool = False,
):
    """
    Replace the attributes of an entity by the ones in the request. This
    corresponds to a 'PUT' request on the `attrs` resource of the entity.

    Args:
        entity_id: Entity id to be updated
        attrs: List of attributes to add to the entity or dict of
            attributes in case of key_values=True.
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.
        forcedUpdate: Adds the option `forcedUpdate`, so that matching
            subscriptions are triggered no matter if there is an actual
            attribute update or not.
        key_values(bool):
            By default False. If set to True, "options=keyValues" will
            be included in params of the request and `attrs` is sent
            unchanged as the payload.

    Returns:
        None

    Raises:
        AssertionError: If `key_values` is set and `attrs` is not a dict.
        BaseHttpClientException: If the request fails.
    """
    query = {}
    if entity_type:
        query["type"] = entity_type

    flags = []
    if forcedUpdate:
        flags.append("forcedUpdate")

    if key_values:
        flags.append("keyValues")
        assert isinstance(attrs, dict)
        body = attrs
    else:
        # The model requires a type; the placeholder is excluded from the
        # dumped payload.
        model = ContextEntity(id=entity_id, type=entity_type or "dummy")
        model.add_attributes(attrs)
        body = model.model_dump(exclude={"id", "type"}, exclude_none=True)

    if flags:
        query["options"] = ",".join(flags)

    target = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs")
    try:
        res = self.put(
            url=target,
            headers=self.headers.copy(),
            json=body,
            params=query,
        )
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info("Entity '%s' successfully updated!", entity_id)
    except requests.RequestException as err:
        msg = f"Could not replace attribute of entity {entity_id} !"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1103 

1104 # Attribute operations 

def get_attribute(
    self,
    entity_id: str,
    attr_name: str,
    entity_type: str = None,
    metadata: Union[str, List[str]] = None,
    response_format="",
) -> ContextAttribute:
    """
    Retrieves a specified attribute from an entity.

    Args:
        entity_id: Id of the entity. Example: Bcn_Welt
        attr_name: Name of the attribute to be retrieved.
        entity_type (Optional): Type of the entity to retrieve
        metadata (Optional): Metadata names to include in the response,
            either as a ready-made comma-separated string or as a list
            of names.
        response_format: Currently not used by this method.

    Returns:
        The content of the retrieved attribute as ContextAttribute

    Raises:
        BaseHttpClientException: If the request fails.
    """
    url = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
    )
    headers = self.headers.copy()
    params = {}
    if entity_type:
        params["type"] = entity_type
    if metadata:
        # Joining a plain string would split it into single characters,
        # so a string is passed through as-is.
        if isinstance(metadata, str):
            params["metadata"] = metadata
        else:
            params["metadata"] = ",".join(metadata)
    try:
        res = self.get(url=url, params=params, headers=headers)
        if res.ok:
            content = res.json()
            self.logger.debug("Received: %s", content)
            return ContextAttribute(**content)
        res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not load attribute '{attr_name}' from entity'{entity_id}' "
        raise BaseHttpClientException(message=msg, response=err.response) from err

1151 

def update_entity_attribute(
    self,
    entity_id: str,
    attr: Union[ContextAttribute, NamedContextAttribute],
    *,
    entity_type: str = None,
    attr_name: str = None,
    override_metadata: bool = True,
    forcedUpdate: bool = False,
):
    """
    Updates a single attribute of an entity with a 'PUT' request.

    Args:
        entity_id:
            Id of the entity. Example: Bcn_Welt
        attr:
            context attribute to update
        entity_type:
            Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.
        attr_name:
            Name of the attribute to be updated. Required if `attr` is a
            plain ContextAttribute and must be omitted if `attr` is a
            NamedContextAttribute (its own name is used then).
        override_metadata:
            Bool, if set to `True` (default) the option
            `overrideMetadata` is sent, i.e. the metadata will be
            overwritten. This is for backwards compatibility reasons.
            If `False` the metadata values will be either updated if
            already existing or appended if not.
            See also:
            https://fiware-orion.readthedocs.io/en/master/user/metadata.html
        forcedUpdate: Adds the option `forcedUpdate`, so that matching
            subscriptions are triggered no matter if there is an actual
            attribute update or not.

    Raises:
        AssertionError: If `attr_name` does not fit the type of `attr`.
        BaseHttpClientException: If the request fails.
    """
    if isinstance(attr, NamedContextAttribute):
        assert attr_name is None, (
            "Invalid argument attr_name. Do not set attr_name if attr is of type "
            "NamedContextAttribute"
        )
        attr_name = attr.name
    else:
        assert attr_name is not None, (
            "Missing name for attribute. attr_name must be present if"
            "attr is of type ContextAttribute"
        )

    query = {"type": entity_type} if entity_type else {}
    # overrideMetadata is on by default to assure backwards compatibility
    flags = [
        option
        for switch, option in (
            (override_metadata, "overrideMetadata"),
            (forcedUpdate, "forcedUpdate"),
        )
        if switch
    ]
    if flags:
        query["options"] = ",".join(flags)

    target = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
    )
    try:
        res = self.put(
            url=target,
            headers=self.headers.copy(),
            params=query,
            json=attr.model_dump(exclude={"name"}, exclude_none=True),
        )
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info(
                "Attribute '%s' of '%s' successfully updated!", attr_name, entity_id
            )
    except requests.RequestException as err:
        msg = f"Could not update attribute '{attr_name}' of entity'{entity_id}' "
        raise BaseHttpClientException(message=msg, response=err.response) from err

1236 

def delete_entity_attribute(
    self, entity_id: str, attr_name: str, entity_type: str = None
) -> None:
    """
    Removes a specified attribute from an entity.

    Args:
        entity_id: Id of the entity.
        attr_name: Name of the attribute to be removed.
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.

    Raises:
        BaseHttpClientException: If the request fails.
    """
    url = urljoin(
        self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}"
    )
    headers = self.headers.copy()
    params = {}
    if entity_type:
        params["type"] = entity_type
    try:
        # The query parameters must be sent along, otherwise the entity
        # type given by the caller is silently ignored.
        res = self.delete(url=url, headers=headers, params=params)
        if res.ok:
            self.logger.info(
                "Attribute '%s' of '%s' successfully deleted!",
                attr_name,
                entity_id,
            )
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not delete attribute '{attr_name}' of entity '{entity_id}'"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1272 

1273 # Attribute value operations 

def get_attribute_value(
    self, entity_id: str, attr_name: str, entity_type: str = None
) -> Any:
    """
    Retrieves only the value of an attribute, i.e. the content of the
    `value` resource of the attribute.

    Args:
        entity_id: Id of the entity. Example: Bcn_Welt
        attr_name: Name of the attribute to be retrieved.
            Example: temperature.
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.

    Returns:
        The decoded JSON body of the response

    Raises:
        BaseHttpClientException: If the request fails.
    """
    target = urljoin(
        self.base_url,
        f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}/value",
    )
    query = {"type": entity_type} if entity_type else {}
    try:
        res = self.get(url=target, params=query, headers=self.headers.copy())
        if res.ok:
            self.logger.debug("Received: %s", res.json())
            return res.json()
        res.raise_for_status()
    except requests.RequestException as err:
        msg = (
            f"Could not load value of attribute '{attr_name}' from "
            f"entity'{entity_id}' "
        )
        raise BaseHttpClientException(message=msg, response=err.response) from err

1311 

def update_attribute_value(
    self,
    *,
    entity_id: str,
    attr_name: str,
    value: Any,
    entity_type: str = None,
    forcedUpdate: bool = False,
):
    """
    Updates only the value of a specified attribute of an entity with a
    'PUT' request on the `value` resource of the attribute.

    Args:
        value: update value. Anything that is neither a dict nor a list
            is announced with the content type `text/plain`.
        entity_id: Id of the entity. Example: Bcn_Welt
        attr_name: Name of the attribute to be updated.
            Example: temperature.
        entity_type: Entity type, to avoid ambiguity in case there are
            several entities with the same entity id.
        forcedUpdate: Adds the option `forcedUpdate`, so that matching
            subscriptions are triggered no matter if there is an actual
            attribute update or not.

    Returns:
        None

    Raises:
        BaseHttpClientException: If the request fails.
    """
    target = urljoin(
        self.base_url,
        f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}/value",
    )
    request_headers = self.headers.copy()
    query = {}
    if entity_type:
        query["type"] = entity_type
    if forcedUpdate:
        query["options"] = ",".join(["forcedUpdate"])
    try:
        is_structured = isinstance(value, (dict, list))
        if not is_structured:
            request_headers.update({"Content-Type": "text/plain"})
            if isinstance(value, str):
                value = f"{value}"
        # The request itself is the same for structured and plain values;
        # only the content type header differs.
        res = self.put(url=target, headers=request_headers, json=value, params=query)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info(
                "Attribute '%s' of '%s' successfully updated!", attr_name, entity_id
            )
    except requests.RequestException as err:
        msg = (
            f"Could not update value of attribute '{attr_name}' from "
            f"entity '{entity_id}' "
        )
        raise BaseHttpClientException(message=msg, response=err.response) from err

1373 

1374 # Types Operations 

def get_entity_types(
    self, *, limit: int = None, offset: int = None, options: str = None
) -> List[Dict[str, Any]]:
    """
    Retrieves the entity types from the `types` resource.

    Args:
        limit: Limit the number of types to be retrieved.
        offset: Skip a number of records.
        options: Value of the `options` query parameter.
            Allowed: count, values

    Returns:
        The decoded JSON body of the response

    Raises:
        BaseHttpClientException: If the request fails.
    """
    # Only parameters with a truthy value are sent to the broker
    candidates = (("limit", limit), ("offset", offset), ("options", options))
    query = {name: given for name, given in candidates if given}
    target = urljoin(self.base_url, f"{self._url_version}/types")
    try:
        res = self.get(url=target, params=query, headers=self.headers.copy())
        if res.ok:
            self.logger.debug("Received: %s", res.json())
            return res.json()
        res.raise_for_status()
    except requests.RequestException as err:
        msg = "Could not load entity types!"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1406 

def get_entity_type(self, entity_type: str) -> Dict[str, Any]:
    """
    Retrieves the information the `types` resource holds about a single
    entity type.

    Args:
        entity_type: Entity Type. Example: Room

    Returns:
        The decoded JSON body of the response

    Raises:
        BaseHttpClientException: If the request fails.
    """
    target = urljoin(self.base_url, f"{self._url_version}/types/{entity_type}")
    try:
        res = self.get(url=target, params={}, headers=self.headers.copy())
        if res.ok:
            self.logger.debug("Received: %s", res.json())
            return res.json()
        res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not load entities of type'{entity_type}' "
        raise BaseHttpClientException(message=msg, response=err.response) from err

1428 

1429 # SUBSCRIPTION API ENDPOINTS 

def get_subscription_list(self, limit: PositiveInt = inf) -> List[Subscription]:
    """
    Returns a list of the subscriptions present in the system.

    Args:
        limit: Limit the number of subscriptions to be retrieved

    Returns:
        list of subscriptions

    Raises:
        BaseHttpClientException: If a request fails.
    """
    target = urljoin(self.base_url, f"{self._url_version}/subscriptions/")
    # The 'count' option is always requested so that the pagination helper
    # can check whether pagination is required.
    query = {"options": "count"}
    try:
        raw_items = self.__pagination(
            limit=limit, url=target, params=query, headers=self.headers.copy()
        )
        return TypeAdapter(List[Subscription]).validate_python(raw_items)
    except requests.RequestException as err:
        msg = "Could not load subscriptions!"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1454 

def post_subscription(
    self,
    subscription: Subscription,
    update: bool = False,
    skip_initial_notification: bool = False,
) -> str:
    """
    Creates a new subscription. The subscription is represented by a
    Subscription object defined in filip.cb.models.

    If the subscription already exists, the adding is prevented and the id
    of the existing subscription is returned.

    A subscription is deemed as already existing if there exists a
    subscription with the same subject and notification fields (the
    status fields of the notification, e.g. `lastSuccess`, are left out
    of the comparison). All other fields are not considered.

    Args:
        subscription: Subscription
        update: True - If the subscription already exists, update it
            False- If the subscription already exists, throw warning
        skip_initial_notification: True - request the broker to skip the
            initial notification (option `skipInitialNotification`). This
            option is only sent to brokers with version <= 3.1; for newer
            versions a DeprecationWarning is issued and the option is
            left out.
            False - no option is sent

    Returns:
        str: Id of the (created) subscription

    Raises:
        BaseHttpClientException: If the request fails.
    """
    existing_subscriptions = self.get_subscription_list()

    # Status fields written by the broker must not influence the comparison.
    # They live inside 'notification', hence the nested exclude.
    status_fields = {
        "lastSuccess",
        "lastFailure",
        "lastSuccessCode",
        "lastFailureReason",
    }
    sub_dict = subscription.model_dump(
        include={"subject", "notification"},
        exclude={"notification": status_fields},
    )
    for ex_sub in existing_subscriptions:
        ex_sub_dict = ex_sub.model_dump(
            include={"subject", "notification"},
            exclude={"notification": status_fields},
        )
        if self._subscription_dicts_are_equal(sub_dict, ex_sub_dict):
            self.logger.info("Subscription already exists")
            if update:
                self.logger.info("Updated subscription")
                subscription.id = ex_sub.id
                self.update_subscription(subscription)
            else:
                warnings.warn(
                    f"Subscription existed already with the id {ex_sub.id}"
                )
            return ex_sub.id

    params = {}
    if skip_initial_notification:
        # Named 'cb_version' so the imported 'version' module is not shadowed
        cb_version = self.get_version()["orion"]["version"]
        if parse_version(cb_version) <= parse_version("3.1"):
            params.update({"options": "skipInitialNotification"})
        else:
            warnings.warn(
                f"Skip initial notifications is a deprecated "
                f"feature of older versions <=3.1 of the context "
                f"broker. The Context Broker that you requesting has "
                f"version: {cb_version}. For newer versions we "
                f"automatically skip this option. Consider "
                f"refactoring and updating your services",
                DeprecationWarning,
            )

    # Use the configured API version like the other subscription endpoints
    url = urljoin(self.base_url, f"{self._url_version}/subscriptions")
    headers = self.headers.copy()
    headers.update({"Content-Type": "application/json"})
    try:
        res = self.post(
            url=url,
            headers=headers,
            data=subscription.model_dump_json(
                exclude={
                    "id": True,
                    "notification": status_fields,
                },
                exclude_none=True,
            ),
            params=params,
        )
        if res.ok:
            self.logger.info("Subscription successfully created!")
            # The id is the last path segment of the Location header
            return res.headers["Location"].split("/")[-1]
        res.raise_for_status()
    except requests.RequestException as err:
        msg = "Could not send subscription!"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1566 

def get_subscription(self, subscription_id: str) -> Subscription:
    """
    Retrieves a single subscription by its id.

    Args:
        subscription_id: id of the subscription

    Returns:
        Subscription built from the JSON body of the response

    Raises:
        BaseHttpClientException: If the request fails.
    """
    target = urljoin(
        self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
    )
    try:
        res = self.get(url=target, headers=self.headers.copy())
        if res.ok:
            self.logger.debug("Received: %s", res.json())
            return Subscription(**res.json())
        res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not load subscription {subscription_id}"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1589 

def update_subscription(
    self, subscription: Subscription, skip_initial_notification: bool = False
):
    """
    Only the fields included in the request are updated in the subscription.

    Args:
        subscription: Subscription to update
        skip_initial_notification: True - request the broker to skip the
            initial notification (option `skipInitialNotification`). This
            option is only sent to brokers with version <= 3.1; for newer
            versions a DeprecationWarning is issued and the option is
            left out.
            False - no option is sent

    Returns:
        None

    Raises:
        BaseHttpClientException: If the request fails.
    """
    params = {}
    if skip_initial_notification:
        # Named 'cb_version' so the imported 'version' module is not shadowed
        cb_version = self.get_version()["orion"]["version"]
        if parse_version(cb_version) <= parse_version("3.1"):
            params.update({"options": "skipInitialNotification"})
        else:
            warnings.warn(
                f"Skip initial notifications is a deprecated "
                f"feature of older versions <3.1 of the context "
                f"broker. The Context Broker that you requesting has "
                f"version: {cb_version}. For newer versions we "
                f"automatically skip this option. Consider "
                f"refactoring and updating your services",
                DeprecationWarning,
            )

    url = urljoin(
        self.base_url, f"{self._url_version}/subscriptions/{subscription.id}"
    )
    headers = self.headers.copy()
    headers.update({"Content-Type": "application/json"})
    try:
        res = self.patch(
            url=url,
            headers=headers,
            data=subscription.model_dump_json(
                exclude={
                    "id": True,
                    "notification": {
                        "lastSuccess",
                        "lastFailure",
                        "lastSuccessCode",
                        "lastFailureReason",
                    },
                },
                exclude_none=True,
            ),
            # Without this the skipInitialNotification option computed
            # above would never reach the broker.
            params=params,
        )
        if res.ok:
            self.logger.info("Subscription successfully updated!")
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not update subscription {subscription.id}"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1652 

def delete_subscription(self, subscription_id: str) -> None:
    """
    Deletes a subscription from a Context Broker.

    Args:
        subscription_id: id of the subscription

    Raises:
        BaseHttpClientException: If the request fails.
    """
    target = urljoin(
        self.base_url, f"{self._url_version}/subscriptions/{subscription_id}"
    )
    try:
        res = self.delete(url=target, headers=self.headers.copy())
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info(f"Subscription '{subscription_id}' successfully deleted!")
    except requests.RequestException as err:
        msg = f"Could not delete subscription {subscription_id}"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1674 

1675 # Registration API 

def get_registration_list(self, *, limit: PositiveInt = None) -> List[Registration]:
    """
    Lists the context provider registrations present in the system.

    Args:
        limit: Limit the number of registrations to be retrieved

    Returns:
        list of registrations

    Raises:
        BaseHttpClientException: If a request fails.
    """
    target = urljoin(self.base_url, f"{self._url_version}/registrations/")
    # The 'count' option is always requested so that the pagination helper
    # can check whether pagination is required.
    query = {"options": "count"}
    try:
        raw_items = self.__pagination(
            limit=limit, url=target, params=query, headers=self.headers.copy()
        )
        return TypeAdapter(List[Registration]).validate_python(raw_items)
    except requests.RequestException as err:
        msg = "Could not load registrations!"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1701 

def post_registration(self, registration: Registration):
    """
    Creates a new context provider registration. This is typically used
    for binding context sources as providers of certain data. The
    registration is represented by cb.models.Registration

    Args:
        registration (Registration): registration to create; its `id` and
            all unset fields are left out of the request body

    Returns:
        Id of the created registration, taken from the last path segment
        of the `Location` header of the response

    Raises:
        BaseHttpClientException: If the request fails.
    """
    target = urljoin(self.base_url, f"{self._url_version}/registrations")
    request_headers = {**self.headers, "Content-Type": "application/json"}
    try:
        body = registration.model_dump_json(exclude={"id"}, exclude_none=True)
        res = self.post(url=target, headers=request_headers, data=body)
        if res.ok:
            self.logger.info("Registration successfully created!")
            location = res.headers["Location"]
            return location.split("/")[-1]
        res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not send registration {registration.id}!"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1730 

def get_registration(self, registration_id: str) -> Registration:
    """
    Retrieves a single registration from the context broker by its id.

    Args:
        registration_id: id of the registration

    Returns:
        Registration built from the JSON body of the response

    Raises:
        BaseHttpClientException: If the request fails.
    """
    target = urljoin(
        self.base_url, f"{self._url_version}/registrations/{registration_id}"
    )
    try:
        res = self.get(url=target, headers=self.headers.copy())
        if res.ok:
            self.logger.debug("Received: %s", res.json())
            return Registration(**res.json())
        res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not load registration {registration_id} !"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1754 

def add_valid_relationships(
    self, entities: List[ContextEntity]
) -> List[ContextEntity]:
    """
    Check every attribute of the given entities with
    `validate_relationship`. If the attribute value points to an existing
    entity, the attribute is assumed to be a relationship and is
    re-assigned with the attribute type "relationship".

    The entities are modified in place; the returned list holds the same
    objects in the same order.

    Args:
        entities: list of entities that need to be validated.

    Returns:
        updated entities
    """
    processed = []
    for entity in list(entities):
        dumped_attrs = entity.model_dump(exclude={"id", "type"})
        for name, dumped in dumped_attrs.items():
            # Only attributes that were dumped as a dict are candidates
            if not isinstance(dumped, dict):
                continue
            if not self.validate_relationship(dumped):
                continue
            relationship = ContextAttribute(
                type=DataType.RELATIONSHIP, value=dumped.get("value")
            )
            entity.update_attribute({name: relationship})
        processed.append(entity)
    return processed

1788 

def remove_invalid_relationships(
    self, entities: List[ContextEntity], hard_remove: bool = True
) -> List[ContextEntity]:
    """
    Removes invalid relationships from the entities. A relationship is
    treated as invalid if `validate_relationship` does not confirm it,
    i.e. if it has no destination entity.

    The entities are modified in place; the returned list holds the same
    objects in the same order.

    Args:
        entities: list of entities that need to be validated.
        hard_remove: If True, invalid relationships will be deleted.
            If False, invalid relationships will be changed to Text
            attributes.

    Returns:
        updated entities
    """
    processed = []
    for entity in list(entities):
        for relationship in entity.get_relationships():
            if self.validate_relationship(relationship):
                continue
            if hard_remove:
                entity.delete_attributes(attrs=[relationship])
                continue
            # Keep the value but downgrade the attribute type to "Text"
            as_text = NamedContextAttribute(
                name=relationship.name,
                type=DataType.TEXT,
                value=relationship.value,
            )
            entity.update_attribute(attrs=[as_text])
        processed.append(entity)
    return processed

1824 

def validate_relationship(
    self, relationship: Union[NamedContextAttribute, ContextAttribute, Dict]
) -> bool:
    """
    Check whether a relationship points to an existing entity.

    The relationship value is used as an entity id and looked up with
    ``get_entity``. The relationship is valid if the returned entity has
    exactly that id.

    Args:
        relationship: relationship to validate, either as attribute model
            or as dictionary containing a ``"value"`` key.

    Returns:
        True if the destination entity exists, False if the lookup
        answered with HTTP 404 or returned a different id.

    Raises:
        ValueError: if ``relationship`` is a dictionary without a
            ``"value"`` key or is of an unsupported type.
        requests.RequestException: if the lookup fails for any reason
            other than "not found" (e.g. no connection, server error).
    """
    if isinstance(relationship, dict):
        # A missing key is an error, whereas an explicit None value is
        # still passed on to the lookup.
        if "value" not in relationship:
            raise ValueError(
                "Invalid relationship dictionary format\n"
                "Expected format: {"
                f'"type": "{DataType.RELATIONSHIP.value}", '
                '"value": "entity_id"}'
            )
        destination_id = relationship["value"]
    elif isinstance(relationship, (NamedContextAttribute, ContextAttribute)):
        destination_id = relationship.value
    else:
        raise ValueError("Invalid relationship type.")

    try:
        destination_entity = self.get_entity(entity_id=destination_id)
    except requests.RequestException as err:
        # Only "not found" means the relationship is invalid. Errors
        # without a response (e.g. connection failures) and other status
        # codes are propagated instead of being reported as None.
        if err.response is not None and err.response.status_code == 404:
            return False
        raise
    return destination_entity.id == destination_id

1859 

def update_registration(self, registration: Registration):
    """
    Patch an existing registration in the Context Broker.

    The registration is serialized without its ``id`` and without unset
    (``None``) fields, so only the fields present in the payload are sent.

    Args:
        registration: Registration to update; its ``id`` selects the
            resource to patch.

    Returns:
        None

    Raises:
        BaseHttpClientException: if the request fails.
    """
    resource = f"{self._url_version}/registrations/{registration.id}"
    url = urljoin(self.base_url, resource)
    headers = self.headers.copy()
    headers["Content-Type"] = "application/json"
    try:
        payload = registration.model_dump_json(exclude={"id"}, exclude_none=True)
        res = self.patch(url=url, headers=headers, data=payload)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info("Registration successfully updated!")
    except requests.RequestException as err:
        msg = f"Could not update registration {registration.id} !"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1887 

def delete_registration(self, registration_id: str) -> None:
    """
    Delete a registration from the Context Broker.

    Args:
        registration_id: id of the registration to delete

    Raises:
        BaseHttpClientException: if the request fails.
    """
    resource = f"{self._url_version}/registrations/{registration_id}"
    url = urljoin(self.base_url, resource)
    headers = self.headers.copy()
    try:
        res = self.delete(url=url, headers=headers)
        if res.ok:
            self.logger.info(
                "Registration '%s' successfully deleted!", registration_id
            )
        # Raises for error responses, no-op for successful ones
        res.raise_for_status()
    except requests.RequestException as err:
        msg = f"Could not delete registration {registration_id} !"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1908 

1909 # Batch operation API 

def update(
    self,
    *,
    entities: List[Union[ContextEntity, ContextEntityKeyValues]],
    action_type: Union[ActionType, str],
    update_format: str = None,
    forcedUpdate: bool = False,
    override_metadata: bool = False,
) -> None:
    """
    Create, update and/or delete several entities in one batch request.

    The broker splits the request into one individual operation per
    entity and applies the action type to each of them. The action types
    map to the regular non-batch operations as follows:

    append: POST /v2/entities (if the entity does not already exist)
    or POST /v2/entities/<id>/attrs (if the entity already exists).

    appendStrict: POST /v2/entities (if the entity does not already
    exist) or POST /v2/entities/<id>/attrs?options=append (if the entity
    already exists).

    update: PATCH /v2/entities/<id>/attrs.

    delete: DELETE /v2/entities/<id>/attrs/<attrName> for every
    attribute included in the entity, or DELETE /v2/entities/<id> if no
    attributes were included in the entity.

    replace: PUT /v2/entities/<id>/attrs.

    Args:
        entities: entities to process, each given in the JSON entity
            representation format
        action_type: kind of update action to perform: append,
            appendStrict, update, delete or replace
        update_format (str): Optional, only 'keyValues' is accepted
        forcedUpdate: Trigger matching subscriptions even if no
            attribute value actually changed (by default they are only
            triggered by effective changes)
        override_metadata: Replace the existing metadata with the one
            provided in the request

    Returns:
        None

    Raises:
        AssertionError: if ``update_format`` is set to anything other
            than 'keyValues'.
        BaseHttpClientException: if the request fails.
    """
    url = urljoin(self.base_url, f"{self._url_version}/op/update")
    headers = self.headers.copy()
    headers["Content-Type"] = "application/json"

    # Collect the request options in the order the broker call uses them
    options = []
    if override_metadata:
        options.append("overrideMetadata")
    if forcedUpdate:
        options.append("forcedUpdate")
    if update_format:
        assert (
            update_format == AttrsFormat.KEY_VALUES.value
        ), "Only 'keyValues' is allowed as update format"
        options.append("keyValues")
    params = {"options": ",".join(options)} if options else {}

    update = Update(actionType=action_type, entities=entities)
    try:
        res = self.post(
            url=url,
            headers=headers,
            params=params,
            json=update.model_dump(by_alias=True),
        )
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info("Update operation '%s' succeeded!", action_type)
    except requests.RequestException as err:
        msg = f"Update operation '{action_type}' failed!"
        raise BaseHttpClientException(message=msg, response=err.response) from err

1992 

def query(
    self,
    *,
    query: Query,
    limit: PositiveInt = None,
    order_by: str = None,
    response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED,
) -> List[Any]:
    """
    Run a batch query against the Context Broker.

    Args:
        query (Query): query payload, sent without unset fields
        limit (PositiveInt): maximum number of items handed to the
            pagination helper
        order_by (str): currently not used by this method
        response_format (AttrsFormat, str): requested attribute
            representation; must be a member of ``AttrsFormat``

    Returns:
        One object per matching entity, or an empty list if no entities
        are found. For the normalized and key-values formats the items
        are validated into ``ContextEntity`` / ``ContextEntityKeyValues``
        models; for any other format the raw items are returned.

    Raises:
        ValueError: if ``response_format`` is not a valid ``AttrsFormat``.
        BaseHttpClientException: if the request fails.
    """
    url = urljoin(self.base_url, f"{self._url_version}/op/query")
    headers = self.headers.copy()
    headers["Content-Type"] = "application/json"

    # "count" is always requested; the format is prepended if given
    options = "count"
    if response_format:
        if response_format not in list(AttrsFormat):
            raise ValueError(f"Value must be in {list(AttrsFormat)}")
        options = ",".join([response_format, "count"])
    params = {"options": options}

    try:
        items = self.__pagination(
            method=PaginationMethod.POST,
            url=url,
            headers=headers,
            params=params,
            data=query.model_dump_json(exclude_none=True),
            limit=limit,
        )
        if response_format == AttrsFormat.NORMALIZED:
            entity_model = ContextEntity
        elif response_format == AttrsFormat.KEY_VALUES:
            entity_model = ContextEntityKeyValues
        else:
            return items
        return TypeAdapter(List[entity_model]).validate_python(items)
    except requests.RequestException as err:
        msg = "Query operation failed!"
        raise BaseHttpClientException(message=msg, response=err.response) from err

2042 

def notify(self, message: Message) -> None:
    """
    Send a notification payload to the broker so that it is persisted.

    This operation is intended to consume a notification payload so that
    all the entity data included by such notification is persisted,
    overwriting if necessary. It is useful when one NGSIv2 endpoint is
    subscribed to another NGSIv2 endpoint (federation scenarios). The
    request payload must be an NGSIv2 notification payload. The behaviour
    must be exactly the same as 'update' with 'action_type' equal to
    append.

    Args:
        message: Notification message

    Returns:
        None

    Raises:
        BaseHttpClientException: if the request fails.
    """
    # Build the URL from the configured API version, like the other
    # batch operations (update, query) do.
    url = urljoin(self.base_url, f"{self._url_version}/op/notify")
    headers = self.headers.copy()
    headers.update({"Content-Type": "application/json"})
    params = {}
    try:
        res = self.post(
            url=url,
            headers=headers,
            params=params,
            data=message.model_dump_json(by_alias=True),
        )
        if res.ok:
            self.logger.info("Notification message sent!")
        else:
            res.raise_for_status()
    except requests.RequestException as err:
        msg = (
            f"Sending notification message failed! \n "
            f"{message.model_dump_json(indent=2)}"
        )
        raise BaseHttpClientException(message=msg, response=err.response) from err

2080 

def post_command(
    self,
    *,
    entity_id: str,
    command: Union[Command, NamedCommand, Dict],
    entity_type: str = None,
    command_name: str = None,
) -> None:
    """
    Send a command to a context entity.

    This corresponds to a 'PATCH' of the specified command attribute and
    is delegated to ``update_existing_entity_attributes``.

    Args:
        entity_id: Entity identifier
        command: Command. With ``command_name`` it must be a ``Command``
            (or a dict to build one from); without it, a ``NamedCommand``
            (or a dict to build one from).
        entity_type: Entity type
        command_name: Name of the command in the entity

    Returns:
        None
    """
    if command_name:
        assert isinstance(command, (Command, dict))
        unnamed = Command(**command) if isinstance(command, dict) else command
        # The name is supplied separately, so key the dumped command by it
        attribute = {command_name: unnamed.model_dump()}
    else:
        assert isinstance(command, (NamedCommand, dict))
        attribute = (
            NamedCommand(**command) if isinstance(command, dict) else command
        )

    self.update_existing_entity_attributes(
        entity_id=entity_id, entity_type=entity_type, attrs=[attribute]
    )

2115 

def does_entity_exist(self, entity_id: str, entity_type: str) -> bool:
    """
    Check whether an entity with the given id and type exists in the CB.

    Args:
        entity_id: Entity id
        entity_type: Entity type

    Returns:
        bool; True if the entity exists, False if the broker answers
        with HTTP 404

    Raises:
        RequestException, if any error occurs (e.g: No Connection),
        except that the entity is not found
    """
    url = urljoin(self.base_url, f"v2/entities/{entity_id}")
    headers = self.headers.copy()
    params = {"type": entity_type}

    try:
        res = self.get(url=url, params=params, headers=headers)
        if res.ok:
            return True
        res.raise_for_status()
    except requests.RequestException as err:
        not_found = err.response is not None and err.response.status_code == 404
        if not_found:
            return False
        # Anything but "not found" is a real failure: log and propagate
        self.log_error(err=err, msg="Checking entity existence failed!")
        raise

2145 

def patch_entity(
    self,
    entity: Union[ContextEntity, ContextEntityKeyValues],
    key_values: bool = False,
    forcedUpdate: bool = False,
    override_metadata: bool = False,
) -> None:
    """
    Update the state in the CB so that it matches the given entity.

    All attributes of the entity are forwarded to
    ``update_existing_entity_attributes``. It is an extended equivalent
    to the HTTP method PATCH, which applies partial modifications to a
    resource.

    Args:
        entity: Entity to update
        key_values: If True, the entity is updated in key-values format.
        forcedUpdate: Trigger matching subscriptions even if no
            attribute value actually changed (by default they are only
            triggered by effective changes).
        override_metadata: If True, the existing metadata of the entity
            is replaced
    Returns:
        None
    """
    update_kwargs = dict(
        attrs=entity.get_attributes(),
        entity_id=entity.id,
        entity_type=entity.type,
        key_values=key_values,
        forcedUpdate=forcedUpdate,
        override_metadata=override_metadata,
    )
    self.update_existing_entity_attributes(**update_kwargs)

2180 

2181 def _subscription_dicts_are_equal(self, first: dict, second: dict): 

2182 """ 

2183 Check if two dictionaries and all sub-dictionaries are equal. 

2184 Logs a warning if the keys are not equal, but ignores the 

2185 comparison of such keys. 

2186 

2187 Args: 

2188 first dict: Dictionary of first subscription 

2189 second dict: Dictionary of second subscription 

2190 

2191 Returns: 

2192 True if equal, else False 

2193 """ 

2194 

2195 def _value_is_not_none(value): 

2196 """ 

2197 Recursive function to check if a value equals none. 

2198 If the value is a dict and any value of the dict is not none, 

2199 the value is not none. 

2200 If the value is a list and any item is not none, the value is not none. 

2201 If it's neither dict nore list, bool is used. 

2202 """ 

2203 if isinstance(value, dict): 

2204 return any([_value_is_not_none(value=_v) for _v in value.values()]) 

2205 if isinstance(value, list): 

2206 return any([_value_is_not_none(value=_v) for _v in value]) 

2207 else: 

2208 return bool(value) 

2209 

2210 if first.keys() != second.keys(): 

2211 warnings.warn( 

2212 "Subscriptions contain a different set of fields. " 

2213 "Only comparing to new fields of the new one." 

2214 ) 

2215 for k, v in first.items(): 

2216 ex_value = second.get(k, None) 

2217 if isinstance(v, dict) and isinstance(ex_value, dict): 

2218 equal = self._subscription_dicts_are_equal(v, ex_value) 

2219 if equal: 

2220 continue 

2221 else: 

2222 return False 

2223 if v != ex_value: 

2224 self.logger.debug(f"Not equal fields for key {k}: ({v}, {ex_value})") 

2225 if ( 

2226 not _value_is_not_none(v) 

2227 and not _value_is_not_none(ex_value) 

2228 or k == "timesSent" 

2229 ): 

2230 continue 

2231 return False 

2232 return True 

2233 

2234 

2235# 

2236# 

2237# def check_duplicate_subscription(self, subscription_body, limit: int = 20): 

2238# """ 

2239# Function compares the subject of the subscription body, on whether a subscription 

2240# already exists for a device / entity. 

2241# :param subscription_body: the body of the new subscription 

2242# :param limit: pagination parameter, to set the number of 

2243# subscriptions bodies the get request should grab 

2244# :return: exists, boolean -> True, if such a subscription already 

2245# exists 

2246# """ 

2247# exists = False 

2248# subscription_subject = json.loads(subscription_body)["subject"] 

2249# # Exact keys depend on subscription body 

2250# try: 

2251# subscription_url = json.loads(subscription_body)[ 

2252# "notification"]["httpCustom"]["url"] 

2253# except KeyError: 

2254# subscription_url = json.loads(subscription_body)[ 

2255# "notification"]["http"]["url"] 

2256# 

2257# # If the number of subscriptions is larger then the limit, 

2258# paginations methods have to be used 

2259# url = self.url + '/v2/subscriptions?limit=' + str(limit) + 

2260# '&options=count' 

2261# response = self.session.get(url, headers=self.get_header()) 

2262# 

2263# sub_count = float(response.headers["Fiware-Total-Count"]) 

2264# response = json.loads(response.text) 

2265# if sub_count >= limit: 

2266# response = self.get_pagination(url=url, headers=self.get_header(), 

2267# limit=limit, count=sub_count) 

2268# response = json.loads(response) 

2269# 

2270# for existing_subscription in response: 

2271# # check whether the exact same subscriptions already exists 

2272# if existing_subscription["subject"] == subscription_subject: 

2273# exists = True 

2274# break 

2275# try: 

2276# existing_url = existing_subscription["notification"][ 

2277# "http"]["url"] 

2278# except KeyError: 

2279# existing_url = existing_subscription["notification"][ 

2280# "httpCustom"]["url"] 

2281# # check whether both subscriptions notify to the same path 

2282# if existing_url != subscription_url: 

2283# continue 

2284# else: 

2285# # iterate over all entities included in the subscription object 

2286# for entity in subscription_subject["entities"]: 

2287# if 'type' in entity.keys(): 

2288# subscription_type = entity['type'] 

2289# else: 

2290# subscription_type = entity['typePattern'] 

2291# if 'id' in entity.keys(): 

2292# subscription_id = entity['id'] 

2293# else: 

2294# subscription_id = entity["idPattern"] 

2295# # iterate over all entities included in the exisiting 

2296# subscriptions 

2297# for existing_entity in existing_subscription["subject"][ 

2298# "entities"]: 

2299# if "type" in entity.keys(): 

2300# type_existing = entity["type"] 

2301# else: 

2302# type_existing = entity["typePattern"] 

2303# if "id" in entity.keys(): 

2304# id_existing = entity["id"] 

2305# else: 

2306# id_existing = entity["idPattern"] 

2307# # as the ID field is non optional, it has to match 

2308# # check whether the type match 

2309# # if the type field is empty, they match all types 

2310# if (type_existing == subscription_type) or\ 

2311# ('*' in subscription_type) or \ 

2312# ('*' in type_existing)\ 

2313# or (type_existing == "") or ( 

2314# subscription_type == ""): 

2315# # check if on of the subscriptions is a pattern, 

2316# or if they both refer to the same id 

2317# # Get the attrs first, to avoid code duplication 

2318# # last thing to compare is the attributes 

2319# # Assumption -> position is the same as the 

2320# entities _list 

2321# # i == j 

2322# i = subscription_subject["entities"].index(entity) 

2323# j = existing_subscription["subject"][ 

2324# "entities"].index(existing_entity) 

2325# try: 

2326# subscription_attrs = subscription_subject[ 

2327# "condition"]["attrs"][i] 

2328# except (KeyError, IndexError): 

2329# subscription_attrs = [] 

2330# try: 

2331# existing_attrs = existing_subscription[ 

2332# "subject"]["condition"]["attrs"][j] 

2333# except (KeyError, IndexError): 

2334# existing_attrs = [] 

2335# 

2336# if (".*" in subscription_id) or ('.*' in 

2337# id_existing) or (subscription_id == id_existing): 

2338# # Attributes have to match, or the have to 

2339# be an empty array 

2340# if (subscription_attrs == existing_attrs) or 

2341# (subscription_attrs == []) or (existing_attrs == []): 

2342# exists = True 

2343# # if they do not match completely or subscribe 

2344# to all ids they have to match up to a certain position 

2345# elif ("*" in subscription_id) or ('*' in 

2346# id_existing): 

2347# regex_existing = id_existing.find('*') 

2348# regex_subscription = 

2349# subscription_id.find('*') 

2350# # slice the strings to compare 

2351# if (id_existing[:regex_existing] in 

2352# subscription_id) or (subscription_id[:regex_subscription] in id_existing) or \ 

2353# (id_existing[regex_existing:] in 

2354# subscription_id) or (subscription_id[regex_subscription:] in id_existing): 

2355# if (subscription_attrs == 

2356# existing_attrs) or (subscription_attrs == []) or (existing_attrs == []): 

2357# exists = True 

2358# else: 

2359# continue 

2360# else: 

2361# continue 

2362# else: 

2363# continue 

2364# else: 

2365# continue 

2366# else: 

2367# continue 

2368# return exists 

2369#