Coverage for filip/clients/ngsi_v2/cb.py: 80%

680 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-05 11:07 +0000

1""" 

2Context Broker Module for API Client 

3""" 

4 

5from __future__ import annotations 

6 

7import copy 

8from copy import deepcopy 

9from enum import Enum 

10from math import inf 

11from packaging.version import parse as parse_version 

12from pydantic import PositiveInt, PositiveFloat, AnyHttpUrl, ValidationError 

13from pydantic.type_adapter import TypeAdapter 

14from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union 

15import re 

16import requests 

17from urllib.parse import urljoin 

18import warnings 

19from requests import RequestException 

20from filip.clients.base_http_client import BaseHttpClient, NgsiURLVersion 

21from filip.config import settings 

22from filip.models.base import FiwareHeader, PaginationMethod, DataType 

23from filip.utils.simple_ql import QueryString 

24from filip.models.ngsi_v2.context import ( 

25 ActionType, 

26 Command, 

27 ContextEntity, 

28 ContextEntityKeyValues, 

29 ContextAttribute, 

30 NamedCommand, 

31 NamedContextAttribute, 

32 Query, 

33 Update, 

34 PropertyFormat, 

35 ContextEntityList, 

36 ContextEntityKeyValuesList, 

37 ContextEntityValidationList, 

38 ContextEntityKeyValuesValidationList, 

39) 

40from filip.models.ngsi_v2.base import AttrsFormat 

41from filip.models.ngsi_v2.subscriptions import Subscription, Message 

42from filip.models.ngsi_v2.registrations import Registration 

43from filip.clients.exceptions import BaseHttpClientException 

44 

45if TYPE_CHECKING: 

46 from filip.clients.ngsi_v2.iota import IoTAClient 

47 

48 

49class ContextBrokerClient(BaseHttpClient): 

50 """ 

51 Implementation of NGSI Context Broker functionalities, such as creating 

52 entities and subscriptions; retrieving, updating and deleting data. 

53 Further documentation: 

54 https://fiware-orion.readthedocs.io/en/master/ 

55 

56 Api specifications for v2 are located here: 

57 https://telefonicaid.github.io/fiware-orion/api/v2/stable/ 

58 

59 Note: 

60 We use the reference implementation for development. Therefore, some 

61 other brokers may show slightly different behavior! 

62 """ 

63 

64 def __init__( 

65 self, 

66 url: str = None, 

67 *, 

68 session: requests.Session = None, 

69 fiware_header: FiwareHeader = None, 

70 **kwargs, 

71 ): 

72 """ 

73 

74 Args: 

75 url: Url of context broker server 

76 session (requests.Session): 

77 fiware_header (FiwareHeader): fiware service and fiware service path 

78 **kwargs (Optional): Optional arguments that ``request`` takes. 

79 """ 

80 # set service url 

81 url = url or settings.CB_URL 

82 self._url_version = NgsiURLVersion.v2_url.value 

83 super().__init__( 

84 url=url, session=session, fiware_header=fiware_header, **kwargs 

85 ) 

86 

87 def __pagination( 

88 self, 

89 *, 

90 method: PaginationMethod = PaginationMethod.GET, 

91 url: str, 

92 headers: Dict, 

93 limit: Union[PositiveInt, PositiveFloat] = None, 

94 params: Dict = None, 

95 data: str = None, 

96 ) -> List[Dict]: 

97 """ 

98 NGSIv2 implements a pagination mechanism in order to help clients to 

99 retrieve large sets of resources. This mechanism works for all listing 

100 operations in the API (e.g. GET /v2/entities, GET /v2/subscriptions, 

101 POST /v2/op/query, etc.). This function helps getting datasets that are 

102 larger than the limit for the different GET operations. 

103 

104 https://fiware-orion.readthedocs.io/en/master/user/pagination/index.html 

105 

106 Args: 

107 url: Information about the url, obtained from the original function 

108 headers: The headers from the original function 

109 params: 

110 limit: 

111 

112 Returns: 

113 object: 

114 

115 """ 

116 if limit is None: 

117 limit = inf 

118 if limit > 1000: 

119 params["limit"] = 1000 # maximum items per request 

120 else: 

121 params["limit"] = limit 

122 

123 if self.session: 

124 session = self.session 

125 else: 

126 session = requests.Session() 

127 with session: 

128 res = session.request( 

129 method=method, url=url, params=params, headers=headers, data=data 

130 ) 

131 if res.ok: 

132 items = res.json() 

133 # do pagination 

134 count = int(res.headers["Fiware-Total-Count"]) 

135 

136 while len(items) < limit and len(items) < count: 

137 # Establishing the offset from where entities are retrieved 

138 params["offset"] = len(items) 

139 params["limit"] = min(1000, (limit - len(items))) 

140 res = session.request( 

141 method=method, 

142 url=url, 

143 params=params, 

144 headers=headers, 

145 data=data, 

146 ) 

147 if res.ok: 

148 items.extend(res.json()) 

149 else: 

150 res.raise_for_status() 

151 self.logger.debug("Received: %s", items) 

152 return items 

153 res.raise_for_status() 

154 

155 # MANAGEMENT API 

156 def get_version(self) -> Dict: 

157 """ 

158 Gets version of IoT Agent 

159 Returns: 

160 Dictionary with response 

161 """ 

162 url = urljoin(self.base_url, "version") 

163 try: 

164 res = self.get(url=url, headers=self.headers) 

165 if res.ok: 

166 return res.json() 

167 res.raise_for_status() 

168 except requests.RequestException as err: 

169 self.logger.error(err) 

170 raise RequestException(response=err.response) from err 

171 

172 def get_resources(self) -> Dict: 

173 """ 

174 Gets reo 

175 

176 Returns: 

177 Dict 

178 """ 

179 url = urljoin(self.base_url, self._url_version) 

180 try: 

181 res = self.get(url=url, headers=self.headers) 

182 if res.ok: 

183 return res.json() 

184 res.raise_for_status() 

185 except requests.RequestException as err: 

186 self.logger.error(err) 

187 raise RequestException(response=err.response) from err 

188 

189 # STATISTICS API 

190 def get_statistics(self) -> Dict: 

191 """ 

192 Gets statistics of context broker 

193 Returns: 

194 Dictionary with response 

195 """ 

196 url = urljoin(self.base_url, "statistics") 

197 try: 

198 res = self.get(url=url, headers=self.headers) 

199 if res.ok: 

200 return res.json() 

201 res.raise_for_status() 

202 except requests.RequestException as err: 

203 self.logger.error(err) 

204 raise BaseHttpClientException( 

205 message=err.response.text, response=err.response 

206 ) from err 

207 

208 # CONTEXT MANAGEMENT API ENDPOINTS 

209 # Entity Operations 

210 def post_entity( 

211 self, 

212 entity: Union[ContextEntity, ContextEntityKeyValues], 

213 update: bool = False, 

214 patch: bool = False, 

215 override_metadata: bool = True, 

216 key_values: bool = False, 

217 ): 

218 """ 

219 Function registers an Object with the NGSI Context Broker, 

220 if it already exists it can be automatically updated (overwritten) 

221 if the update bool is True. 

222 First a post request with the entity is tried, if the response code 

223 is 422 the entity is uncrossable, as it already exists there are two 

224 options, either overwrite it, if the attribute have changed 

225 (e.g. at least one new/new values) (update = True) or leave 

226 it the way it is (update=False) 

227 If you only want to manipulate the entities values, you need to set 

228 patch argument. 

229 

230 Args: 

231 entity (ContextEntity/ContextEntityKeyValues): 

232 Context Entity Object 

233 update (bool): 

234 If the response.status_code is 422, whether the override and 

235 existing entity 

236 patch (bool): 

237 If the response.status_code is 422, whether to manipulate the 

238 existing entity. Omitted if update `True`. 

239 override_metadata: 

240 Only applies for patch equal to `True`. 

241 Whether to override or append the attribute's metadata. 

242 `True` for overwrite or `False` for update/append 

243 key_values(bool): 

244 By default False. If set to True, "options=keyValues" will 

245 be included in params of post request. The payload uses 

246 the keyValues simplified entity representation, i.e. 

247 ContextEntityKeyValues. 

248 """ 

249 url = urljoin(self.base_url, f"{self._url_version}/entities") 

250 headers = self.headers.copy() 

251 params = {} 

252 options = [] 

253 if key_values: 

254 assert isinstance(entity, ContextEntityKeyValues) 

255 options.append("keyValues") 

256 else: 

257 assert isinstance(entity, ContextEntity) 

258 if options: 

259 params.update({"options": ",".join(options)}) 

260 try: 

261 res = self.post( 

262 url=url, 

263 headers=headers, 

264 json=entity.model_dump(exclude_none=True), 

265 params=params, 

266 ) 

267 if res.ok: 

268 self.logger.info("Entity successfully posted!") 

269 return res.headers.get("Location") 

270 res.raise_for_status() 

271 except requests.RequestException as err: 

272 if err.response is not None: 

273 if update and err.response.status_code == 422: 

274 return self.override_entity(entity=entity, key_values=key_values) 

275 if patch and err.response.status_code == 422: 

276 return self.patch_entity( 

277 entity=entity, 

278 override_metadata=override_metadata, 

279 key_values=key_values, 

280 ) 

281 msg = f"Could not post entity {entity.id}" 

282 raise BaseHttpClientException(message=msg, response=err.response) from err 

283 

284 def get_entity_list( 

285 self, 

286 *, 

287 entity_ids: List[str] = None, 

288 entity_types: List[str] = None, 

289 id_pattern: str = None, 

290 type_pattern: str = None, 

291 q: Union[str, QueryString] = None, 

292 mq: Union[str, QueryString] = None, 

293 georel: str = None, 

294 geometry: str = None, 

295 coords: str = None, 

296 limit: PositiveInt = inf, 

297 attrs: List[str] = None, 

298 metadata: str = None, 

299 order_by: str = None, 

300 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED, 

301 include_invalid: bool = False, 

302 ) -> Union[ 

303 List[Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]], 

304 ContextEntityValidationList, 

305 ContextEntityKeyValuesValidationList, 

306 ]: 

307 r""" 

308 Retrieves a list of context entities that match different criteria by 

309 id, type, pattern matching (either id or type) and/or those which 

310 match a query or geographical query (see Simple Query Language and 

311 Geographical Queries). A given entity has to match all the criteria 

312 to be retrieved (i.e., the criteria is combined in a logical AND 

313 way). Note that pattern matching query parameters are incompatible 

314 (i.e. mutually exclusive) with their corresponding exact matching 

315 parameters, i.e. idPattern with id and typePattern with type. 

316 

317 Args: 

318 entity_ids: A comma-separated list of elements. Retrieve entities 

319 whose ID matches one of the elements in the list. 

320 Incompatible with idPattern,e.g. Boe_Idarium 

321 entity_types: comma-separated list of elements. Retrieve entities 

322 whose type matches one of the elements in the list. 

323 Incompatible with typePattern. Example: Room. 

324 id_pattern: A correctly formatted regular expression. Retrieve 

325 entities whose ID matches the regular expression. Incompatible 

326 with id, e.g. ngsi-ld.* or sensor.* 

327 type_pattern: A correctly formatted regular expression. Retrieve 

328 entities whose type matches the regular expression. 

329 Incompatible with type, e.g. room.* 

330 q (SimpleQuery): A query expression, composed of a list of 

331 statements separated by ;, i.e., 

332 q=statement1;statement2;statement3. See Simple Query 

333 Language specification. Example: temperature>40. 

334 mq (SimpleQuery): A query expression for attribute metadata, 

335 composed of a list of statements separated by ;, i.e., 

336 mq=statement1;statement2;statement3. See Simple Query 

337 Language specification. Example: temperature.accuracy<0.9. 

338 georel: Spatial relationship between matching entities and a 

339 reference shape. See Geographical Queries. Example: 'near'. 

340 geometry: Geographical area to which the query is restricted. 

341 See Geographical Queries. Example: point. 

342 coords: List of latitude-longitude pairs of coordinates separated 

343 by ';'. See Geographical Queries. Example: 41.390205, 

344 2.154007;48.8566,2.3522. 

345 limit: Limits the number of entities to be retrieved Example: 20 

346 attrs: Comma-separated list of attribute names whose data are to 

347 be included in the response. The attributes are retrieved in 

348 the order specified by this parameter. If this parameter is 

349 not included, the attributes are retrieved in arbitrary 

350 order. See "Filtering out attributes and metadata" section 

351 for more detail. Example: seatNumber. 

352 metadata: A list of metadata names to include in the response. 

353 See "Filtering out attributes and metadata" section for more 

354 detail. Example: accuracy. 

355 order_by: Criteria for ordering results. See "Ordering Results" 

356 section for details. Example: temperature,!speed. 

357 response_format (AttrsFormat, str): Response Format. Note: That if 

358 'keyValues' or 'values' are used the response model will 

359 change to List[ContextEntityKeyValues] and to List[Dict[str, 

360 Any]], respectively. 

361 include_invalid: Specify if the returned list should also contain a list of invalid entity IDs or not. 

362 Returns: 

363 

364 """ 

365 url = urljoin(self.base_url, f"{self._url_version}/entities/") 

366 headers = self.headers.copy() 

367 params = {} 

368 

369 if entity_ids and id_pattern: 

370 raise ValueError 

371 if entity_types and type_pattern: 

372 raise ValueError 

373 if entity_ids: 

374 if not isinstance(entity_ids, list): 

375 entity_ids = [entity_ids] 

376 params.update({"id": ",".join(entity_ids)}) 

377 if id_pattern: 

378 try: 

379 re.compile(id_pattern) 

380 except re.error as err: 

381 raise ValueError(f"Invalid Pattern: {err}") from err 

382 params.update({"idPattern": id_pattern}) 

383 if entity_types: 

384 if not isinstance(entity_types, list): 

385 entity_types = [entity_types] 

386 params.update({"type": ",".join(entity_types)}) 

387 if type_pattern: 

388 try: 

389 re.compile(type_pattern) 

390 except re.error as err: 

391 raise ValueError(f"Invalid Pattern: {err.msg}") from err 

392 params.update({"typePattern": type_pattern}) 

393 if attrs: 

394 params.update({"attrs": ",".join(attrs)}) 

395 if metadata: 

396 params.update({"metadata": ",".join(metadata)}) 

397 if q: 

398 if isinstance(q, str): 

399 q = QueryString.parse_str(q) 

400 params.update({"q": str(q)}) 

401 if mq: 

402 params.update({"mq": str(mq)}) 

403 if geometry: 

404 params.update({"geometry": geometry}) 

405 if georel: 

406 params.update({"georel": georel}) 

407 if coords: 

408 params.update({"coords": coords}) 

409 if order_by: 

410 params.update({"orderBy": order_by}) 

411 if response_format not in list(AttrsFormat): 

412 raise ValueError(f"Value must be in {list(AttrsFormat)}") 

413 response_format = ",".join(["count", response_format]) 

414 params.update({"options": response_format}) 

415 try: 

416 items = self.__pagination( 

417 method=PaginationMethod.GET, 

418 limit=limit, 

419 url=url, 

420 params=params, 

421 headers=headers, 

422 ) 

423 if include_invalid: 

424 valid_entities = [] 

425 invalid_entities = [] 

426 

427 if AttrsFormat.NORMALIZED in response_format: 

428 adapter = TypeAdapter(ContextEntity) 

429 

430 for entity in items: 

431 try: 

432 valid_entity = adapter.validate_python(entity) 

433 valid_entities.append(valid_entity) 

434 except ValidationError: 

435 invalid_entities.append(entity.get("id")) 

436 

437 return ContextEntityValidationList.model_validate( 

438 { 

439 "entities": valid_entities, 

440 "invalid_entities": invalid_entities, 

441 } 

442 ) 

443 elif AttrsFormat.KEY_VALUES in response_format: 

444 adapter = TypeAdapter(ContextEntityKeyValues) 

445 

446 for entity in items: 

447 try: 

448 valid_entity = adapter.validate_python(entity) 

449 valid_entities.append(valid_entity) 

450 except ValidationError: 

451 invalid_entities.append(entity.get("id")) 

452 

453 return ContextEntityKeyValuesValidationList.model_validate( 

454 { 

455 "entities": valid_entities, 

456 "invalid_entities": invalid_entities, 

457 } 

458 ) 

459 else: 

460 return items 

461 else: 

462 if AttrsFormat.NORMALIZED in response_format: 

463 return ContextEntityList.model_validate( 

464 {"entities": items} 

465 ).entities 

466 elif AttrsFormat.KEY_VALUES in response_format: 

467 return ContextEntityKeyValuesList.model_validate( 

468 {"entities": items} 

469 ).entities 

470 return items # in case of VALUES as response_format 

471 

472 except requests.RequestException as err: 

473 msg = "Could not load entities" 

474 raise BaseHttpClientException(message=msg, response=err.response) from err 

475 

476 def get_entity( 

477 self, 

478 entity_id: str, 

479 entity_type: str = None, 

480 attrs: List[str] = None, 

481 metadata: List[str] = None, 

482 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED, 

483 ) -> Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]: 

484 """ 

485 This operation must return one entity element only, but there may be 

486 more than one entity with the same ID (e.g. entities with same ID but 

487 different types). In such case, an error message is returned, with 

488 the HTTP status code set to 409 Conflict. 

489 

490 Args: 

491 entity_id (String): Id of the entity to be retrieved 

492 entity_type (String): Entity type, to avoid ambiguity in case 

493 there are several entities with the same entity id. 

494 attrs (List of Strings): List of attribute names whose data must be 

495 included in the response. The attributes are retrieved in the 

496 order specified by this parameter. 

497 See "Filtering out attributes and metadata" section for more 

498 detail. If this parameter is not included, the attributes are 

499 retrieved in arbitrary order, and all the attributes of the 

500 entity are included in the response. 

501 Example: temperature, humidity. 

502 metadata (List of Strings): A list of metadata names to include in 

503 the response. See "Filtering out attributes and metadata" 

504 section for more detail. Example: accuracy. 

505 response_format (AttrsFormat, str): Representation format of 

506 response 

507 Returns: 

508 ContextEntity 

509 """ 

510 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}") 

511 headers = self.headers.copy() 

512 params = {} 

513 if entity_type: 

514 params.update({"type": entity_type}) 

515 if attrs: 

516 params.update({"attrs": ",".join(attrs)}) 

517 if metadata: 

518 params.update({"metadata": ",".join(metadata)}) 

519 if response_format not in list(AttrsFormat): 

520 raise ValueError(f"Value must be in {list(AttrsFormat)}") 

521 params.update({"options": response_format}) 

522 

523 try: 

524 res = self.get(url=url, params=params, headers=headers) 

525 if res.ok: 

526 self.logger.info("Entity successfully retrieved!") 

527 self.logger.debug("Received: %s", res.json()) 

528 if response_format == AttrsFormat.NORMALIZED: 

529 return ContextEntity(**res.json()) 

530 if response_format == AttrsFormat.KEY_VALUES: 

531 return ContextEntityKeyValues(**res.json()) 

532 return res.json() 

533 res.raise_for_status() 

534 except requests.RequestException as err: 

535 msg = f"Could not load entity {entity_id}" 

536 raise BaseHttpClientException(message=msg, response=err.response) from err 

537 

538 def get_entity_attributes( 

539 self, 

540 entity_id: str, 

541 entity_type: str = None, 

542 attrs: List[str] = None, 

543 metadata: List[str] = None, 

544 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED, 

545 ) -> Dict[str, ContextAttribute]: 

546 """ 

547 This request is similar to retrieving the whole entity, however this 

548 one omits the id and type fields. Just like the general request of 

549 getting an entire entity, this operation must return only one entity 

550 element. If more than one entity with the same ID is found (e.g. 

551 entities with same ID but different type), an error message is 

552 returned, with the HTTP status code set to 409 Conflict. 

553 

554 Args: 

555 entity_id (String): Id of the entity to be retrieved 

556 entity_type (String): Entity type, to avoid ambiguity in case 

557 there are several entities with the same entity id. 

558 attrs (List of Strings): List of attribute names whose data must be 

559 included in the response. The attributes are retrieved in the 

560 order specified by this parameter. 

561 See "Filtering out attributes and metadata" section for more 

562 detail. If this parameter is not included, the attributes are 

563 retrieved in arbitrary order, and all the attributes of the 

564 entity are included in the response. Example: temperature, 

565 humidity. 

566 metadata (List of Strings): A list of metadata names to include in 

567 the response. See "Filtering out attributes and metadata" 

568 section for more detail. Example: accuracy. 

569 response_format (AttrsFormat, str): Representation format of 

570 response 

571 Returns: 

572 Dict 

573 """ 

574 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs") 

575 headers = self.headers.copy() 

576 params = {} 

577 if entity_type: 

578 params.update({"type": entity_type}) 

579 if attrs: 

580 params.update({"attrs": ",".join(attrs)}) 

581 if metadata: 

582 params.update({"metadata": ",".join(metadata)}) 

583 if response_format not in list(AttrsFormat): 

584 raise ValueError(f"Value must be in {list(AttrsFormat)}") 

585 params.update({"options": response_format}) 

586 try: 

587 res = self.get(url=url, params=params, headers=headers) 

588 if res.ok: 

589 if response_format == AttrsFormat.NORMALIZED: 

590 return { 

591 key: ContextAttribute(**values) 

592 for key, values in res.json().items() 

593 } 

594 return res.json() 

595 res.raise_for_status() 

596 except requests.RequestException as err: 

597 msg = f"Could not load attributes from entity {entity_id} !" 

598 raise BaseHttpClientException(message=msg, response=err.response) from err 

599 

600 def update_entity( 

601 self, 

602 entity: Union[ContextEntity, ContextEntityKeyValues, dict], 

603 append_strict: bool = False, 

604 key_values: bool = False, 

605 ): 

606 """ 

607 The request payload is an object representing the attributes to 

608 append or update. 

609 

610 Note: 

611 Update means overwriting the existing entity. If you want to 

612 manipulate you should rather use patch_entity. 

613 

614 Args: 

615 entity (ContextEntity): 

616 append_strict: If `False` the entity attributes are updated (if they 

617 previously exist) or appended (if they don't previously exist) 

618 with the ones in the payload. 

619 If `True` all the attributes in the payload not 

620 previously existing in the entity are appended. In addition 

621 to that, in case some of the attributes in the payload 

622 already exist in the entity, an error is returned. 

623 More precisely this means a strict append procedure. 

624 key_values: By default False. If set to True, the payload uses 

625 the keyValues simplified entity representation, i.e. 

626 ContextEntityKeyValues. 

627 Returns: 

628 None 

629 """ 

630 if key_values: 

631 if isinstance(entity, dict): 

632 entity = copy.deepcopy(entity) 

633 _id = entity.pop("id") 

634 _type = entity.pop("type") 

635 attrs = entity 

636 entity = ContextEntityKeyValues(id=_id, type=_type) 

637 else: 

638 attrs = entity.model_dump(exclude={"id", "type"}) 

639 else: 

640 attrs = entity.get_attributes() 

641 self.update_or_append_entity_attributes( 

642 entity_id=entity.id, 

643 entity_type=entity.type, 

644 attrs=attrs, 

645 append_strict=append_strict, 

646 key_values=key_values, 

647 ) 

648 

649 def update_entity_properties( 

650 self, entity: ContextEntity, append_strict: bool = False 

651 ): 

652 """ 

653 The request payload is an object representing the attributes, of any type 

654 but Relationship, to append or update. 

655 

656 Note: 

657 Update means overwriting the existing entity. If you want to 

658 manipulate you should rather use patch_entity. 

659 

660 Args: 

661 entity (ContextEntity): 

662 append_strict: If `False` the entity attributes are updated (if they 

663 previously exist) or appended (if they don't previously exist) 

664 with the ones in the payload. 

665 If `True` all the attributes in the payload not 

666 previously existing in the entity are appended. In addition 

667 to that, in case some of the attributes in the payload 

668 already exist in the entity, an error is returned. 

669 More precisely this means a strict append procedure. 

670 

671 Returns: 

672 None 

673 """ 

674 self.update_or_append_entity_attributes( 

675 entity_id=entity.id, 

676 entity_type=entity.type, 

677 attrs=entity.get_properties(), 

678 append_strict=append_strict, 

679 ) 

680 

681 def update_entity_relationships( 

682 self, entity: ContextEntity, append_strict: bool = False 

683 ): 

684 """ 

685 The request payload is an object representing only the attributes, of type 

686 Relationship, to append or update. 

687 

688 Note: 

689 Update means overwriting the existing entity. If you want to 

690 manipulate you should rather use patch_entity. 

691 

692 Args: 

693 entity (ContextEntity): 

694 append_strict: If `False` the entity attributes are updated (if they 

695 previously exist) or appended (if they don't previously exist) 

696 with the ones in the payload. 

697 If `True` all the attributes in the payload not 

698 previously existing in the entity are appended. In addition 

699 to that, in case some of the attributes in the payload 

700 already exist in the entity, an error is returned. 

701 More precisely this means a strict append procedure. 

702 

703 Returns: 

704 None 

705 """ 

706 self.update_or_append_entity_attributes( 

707 entity_id=entity.id, 

708 entity_type=entity.type, 

709 attrs=entity.get_relationships(), 

710 append_strict=append_strict, 

711 ) 

712 

713 def delete_entity( 

714 self, 

715 entity_id: str, 

716 entity_type: str = None, 

717 delete_devices: bool = False, 

718 iota_client: IoTAClient = None, 

719 iota_url: AnyHttpUrl = settings.IOTA_URL, 

720 ) -> None: 

721 """ 

722 Remove a entity from the context broker. No payload is required 

723 or received. 

724 

725 Args: 

726 entity_id: 

727 Id of the entity to be deleted 

728 entity_type: 

729 Entity type, to avoid ambiguity in case there are several 

730 entities with the same entity id. 

731 delete_devices: 

732 If True, also delete all devices that reference this 

733 entity (entity_id as entity_name) 

734 iota_client: 

735 Corresponding IoTA-Client used to access IoTA-Agent 

736 iota_url: 

737 URL of the corresponding IoT-Agent. This will autogenerate 

738 an IoTA-Client, mirroring the information of the 

739 ContextBrokerClient, e.g. FiwareHeader, and other headers 

740 

741 Returns: 

742 None 

743 """ 

744 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}") 

745 headers = self.headers.copy() 

746 if entity_type: 

747 params = {"type": entity_type} 

748 else: 

749 params = None 

750 try: 

751 res = self.delete(url=url, params=params, headers=headers) 

752 if res.ok: 

753 self.logger.info("Entity '%s' successfully deleted!", entity_id) 

754 else: 

755 res.raise_for_status() 

756 except requests.RequestException as err: 

757 msg = f"Could not delete entity {entity_id} !" 

758 raise BaseHttpClientException(message=msg, response=err.response) from err 

759 

760 if delete_devices: 

761 from filip.clients.ngsi_v2 import IoTAClient 

762 

763 if iota_client: 

764 iota_client_local = deepcopy(iota_client) 

765 else: 

766 warnings.warn( 

767 "No IoTA-Client object provided! " 

768 "Will try to generate one. " 

769 "This usage is not recommended." 

770 ) 

771 

772 iota_client_local = IoTAClient( 

773 url=iota_url, 

774 fiware_header=self.fiware_headers, 

775 headers=self.headers, 

776 ) 

777 

778 for device in iota_client_local.get_device_list(entity_names=[entity_id]): 

779 if entity_type: 

780 if device.entity_type == entity_type: 

781 iota_client_local.delete_device(device_id=device.device_id) 

782 else: 

783 iota_client_local.delete_device(device_id=device.device_id) 

784 iota_client_local.close() 

785 

786 def delete_entities(self, entities: List[ContextEntity]) -> None: 

787 """ 

788 Remove a list of entities from the context broker. This methode is 

789 more efficient than to call delete_entity() for each entity 

790 

791 Args: 

792 entities: List[ContextEntity]: List of entities to be deleted 

793 

794 Raises: 

795 Exception, if one of the entities is not in the ContextBroker 

796 

797 Returns: 

798 None 

799 """ 

800 

801 # update() delete, deletes all entities without attributes completely, 

802 # and removes the attributes for the other 

803 # The entities are sorted based on the fact if they have 

804 # attributes. 

805 limit = 1000 # max number of entities that will be deleted at once 

806 entities_with_attributes: List[ContextEntity] = [] 

807 for entity in entities: 

808 attribute_names = [ 

809 key 

810 for key in entity.model_dump() 

811 if key not in ContextEntity.model_fields 

812 ] 

813 if len(attribute_names) > 0: 

814 entities_with_attributes.append( 

815 ContextEntity(id=entity.id, type=entity.type) 

816 ) 

817 

818 # Post update_delete for those without attribute only once, 

819 # for the other post update_delete again but for the changed entity 

820 # in the ContextBroker (only id and type left) 

821 while len(entities) > 0: 

822 self.update(entities=entities[0:limit], action_type="delete") 

823 entities = entities[limit:] 

824 while len(entities_with_attributes) > 0: 

825 self.update( 

826 entities=entities_with_attributes[0:limit], action_type="delete" 

827 ) 

828 entities_with_attributes = entities_with_attributes[limit:] 

829 

830 def update_or_append_entity_attributes( 

831 self, 

832 entity_id: str, 

833 attrs: Union[ 

834 List[NamedContextAttribute], Dict[str, ContextAttribute], Dict[str, Any] 

835 ], 

836 entity_type: str = None, 

837 append_strict: bool = False, 

838 forcedUpdate: bool = False, 

839 key_values: bool = False, 

840 ): 

841 """ 

842 The request payload is an object representing the attributes to 

843 append or update. This corresponds to a 'POST' request if append is 

844 set to 'False' 

845 

846 Note: 

847 Be careful not to update attributes that are 

848 provided via context registration, e.g. commands. Commands are 

849 removed before sending the request. To avoid breaking things. 

850 

851 Args: 

852 entity_id: Entity id to be updated 

853 entity_type: Entity type, to avoid ambiguity in case there are 

854 several entities with the same entity id. 

855 attrs: List of attributes to update or to append 

856 append_strict: If `False` the entity attributes are updated (if they 

857 previously exist) or appended (if they don't previously exist) 

858 with the ones in the payload. 

859 If `True` all the attributes in the payload not 

860 previously existing in the entity are appended. In addition 

861 to that, in case some of the attributes in the payload 

862 already exist in the entity, an error is returned. 

863 More precisely this means a strict append procedure. 

864 forcedUpdate: Update operation have to trigger any matching 

865 subscription, no matter if there is an actual attribute 

866 update or no instead of the default behavior, which is to 

867 updated only if attribute is effectively updated. 

868 key_values: By default False. If set to True, the payload uses 

869 the keyValues simplified entity representation, i.e. 

870 ContextEntityKeyValues. 

871 Returns: 

872 None 

873 

874 """ 

875 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs") 

876 headers = self.headers.copy() 

877 params = {} 

878 if entity_type: 

879 params.update({"type": entity_type}) 

880 else: 

881 entity_type = "dummy" 

882 

883 options = [] 

884 if append_strict: 

885 options.append("append") 

886 if forcedUpdate: 

887 options.append("forcedUpdate") 

888 if key_values: 

889 assert isinstance(attrs, dict), "for keyValues attrs has to be a dict" 

890 options.append("keyValues") 

891 if options: 

892 params.update({"options": ",".join(options)}) 

893 

894 if key_values: 

895 entity = ContextEntityKeyValues(id=entity_id, type=entity_type, **attrs) 

896 else: 

897 entity = ContextEntity(id=entity_id, type=entity_type) 

898 entity.add_attributes(attrs) 

899 # exclude commands from the send data, 

900 # as they live in the IoTA-agent 

901 excluded_keys = {"id", "type"} 

902 # excluded_keys.update( 

903 # entity.get_commands(response_format=PropertyFormat.DICT).keys() 

904 # ) 

905 try: 

906 res = self.post( 

907 url=url, 

908 headers=headers, 

909 json=entity.model_dump(exclude=excluded_keys, exclude_none=True), 

910 params=params, 

911 ) 

912 if res.ok: 

913 self.logger.info("Entity '%s' successfully " "updated!", entity.id) 

914 else: 

915 res.raise_for_status() 

916 except requests.RequestException as err: 

917 msg = f"Could not update or append attributes of entity" f" {entity.id} !" 

918 self.log_error(err=err, msg=msg) 

919 raise BaseHttpClientException(message=msg, response=err.response) from err 

920 

921 def update_existing_entity_attributes( 

922 self, 

923 entity_id: str, 

924 attrs: Union[ 

925 List[NamedContextAttribute], Dict[str, ContextAttribute], Dict[str, Any] 

926 ], 

927 entity_type: str = None, 

928 forcedUpdate: bool = False, 

929 override_metadata: bool = False, 

930 key_values: bool = False, 

931 ): 

932 """ 

933 The entity attributes are updated with the ones in the payload. 

934 In addition to that, if one or more attributes in the payload doesn't 

935 exist in the entity, an error is returned. This corresponds to a 

936 'PATCH' request. 

937 

938 Args: 

939 entity_id: Entity id to be updated 

940 entity_type: Entity type, to avoid ambiguity in case there are 

941 several entities with the same entity id. 

942 attrs: List of attributes to update or to append 

943 forcedUpdate: Update operation have to trigger any matching 

944 subscription, no matter if there is an actual attribute 

945 update or no instead of the default behavior, which is to 

946 updated only if attribute is effectively updated. 

947 override_metadata: 

948 Bool,replace the existing metadata with the one provided in 

949 the request 

950 key_values: By default False. If set to True, the payload uses 

951 the keyValues simplified entity representation, i.e. 

952 ContextEntityKeyValues. 

953 Returns: 

954 None 

955 

956 """ 

957 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs") 

958 headers = self.headers.copy() 

959 if entity_type: 

960 params = {"type": entity_type} 

961 else: 

962 params = None 

963 entity_type = "dummy" 

964 

965 options = [] 

966 if override_metadata: 

967 options.append("overrideMetadata") 

968 if forcedUpdate: 

969 options.append("forcedUpdate") 

970 if key_values: 

971 assert isinstance(attrs, dict), "for keyValues the attrs must be dict" 

972 payload = attrs 

973 options.append("keyValues") 

974 else: 

975 entity = ContextEntity(id=entity_id, type=entity_type) 

976 entity.add_attributes(attrs) 

977 payload = entity.model_dump(exclude={"id", "type"}, exclude_none=True) 

978 if options: 

979 params.update({"options": ",".join(options)}) 

980 

981 try: 

982 res = self.patch( 

983 url=url, 

984 headers=headers, 

985 json=payload, 

986 params=params, 

987 ) 

988 if res.ok: 

989 self.logger.info("Entity '%s' successfully " "updated!", entity_id) 

990 else: 

991 res.raise_for_status() 

992 except requests.RequestException as err: 

993 msg = f"Could not update attributes of entity" 

994 raise BaseHttpClientException(message=msg, response=err.response) from err 

995 

996 def override_entity( 

997 self, entity: Union[ContextEntity, ContextEntityKeyValues], **kwargs 

998 ): 

999 """ 

1000 The request payload is an object representing the attributes to 

1001 override the existing entity. 

1002 

1003 Note: 

1004 If you want to manipulate you should rather use patch_entity. 

1005 

1006 Args: 

1007 entity (ContextEntity or ContextEntityKeyValues): 

1008 Returns: 

1009 None 

1010 """ 

1011 return self.replace_entity_attributes( 

1012 entity_id=entity.id, 

1013 entity_type=entity.type, 

1014 attrs=entity.get_attributes(), 

1015 **kwargs, 

1016 ) 

1017 

1018 def replace_entity_attributes( 

1019 self, 

1020 entity_id: str, 

1021 attrs: Union[ 

1022 List[Union[NamedContextAttribute, Dict[str, ContextAttribute]]], Dict 

1023 ], 

1024 entity_type: str = None, 

1025 forcedUpdate: bool = False, 

1026 key_values: bool = False, 

1027 ): 

1028 """ 

1029 The attributes previously existing in the entity are removed and 

1030 replaced by the ones in the request. This corresponds to a 'PUT' 

1031 request. 

1032 

1033 Args: 

1034 entity_id: Entity id to be updated 

1035 entity_type: Entity type, to avoid ambiguity in case there are 

1036 several entities with the same entity id. 

1037 attrs: List of attributes to add to the entity or dict of 

1038 attributes in case of key_values=True. 

1039 forcedUpdate: Update operation have to trigger any matching 

1040 subscription, no matter if there is an actual attribute 

1041 update or no instead of the default behavior, which is to 

1042 updated only if attribute is effectively updated. 

1043 key_values(bool): 

1044 By default False. If set to True, "options=keyValues" will 

1045 be included in params of the request. The payload uses 

1046 the keyValues simplified entity representation, i.e. 

1047 ContextEntityKeyValues. 

1048 Returns: 

1049 None 

1050 """ 

1051 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs") 

1052 headers = self.headers.copy() 

1053 params = {} 

1054 options = [] 

1055 if entity_type: 

1056 params.update({"type": entity_type}) 

1057 else: 

1058 entity_type = "dummy" 

1059 

1060 if forcedUpdate: 

1061 options.append("forcedUpdate") 

1062 

1063 if key_values: 

1064 options.append("keyValues") 

1065 assert isinstance(attrs, dict) 

1066 else: 

1067 entity = ContextEntity(id=entity_id, type=entity_type) 

1068 entity.add_attributes(attrs) 

1069 attrs = entity.model_dump(exclude={"id", "type"}, exclude_none=True) 

1070 if options: 

1071 params.update({"options": ",".join(options)}) 

1072 

1073 try: 

1074 res = self.put( 

1075 url=url, 

1076 headers=headers, 

1077 json=attrs, 

1078 params=params, 

1079 ) 

1080 if res.ok: 

1081 self.logger.info("Entity '%s' successfully " "updated!", entity_id) 

1082 else: 

1083 res.raise_for_status() 

1084 except requests.RequestException as err: 

1085 msg = f"Could not replace attribute of entity {entity_id} !" 

1086 raise BaseHttpClientException(message=msg, response=err.response) from err 

1087 

1088 # Attribute operations 

1089 def get_attribute( 

1090 self, 

1091 entity_id: str, 

1092 attr_name: str, 

1093 entity_type: str = None, 

1094 metadata: str = None, 

1095 response_format="", 

1096 ) -> ContextAttribute: 

1097 """ 

1098 Retrieves a specified attribute from an entity. 

1099 

1100 Args: 

1101 entity_id: Id of the entity. Example: Bcn_Welt 

1102 attr_name: Name of the attribute to be retrieved. 

1103 entity_type (Optional): Type of the entity to retrieve 

1104 metadata (Optional): A list of metadata names to include in the 

1105 response. See "Filtering out attributes and metadata" section 

1106 for more detail. 

1107 

1108 Returns: 

1109 The content of the retrieved attribute as ContextAttribute 

1110 

1111 Raises: 

1112 Error 

1113 

1114 """ 

1115 url = urljoin( 

1116 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}" 

1117 ) 

1118 headers = self.headers.copy() 

1119 params = {} 

1120 if entity_type: 

1121 params.update({"type": entity_type}) 

1122 if metadata: 

1123 params.update({"metadata": ",".join(metadata)}) 

1124 try: 

1125 res = self.get(url=url, params=params, headers=headers) 

1126 if res.ok: 

1127 self.logger.debug("Received: %s", res.json()) 

1128 return ContextAttribute(**res.json()) 

1129 res.raise_for_status() 

1130 except requests.RequestException as err: 

1131 msg = ( 

1132 f"Could not load attribute '{attr_name}' from entity" f"'{entity_id}' " 

1133 ) 

1134 raise BaseHttpClientException(message=msg, response=err.response) from err 

1135 

1136 def update_entity_attribute( 

1137 self, 

1138 entity_id: str, 

1139 attr: Union[ContextAttribute, NamedContextAttribute], 

1140 *, 

1141 entity_type: str = None, 

1142 attr_name: str = None, 

1143 override_metadata: bool = True, 

1144 forcedUpdate: bool = False, 

1145 ): 

1146 """ 

1147 Updates a specified attribute from an entity. 

1148 

1149 Args: 

1150 attr: 

1151 context attribute to update 

1152 entity_id: 

1153 Id of the entity. Example: Bcn_Welt 

1154 entity_type: 

1155 Entity type, to avoid ambiguity in case there are 

1156 several entities with the same entity id. 

1157 forcedUpdate: Update operation have to trigger any matching 

1158 subscription, no matter if there is an actual attribute 

1159 update or no instead of the default behavior, which is to 

1160 updated only if attribute is effectively updated. 

1161 attr_name: 

1162 Name of the attribute to be updated. 

1163 override_metadata: 

1164 Bool, if set to `True` (default) the metadata will be 

1165 overwritten. This is for backwards compatibility reasons. 

1166 If `False` the metadata values will be either updated if 

1167 already existing or append if not. 

1168 See also: 

1169 https://fiware-orion.readthedocs.io/en/master/user/metadata.html 

1170 """ 

1171 headers = self.headers.copy() 

1172 if not isinstance(attr, NamedContextAttribute): 

1173 assert attr_name is not None, ( 

1174 "Missing name for attribute. " 

1175 "attr_name must be present if" 

1176 "attr is of type ContextAttribute" 

1177 ) 

1178 else: 

1179 assert attr_name is None, ( 

1180 "Invalid argument attr_name. Do not set " 

1181 "attr_name if attr is of type " 

1182 "NamedContextAttribute" 

1183 ) 

1184 attr_name = attr.name 

1185 

1186 url = urljoin( 

1187 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}" 

1188 ) 

1189 params = {} 

1190 if entity_type: 

1191 params.update({"type": entity_type}) 

1192 # set overrideMetadata option (we assure backwards compatibility here) 

1193 options = [] 

1194 if override_metadata: 

1195 options.append("overrideMetadata") 

1196 if forcedUpdate: 

1197 options.append("forcedUpdate") 

1198 if options: 

1199 params.update({"options": ",".join(options)}) 

1200 try: 

1201 res = self.put( 

1202 url=url, 

1203 headers=headers, 

1204 params=params, 

1205 json=attr.model_dump(exclude={"name"}, exclude_none=True), 

1206 ) 

1207 if res.ok: 

1208 self.logger.info( 

1209 "Attribute '%s' of '%s' " "successfully updated!", 

1210 attr_name, 

1211 entity_id, 

1212 ) 

1213 else: 

1214 res.raise_for_status() 

1215 except requests.RequestException as err: 

1216 msg = ( 

1217 f"Could not update attribute '{attr_name}' of entity" f"'{entity_id}' " 

1218 ) 

1219 raise BaseHttpClientException(message=msg, response=err.response) from err 

1220 

1221 def delete_entity_attribute( 

1222 self, entity_id: str, attr_name: str, entity_type: str = None 

1223 ) -> None: 

1224 """ 

1225 Removes a specified attribute from an entity. 

1226 

1227 Args: 

1228 entity_id: Id of the entity. 

1229 attr_name: Name of the attribute to be retrieved. 

1230 entity_type: Entity type, to avoid ambiguity in case there are 

1231 several entities with the same entity id. 

1232 Raises: 

1233 Error 

1234 

1235 """ 

1236 url = urljoin( 

1237 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}" 

1238 ) 

1239 headers = self.headers.copy() 

1240 params = {} 

1241 if entity_type: 

1242 params.update({"type": entity_type}) 

1243 try: 

1244 res = self.delete(url=url, headers=headers) 

1245 if res.ok: 

1246 self.logger.info( 

1247 "Attribute '%s' of '%s' " "successfully deleted!", 

1248 attr_name, 

1249 entity_id, 

1250 ) 

1251 else: 

1252 res.raise_for_status() 

1253 except requests.RequestException as err: 

1254 msg = f"Could not delete attribute '{attr_name}' of entity '{entity_id}'" 

1255 raise BaseHttpClientException(message=msg, response=err.response) from err 

1256 

1257 # Attribute value operations 

1258 def get_attribute_value( 

1259 self, entity_id: str, attr_name: str, entity_type: str = None 

1260 ) -> Any: 

1261 """ 

1262 This operation returns the value property with the value of the 

1263 attribute. 

1264 

1265 Args: 

1266 entity_id: Id of the entity. Example: Bcn_Welt 

1267 attr_name: Name of the attribute to be retrieved. 

1268 Example: temperature. 

1269 entity_type: Entity type, to avoid ambiguity in case there are 

1270 several entities with the same entity id. 

1271 

1272 Returns: 

1273 

1274 """ 

1275 url = urljoin( 

1276 self.base_url, 

1277 f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}/value", 

1278 ) 

1279 headers = self.headers.copy() 

1280 params = {} 

1281 if entity_type: 

1282 params.update({"type": entity_type}) 

1283 try: 

1284 res = self.get(url=url, params=params, headers=headers) 

1285 if res.ok: 

1286 self.logger.debug("Received: %s", res.json()) 

1287 return res.json() 

1288 res.raise_for_status() 

1289 except requests.RequestException as err: 

1290 msg = ( 

1291 f"Could not load value of attribute '{attr_name}' from " 

1292 f"entity'{entity_id}' " 

1293 ) 

1294 raise BaseHttpClientException(message=msg, response=err.response) from err 

1295 

1296 def update_attribute_value( 

1297 self, 

1298 *, 

1299 entity_id: str, 

1300 attr_name: str, 

1301 value: Any, 

1302 entity_type: str = None, 

1303 forcedUpdate: bool = False, 

1304 ): 

1305 """ 

1306 Updates the value of a specified attribute of an entity 

1307 

1308 Args: 

1309 value: update value 

1310 entity_id: Id of the entity. Example: Bcn_Welt 

1311 attr_name: Name of the attribute to be retrieved. 

1312 Example: temperature. 

1313 entity_type: Entity type, to avoid ambiguity in case there are 

1314 several entities with the same entity id. 

1315 forcedUpdate: Update operation have to trigger any matching 

1316 subscription, no matter if there is an actual attribute 

1317 update or no instead of the default behavior, which is to 

1318 updated only if attribute is effectively updated. 

1319 Returns: 

1320 

1321 """ 

1322 url = urljoin( 

1323 self.base_url, 

1324 f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}/value", 

1325 ) 

1326 headers = self.headers.copy() 

1327 params = {} 

1328 if entity_type: 

1329 params.update({"type": entity_type}) 

1330 options = [] 

1331 if forcedUpdate: 

1332 options.append("forcedUpdate") 

1333 if options: 

1334 params.update({"options": ",".join(options)}) 

1335 try: 

1336 if not isinstance(value, (dict, list)): 

1337 headers.update({"Content-Type": "text/plain"}) 

1338 if isinstance(value, str): 

1339 value = f"{value}" 

1340 res = self.put(url=url, headers=headers, json=value, params=params) 

1341 else: 

1342 res = self.put(url=url, headers=headers, json=value, params=params) 

1343 if res.ok: 

1344 self.logger.info( 

1345 "Attribute '%s' of '%s' " "successfully updated!", 

1346 attr_name, 

1347 entity_id, 

1348 ) 

1349 else: 

1350 res.raise_for_status() 

1351 except requests.RequestException as err: 

1352 msg = ( 

1353 f"Could not update value of attribute '{attr_name}' from " 

1354 f"entity '{entity_id}' " 

1355 ) 

1356 raise BaseHttpClientException(message=msg, response=err.response) from err 

1357 

1358 # Types Operations 

1359 def get_entity_types( 

1360 self, *, limit: int = None, offset: int = None, options: str = None 

1361 ) -> List[Dict[str, Any]]: 

1362 """ 

1363 

1364 Args: 

1365 limit: Limit the number of types to be retrieved. 

1366 offset: Skip a number of records. 

1367 options: Options dictionary. Allowed: count, values 

1368 

1369 Returns: 

1370 

1371 """ 

1372 url = urljoin(self.base_url, f"{self._url_version}/types") 

1373 headers = self.headers.copy() 

1374 params = {} 

1375 if limit: 

1376 params.update({"limit": limit}) 

1377 if offset: 

1378 params.update({"offset": offset}) 

1379 if options: 

1380 params.update({"options": options}) 

1381 try: 

1382 res = self.get(url=url, params=params, headers=headers) 

1383 if res.ok: 

1384 self.logger.debug("Received: %s", res.json()) 

1385 return res.json() 

1386 res.raise_for_status() 

1387 except requests.RequestException as err: 

1388 msg = "Could not load entity types!" 

1389 raise BaseHttpClientException(message=msg, response=err.response) from err 

1390 

1391 def get_entity_type(self, entity_type: str) -> Dict[str, Any]: 

1392 """ 

1393 

1394 Args: 

1395 entity_type: Entity Type. Example: Room 

1396 

1397 Returns: 

1398 

1399 """ 

1400 url = urljoin(self.base_url, f"{self._url_version}/types/{entity_type}") 

1401 headers = self.headers.copy() 

1402 params = {} 

1403 try: 

1404 res = self.get(url=url, params=params, headers=headers) 

1405 if res.ok: 

1406 self.logger.debug("Received: %s", res.json()) 

1407 return res.json() 

1408 res.raise_for_status() 

1409 except requests.RequestException as err: 

1410 msg = f"Could not load entities of type" f"'{entity_type}' " 

1411 raise BaseHttpClientException(message=msg, response=err.response) from err 

1412 

1413 # SUBSCRIPTION API ENDPOINTS 

1414 def get_subscription_list(self, limit: PositiveInt = inf) -> List[Subscription]: 

1415 """ 

1416 Returns a list of all the subscriptions present in the system. 

1417 Args: 

1418 limit: Limit the number of subscriptions to be retrieved 

1419 Returns: 

1420 list of subscriptions 

1421 """ 

1422 url = urljoin(self.base_url, f"{self._url_version}/subscriptions/") 

1423 headers = self.headers.copy() 

1424 params = {} 

1425 

1426 # We always use the 'count' option to check weather pagination is 

1427 # required 

1428 params.update({"options": "count"}) 

1429 try: 

1430 items = self.__pagination( 

1431 limit=limit, url=url, params=params, headers=headers 

1432 ) 

1433 adapter = TypeAdapter(List[Subscription]) 

1434 return adapter.validate_python(items) 

1435 except requests.RequestException as err: 

1436 msg = "Could not load subscriptions!" 

1437 raise BaseHttpClientException(message=msg, response=err.response) from err 

1438 

1439 def post_subscription( 

1440 self, 

1441 subscription: Subscription, 

1442 update: bool = False, 

1443 skip_initial_notification: bool = False, 

1444 ) -> str: 

1445 """ 

1446 Creates a new subscription. The subscription is represented by a 

1447 Subscription object defined in filip.cb.models. 

1448 

1449 If the subscription already exists, the adding is prevented and the id 

1450 of the existing subscription is returned. 

1451 

1452 A subscription is deemed as already existing if there exists a 

1453 subscription with the exact same subject and notification fields. All 

1454 optional fields are not considered. 

1455 

1456 Args: 

1457 subscription: Subscription 

1458 update: True - If the subscription already exists, update it 

1459 False- If the subscription already exists, throw warning 

1460 skip_initial_notification: True - Initial Notifications will be 

1461 sent to recipient containing the whole data. This is 

1462 deprecated and removed from version 3.0 of the context broker. 

1463 False - skip the initial notification 

1464 Returns: 

1465 str: Id of the (created) subscription 

1466 

1467 """ 

1468 existing_subscriptions = self.get_subscription_list() 

1469 

1470 sub_dict = subscription.model_dump( 

1471 include={"subject", "notification"}, 

1472 exclude={ 

1473 "notification": { 

1474 "lastSuccess", 

1475 "lastFailure", 

1476 "lastSuccessCode", 

1477 "lastFailureReason", 

1478 } 

1479 }, 

1480 ) 

1481 for ex_sub in existing_subscriptions: 

1482 if self._subscription_dicts_are_equal( 

1483 sub_dict, 

1484 ex_sub.model_dump( 

1485 include={"subject", "notification"}, 

1486 exclude={ 

1487 "lastSuccess", 

1488 "lastFailure", 

1489 "lastSuccessCode", 

1490 "lastFailureReason", 

1491 }, 

1492 ), 

1493 ): 

1494 self.logger.info("Subscription already exists") 

1495 if update: 

1496 self.logger.info("Updated subscription") 

1497 subscription.id = ex_sub.id 

1498 self.update_subscription(subscription) 

1499 else: 

1500 warnings.warn( 

1501 f"Subscription existed already with the id" f" {ex_sub.id}" 

1502 ) 

1503 return ex_sub.id 

1504 

1505 params = {} 

1506 if skip_initial_notification: 

1507 version = self.get_version()["orion"]["version"] 

1508 if parse_version(version) <= parse_version("3.1"): 

1509 params.update({"options": "skipInitialNotification"}) 

1510 else: 

1511 pass 

1512 warnings.warn( 

1513 f"Skip initial notifications is a deprecated " 

1514 f"feature of older versions <=3.1 of the context " 

1515 f"broker. The Context Broker that you requesting has " 

1516 f"version: {version}. For newer versions we " 

1517 f"automatically skip this option. Consider " 

1518 f"refactoring and updating your services", 

1519 DeprecationWarning, 

1520 ) 

1521 

1522 url = urljoin(self.base_url, "v2/subscriptions") 

1523 headers = self.headers.copy() 

1524 headers.update({"Content-Type": "application/json"}) 

1525 try: 

1526 res = self.post( 

1527 url=url, 

1528 headers=headers, 

1529 data=subscription.model_dump_json( 

1530 exclude={ 

1531 "id": True, 

1532 "notification": { 

1533 "lastSuccess", 

1534 "lastFailure", 

1535 "lastSuccessCode", 

1536 "lastFailureReason", 

1537 }, 

1538 }, 

1539 exclude_none=True, 

1540 ), 

1541 params=params, 

1542 ) 

1543 if res.ok: 

1544 self.logger.info("Subscription successfully created!") 

1545 return res.headers["Location"].split("/")[-1] 

1546 res.raise_for_status() 

1547 except requests.RequestException as err: 

1548 msg = "Could not send subscription!" 

1549 raise BaseHttpClientException(message=msg, response=err.response) from err 

1550 

1551 def get_subscription(self, subscription_id: str) -> Subscription: 

1552 """ 

1553 Retrieves a subscription from 

1554 Args: 

1555 subscription_id: id of the subscription 

1556 

1557 Returns: 

1558 

1559 """ 

1560 url = urljoin( 

1561 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}" 

1562 ) 

1563 headers = self.headers.copy() 

1564 try: 

1565 res = self.get(url=url, headers=headers) 

1566 if res.ok: 

1567 self.logger.debug("Received: %s", res.json()) 

1568 return Subscription(**res.json()) 

1569 res.raise_for_status() 

1570 except requests.RequestException as err: 

1571 msg = f"Could not load subscription {subscription_id}" 

1572 raise BaseHttpClientException(message=msg, response=err.response) from err 

1573 

1574 def update_subscription( 

1575 self, subscription: Subscription, skip_initial_notification: bool = False 

1576 ): 

1577 """ 

1578 Only the fields included in the request are updated in the subscription. 

1579 

1580 Args: 

1581 subscription: Subscription to update 

1582 skip_initial_notification: True - Initial Notifications will be 

1583 sent to recipient containing the whole data. This is 

1584 deprecated and removed from version 3.0 of the context broker. 

1585 False - skip the initial notification 

1586 

1587 Returns: 

1588 None 

1589 """ 

1590 params = {} 

1591 if skip_initial_notification: 

1592 version = self.get_version()["orion"]["version"] 

1593 if parse_version(version) <= parse_version("3.1"): 

1594 params.update({"options": "skipInitialNotification"}) 

1595 else: 

1596 pass 

1597 warnings.warn( 

1598 f"Skip initial notifications is a deprecated " 

1599 f"feature of older versions <3.1 of the context " 

1600 f"broker. The Context Broker that you requesting has " 

1601 f"version: {version}. For newer versions we " 

1602 f"automatically skip this option. Consider " 

1603 f"refactoring and updating your services", 

1604 DeprecationWarning, 

1605 ) 

1606 

1607 url = urljoin( 

1608 self.base_url, f"{self._url_version}/subscriptions/{subscription.id}" 

1609 ) 

1610 headers = self.headers.copy() 

1611 headers.update({"Content-Type": "application/json"}) 

1612 try: 

1613 res = self.patch( 

1614 url=url, 

1615 headers=headers, 

1616 data=subscription.model_dump_json( 

1617 exclude={ 

1618 "id": True, 

1619 "notification": { 

1620 "lastSuccess", 

1621 "lastFailure", 

1622 "lastSuccessCode", 

1623 "lastFailureReason", 

1624 }, 

1625 }, 

1626 exclude_none=True, 

1627 ), 

1628 ) 

1629 if res.ok: 

1630 self.logger.info("Subscription successfully updated!") 

1631 else: 

1632 res.raise_for_status() 

1633 except requests.RequestException as err: 

1634 msg = f"Could not update subscription {subscription.id}" 

1635 raise BaseHttpClientException(message=msg, response=err.response) from err 

1636 

1637 def delete_subscription(self, subscription_id: str) -> None: 

1638 """ 

1639 Deletes a subscription from a Context Broker 

1640 Args: 

1641 subscription_id: id of the subscription 

1642 """ 

1643 url = urljoin( 

1644 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}" 

1645 ) 

1646 headers = self.headers.copy() 

1647 try: 

1648 res = self.delete(url=url, headers=headers) 

1649 if res.ok: 

1650 self.logger.info( 

1651 f"Subscription '{subscription_id}' " f"successfully deleted!" 

1652 ) 

1653 else: 

1654 res.raise_for_status() 

1655 except requests.RequestException as err: 

1656 msg = f"Could not delete subscription {subscription_id}" 

1657 raise BaseHttpClientException(message=msg, response=err.response) from err 

1658 

1659 # Registration API 

1660 def get_registration_list(self, *, limit: PositiveInt = None) -> List[Registration]: 

1661 """ 

1662 Lists all the context provider registrations present in the system. 

1663 

1664 Args: 

1665 limit: Limit the number of registrations to be retrieved 

1666 Returns: 

1667 

1668 """ 

1669 url = urljoin(self.base_url, f"{self._url_version}/registrations/") 

1670 headers = self.headers.copy() 

1671 params = {} 

1672 

1673 # We always use the 'count' option to check weather pagination is 

1674 # required 

1675 params.update({"options": "count"}) 

1676 try: 

1677 items = self.__pagination( 

1678 limit=limit, url=url, params=params, headers=headers 

1679 ) 

1680 adapter = TypeAdapter(List[Registration]) 

1681 return adapter.validate_python(items) 

1682 except requests.RequestException as err: 

1683 msg = "Could not load registrations!" 

1684 raise BaseHttpClientException(message=msg, response=err.response) from err 

1685 

1686 def post_registration(self, registration: Registration): 

1687 """ 

1688 Creates a new context provider registration. This is typically used 

1689 for binding context sources as providers of certain data. The 

1690 registration is represented by cb.models.Registration 

1691 

1692 Args: 

1693 registration (Registration): 

1694 

1695 Returns: 

1696 

1697 """ 

1698 url = urljoin(self.base_url, f"{self._url_version}/registrations") 

1699 headers = self.headers.copy() 

1700 headers.update({"Content-Type": "application/json"}) 

1701 try: 

1702 res = self.post( 

1703 url=url, 

1704 headers=headers, 

1705 data=registration.model_dump_json(exclude={"id"}, exclude_none=True), 

1706 ) 

1707 if res.ok: 

1708 self.logger.info("Registration successfully created!") 

1709 return res.headers["Location"].split("/")[-1] 

1710 res.raise_for_status() 

1711 except requests.RequestException as err: 

1712 msg = f"Could not send registration {registration.id}!" 

1713 raise BaseHttpClientException(message=msg, response=err.response) from err 

1714 

1715 def get_registration(self, registration_id: str) -> Registration: 

1716 """ 

1717 Retrieves a registration from context broker by id 

1718 

1719 Args: 

1720 registration_id: id of the registration 

1721 

1722 Returns: 

1723 Registration 

1724 """ 

1725 url = urljoin( 

1726 self.base_url, f"{self._url_version}/registrations/{registration_id}" 

1727 ) 

1728 headers = self.headers.copy() 

1729 try: 

1730 res = self.get(url=url, headers=headers) 

1731 if res.ok: 

1732 self.logger.debug("Received: %s", res.json()) 

1733 return Registration(**res.json()) 

1734 res.raise_for_status() 

1735 except requests.RequestException as err: 

1736 msg = f"Could not load registration {registration_id} !" 

1737 raise BaseHttpClientException(message=msg, response=err.response) from err 

1738 

1739 def add_valid_relationships( 

1740 self, entities: List[ContextEntity] 

1741 ) -> List[ContextEntity]: 

1742 """ 

1743 Validate all attributes in the given entities. If the attribute value points to 

1744 an existing entity, it is assumed that this attribute is a relationship, and it 

1745 will be assigned with the attribute type "relationship" 

1746 

1747 Args: 

1748 entities: list of entities that need to be validated. 

1749 

1750 Returns: 

1751 updated entities 

1752 """ 

1753 updated_entities = [] 

1754 for entity in entities[:]: 

1755 for attr_name, attr_value in entity.model_dump( 

1756 exclude={"id", "type"} 

1757 ).items(): 

1758 if isinstance(attr_value, dict): 

1759 if self.validate_relationship(attr_value): 

1760 entity.update_attribute( 

1761 { 

1762 attr_name: ContextAttribute( 

1763 **{ 

1764 "type": DataType.RELATIONSHIP, 

1765 "value": attr_value.get("value"), 

1766 } 

1767 ) 

1768 } 

1769 ) 

1770 updated_entities.append(entity) 

1771 return updated_entities 

1772 

1773 def remove_invalid_relationships( 

1774 self, entities: List[ContextEntity], hard_remove: bool = True 

1775 ) -> List[ContextEntity]: 

1776 """ 

1777 Removes invalid relationships from the entities. An invalid relationship 

1778 is a relationship that has no destination entity. 

1779 

1780 Args: 

1781 entities: list of entities that need to be validated. 

1782 hard_remove: If True, invalid relationships will be deleted. 

1783 If False, invalid relationships will be changed to Text 

1784 attributes. 

1785 

1786 Returns: 

1787 updated entities 

1788 """ 

1789 updated_entities = [] 

1790 for entity in entities[:]: 

1791 for relationship in entity.get_relationships(): 

1792 if not self.validate_relationship(relationship): 

1793 if hard_remove: 

1794 entity.delete_attributes(attrs=[relationship]) 

1795 else: 

1796 # change the attribute type to "Text" 

1797 entity.update_attribute( 

1798 attrs=[ 

1799 NamedContextAttribute( 

1800 name=relationship.name, 

1801 type=DataType.TEXT, 

1802 value=relationship.value, 

1803 ) 

1804 ] 

1805 ) 

1806 updated_entities.append(entity) 

1807 return updated_entities 

1808 

1809 def validate_relationship( 

1810 self, relationship: Union[NamedContextAttribute, ContextAttribute, Dict] 

1811 ) -> bool: 

1812 """ 

1813 Validates a relationship. A relationship is valid if it points to an existing 

1814 entity. Otherwise, it is considered invalid 

1815 

1816 Args: 

1817 relationship: relationship to validate 

1818 Returns 

1819 True if the relationship is valid, False otherwise 

1820 """ 

1821 if isinstance(relationship, NamedContextAttribute) or isinstance( 

1822 relationship, ContextAttribute 

1823 ): 

1824 destination_id = relationship.value 

1825 elif isinstance(relationship, dict): 

1826 _sentinel = object() 

1827 destination_id = relationship.get("value", _sentinel) 

1828 if destination_id is _sentinel: 

1829 raise ValueError( 

1830 "Invalid relationship dictionary format\n" 

1831 "Expected format: {" 

1832 f'"type": "{DataType.RELATIONSHIP.value}", ' 

1833 '"value" "entity_id"}' 

1834 ) 

1835 else: 

1836 raise ValueError("Invalid relationship type.") 

1837 try: 

1838 destination_entity = self.get_entity(entity_id=destination_id) 

1839 return destination_entity.id == destination_id 

1840 except requests.RequestException as err: 

1841 if err.response.status_code == 404: 

1842 return False 

1843 

1844 def update_registration(self, registration: Registration): 

1845 """ 

1846 Only the fields included in the request are updated in the registration. 

1847 

1848 Args: 

1849 registration: Registration to update 

1850 Returns: 

1851 

1852 """ 

1853 url = urljoin( 

1854 self.base_url, f"{self._url_version}/registrations/{registration.id}" 

1855 ) 

1856 headers = self.headers.copy() 

1857 headers.update({"Content-Type": "application/json"}) 

1858 try: 

1859 res = self.patch( 

1860 url=url, 

1861 headers=headers, 

1862 data=registration.model_dump_json(exclude={"id"}, exclude_none=True), 

1863 ) 

1864 if res.ok: 

1865 self.logger.info("Registration successfully updated!") 

1866 else: 

1867 res.raise_for_status() 

1868 except requests.RequestException as err: 

1869 msg = f"Could not update registration {registration.id} !" 

1870 raise BaseHttpClientException(message=msg, response=err.response) from err 

1871 

1872 def delete_registration(self, registration_id: str) -> None: 

1873 """ 

1874 Deletes a subscription from a Context Broker 

1875 Args: 

1876 registration_id: id of the subscription 

1877 """ 

1878 url = urljoin( 

1879 self.base_url, f"{self._url_version}/registrations/{registration_id}" 

1880 ) 

1881 headers = self.headers.copy() 

1882 try: 

1883 res = self.delete(url=url, headers=headers) 

1884 if res.ok: 

1885 self.logger.info( 

1886 "Registration '%s' " "successfully deleted!", registration_id 

1887 ) 

1888 res.raise_for_status() 

1889 except requests.RequestException as err: 

1890 msg = f"Could not delete registration {registration_id} !" 

1891 raise BaseHttpClientException(message=msg, response=err.response) from err 

1892 

1893 # Batch operation API 

1894 def update( 

1895 self, 

1896 *, 

1897 entities: List[Union[ContextEntity, ContextEntityKeyValues]], 

1898 action_type: Union[ActionType, str], 

1899 update_format: str = None, 

1900 forcedUpdate: bool = False, 

1901 override_metadata: bool = False, 

1902 ) -> None: 

1903 """ 

1904 This operation allows to create, update and/or delete several entities 

1905 in a single batch operation. 

1906 

1907 This operation is split in as many individual operations as entities 

1908 in the entities vector, so the actionType is executed for each one of 

1909 them. Depending on the actionType, a mapping with regular non-batch 

1910 operations can be done: 

1911 

1912 append: maps to POST /v2/entities (if the entity does not already exist) 

1913 or POST /v2/entities/<id>/attrs (if the entity already exists). 

1914 

1915 appendStrict: maps to POST /v2/entities (if the entity does not 

1916 already exist) or POST /v2/entities/<id>/attrs?options=append (if the 

1917 entity already exists). 

1918 

1919 update: maps to PATCH /v2/entities/<id>/attrs. 

1920 

1921 delete: maps to DELETE /v2/entities/<id>/attrs/<attrName> on every 

1922 attribute included in the entity or to DELETE /v2/entities/<id> if 

1923 no attribute were included in the entity. 

1924 

1925 replace: maps to PUT /v2/entities/<id>/attrs. 

1926 

1927 Args: 

1928 entities: "an array of entities, each entity specified using the " 

1929 "JSON entity representation format " 

1930 action_type (Update): "actionType, to specify the kind of update 

1931 action to do: either append, appendStrict, update, delete, 

1932 or replace. " 

1933 update_format (str): Optional 'keyValues' 

1934 forcedUpdate: Update operation have to trigger any matching 

1935 subscription, no matter if there is an actual attribute 

1936 update or no instead of the default behavior, which is to 

1937 updated only if attribute is effectively updated. 

1938 override_metadata: 

1939 Bool, replace the existing metadata with the one provided in 

1940 the request 

1941 Returns: 

1942 

1943 """ 

1944 

1945 url = urljoin(self.base_url, f"{self._url_version}/op/update") 

1946 headers = self.headers.copy() 

1947 headers.update({"Content-Type": "application/json"}) 

1948 params = {} 

1949 options = [] 

1950 if override_metadata: 

1951 options.append("overrideMetadata") 

1952 if forcedUpdate: 

1953 options.append("forcedUpdate") 

1954 if update_format: 

1955 assert ( 

1956 update_format == AttrsFormat.KEY_VALUES.value 

1957 ), "Only 'keyValues' is allowed as update format" 

1958 options.append("keyValues") 

1959 if options: 

1960 params.update({"options": ",".join(options)}) 

1961 update = Update(actionType=action_type, entities=entities) 

1962 try: 

1963 res = self.post( 

1964 url=url, 

1965 headers=headers, 

1966 params=params, 

1967 json=update.model_dump(by_alias=True), 

1968 ) 

1969 if res.ok: 

1970 self.logger.info("Update operation '%s' succeeded!", action_type) 

1971 else: 

1972 res.raise_for_status() 

1973 except requests.RequestException as err: 

1974 msg = f"Update operation '{action_type}' failed!" 

1975 raise BaseHttpClientException(message=msg, response=err.response) from err 

1976 

1977 def query( 

1978 self, 

1979 *, 

1980 query: Query, 

1981 limit: PositiveInt = None, 

1982 order_by: str = None, 

1983 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED, 

1984 ) -> List[Any]: 

1985 """ 

1986 Generate api query 

1987 Args: 

1988 query (Query): 

1989 limit (PositiveInt): 

1990 order_by (str): 

1991 response_format (AttrsFormat, str): 

1992 Returns: 

1993 The response payload is an Array containing one object per matching 

1994 entity, or an empty array [] if no entities are found. The entities 

1995 follow the JSON entity representation format (described in the 

1996 section "JSON Entity Representation"). 

1997 """ 

1998 url = urljoin(self.base_url, f"{self._url_version}/op/query") 

1999 headers = self.headers.copy() 

2000 headers.update({"Content-Type": "application/json"}) 

2001 params = {"options": "count"} 

2002 

2003 if response_format: 

2004 if response_format not in list(AttrsFormat): 

2005 raise ValueError(f"Value must be in {list(AttrsFormat)}") 

2006 params["options"] = ",".join([response_format, "count"]) 

2007 try: 

2008 items = self.__pagination( 

2009 method=PaginationMethod.POST, 

2010 url=url, 

2011 headers=headers, 

2012 params=params, 

2013 data=query.model_dump_json(exclude_none=True), 

2014 limit=limit, 

2015 ) 

2016 if response_format == AttrsFormat.NORMALIZED: 

2017 adapter = TypeAdapter(List[ContextEntity]) 

2018 return adapter.validate_python(items) 

2019 if response_format == AttrsFormat.KEY_VALUES: 

2020 adapter = TypeAdapter(List[ContextEntityKeyValues]) 

2021 return adapter.validate_python(items) 

2022 return items 

2023 except requests.RequestException as err: 

2024 msg = "Query operation failed!" 

2025 raise BaseHttpClientException(message=msg, response=err.response) from err 

2026 

2027 def notify(self, message: Message) -> None: 

2028 """ 

2029 This operation is intended to consume a notification payload so that 

2030 all the entity data included by such notification is persisted, 

2031 overwriting if necessary. This operation is useful when one NGSIv2 

2032 endpoint is subscribed to another NGSIv2 endpoint (federation 

2033 scenarios). The request payload must be an NGSIv2 notification 

2034 payload. The behaviour must be exactly the same as 'update' 

2035 with 'action_type' equal to append. 

2036 

2037 Args: 

2038 message: Notification message 

2039 

2040 Returns: 

2041 None 

2042 """ 

2043 url = urljoin(self.base_url, "v2/op/notify") 

2044 headers = self.headers.copy() 

2045 headers.update({"Content-Type": "application/json"}) 

2046 params = {} 

2047 try: 

2048 res = self.post( 

2049 url=url, 

2050 headers=headers, 

2051 params=params, 

2052 data=message.model_dump_json(by_alias=True), 

2053 ) 

2054 if res.ok: 

2055 self.logger.info("Notification message sent!") 

2056 else: 

2057 res.raise_for_status() 

2058 except requests.RequestException as err: 

2059 msg = ( 

2060 f"Sending notifcation message failed! \n " 

2061 f"{message.model_dump_json(indent=2)}" 

2062 ) 

2063 raise BaseHttpClientException(message=msg, response=err.response) from err 

2064 

2065 def post_command( 

2066 self, 

2067 *, 

2068 entity_id: str, 

2069 command: Union[Command, NamedCommand, Dict], 

2070 entity_type: str = None, 

2071 command_name: str = None, 

2072 ) -> None: 

2073 """ 

2074 Post a command to a context entity this corresponds to 'PATCH' of the 

2075 specified command attribute. 

2076 

2077 Args: 

2078 entity_id: Entity identifier 

2079 command: Command 

2080 entity_type: Entity type 

2081 command_name: Name of the command in the entity 

2082 

2083 Returns: 

2084 None 

2085 """ 

2086 if command_name: 

2087 assert isinstance(command, (Command, dict)) 

2088 if isinstance(command, dict): 

2089 command = Command(**command) 

2090 command = {command_name: command.model_dump()} 

2091 else: 

2092 assert isinstance(command, (NamedCommand, dict)) 

2093 if isinstance(command, dict): 

2094 command = NamedCommand(**command) 

2095 

2096 self.update_existing_entity_attributes( 

2097 entity_id=entity_id, entity_type=entity_type, attrs=[command] 

2098 ) 

2099 

2100 def does_entity_exist(self, entity_id: str, entity_type: str) -> bool: 

2101 """ 

2102 Test if an entity with given id and type is present in the CB 

2103 

2104 Args: 

2105 entity_id: Entity id 

2106 entity_type: Entity type 

2107 

2108 Returns: 

2109 bool; True if entity exists 

2110 

2111 Raises: 

2112 RequestException, if any error occurs (e.g: No Connection), 

2113 except that the entity is not found 

2114 """ 

2115 url = urljoin(self.base_url, f"v2/entities/{entity_id}") 

2116 headers = self.headers.copy() 

2117 params = {"type": entity_type} 

2118 

2119 try: 

2120 res = self.get(url=url, params=params, headers=headers) 

2121 if res.ok: 

2122 return True 

2123 res.raise_for_status() 

2124 except requests.RequestException as err: 

2125 if err.response is None or not err.response.status_code == 404: 

2126 self.log_error(err=err, msg="Checking entity existence failed!") 

2127 raise 

2128 return False 

2129 

2130 def patch_entity( 

2131 self, 

2132 entity: Union[ContextEntity, ContextEntityKeyValues], 

2133 key_values: bool = False, 

2134 forcedUpdate: bool = False, 

2135 override_metadata: bool = False, 

2136 ) -> None: 

2137 """ 

2138 Takes a given entity and updates the state in the CB to match it. 

2139 It is an extended equivalent to the HTTP method PATCH, which applies 

2140 partial modifications to a resource. 

2141 

2142 Args: 

2143 entity: Entity to update 

2144 key_values: If True, the entity is updated in key-values format. 

2145 forcedUpdate: Update operation have to trigger any matching 

2146 subscription, no matter if there is an actual attribute 

2147 update or no instead of the default behavior, which is to 

2148 updated only if attribute is effectively updated. 

2149 override_metadata: If True, the existing metadata of the entity 

2150 is replaced 

2151 Returns: 

2152 None 

2153 """ 

2154 attributes = entity.get_attributes() 

2155 

2156 self.update_existing_entity_attributes( 

2157 entity_id=entity.id, 

2158 entity_type=entity.type, 

2159 attrs=attributes, 

2160 key_values=key_values, 

2161 forcedUpdate=forcedUpdate, 

2162 override_metadata=override_metadata, 

2163 ) 

2164 

2165 def _subscription_dicts_are_equal(self, first: dict, second: dict): 

2166 """ 

2167 Check if two dictionaries and all sub-dictionaries are equal. 

2168 Logs a warning if the keys are not equal, but ignores the 

2169 comparison of such keys. 

2170 

2171 Args: 

2172 first dict: Dictionary of first subscription 

2173 second dict: Dictionary of second subscription 

2174 

2175 Returns: 

2176 True if equal, else False 

2177 """ 

2178 

2179 def _value_is_not_none(value): 

2180 """ 

2181 Recursive function to check if a value equals none. 

2182 If the value is a dict and any value of the dict is not none, 

2183 the value is not none. 

2184 If the value is a list and any item is not none, the value is not none. 

2185 If it's neither dict nore list, bool is used. 

2186 """ 

2187 if isinstance(value, dict): 

2188 return any([_value_is_not_none(value=_v) for _v in value.values()]) 

2189 if isinstance(value, list): 

2190 return any([_value_is_not_none(value=_v) for _v in value]) 

2191 else: 

2192 return bool(value) 

2193 

2194 if first.keys() != second.keys(): 

2195 warnings.warn( 

2196 "Subscriptions contain a different set of fields. " 

2197 "Only comparing to new fields of the new one." 

2198 ) 

2199 for k, v in first.items(): 

2200 ex_value = second.get(k, None) 

2201 if isinstance(v, dict) and isinstance(ex_value, dict): 

2202 equal = self._subscription_dicts_are_equal(v, ex_value) 

2203 if equal: 

2204 continue 

2205 else: 

2206 return False 

2207 if v != ex_value: 

2208 self.logger.debug(f"Not equal fields for key {k}: ({v}, {ex_value})") 

2209 if ( 

2210 not _value_is_not_none(v) 

2211 and not _value_is_not_none(ex_value) 

2212 or k == "timesSent" 

2213 ): 

2214 continue 

2215 return False 

2216 return True 

2217 

2218 

2219# 

2220# 

2221# def check_duplicate_subscription(self, subscription_body, limit: int = 20): 

2222# """ 

2223# Function compares the subject of the subscription body, on whether a subscription 

2224# already exists for a device / entity. 

2225# :param subscription_body: the body of the new subscripton 

2226# :param limit: pagination parameter, to set the number of 

2227# subscriptions bodies the get request should grab 

2228# :return: exists, boolean -> True, if such a subscription allready 

2229# exists 

2230# """ 

2231# exists = False 

2232# subscription_subject = json.loads(subscription_body)["subject"] 

2233# # Exact keys depend on subscription body 

2234# try: 

2235# subscription_url = json.loads(subscription_body)[ 

2236# "notification"]["httpCustom"]["url"] 

2237# except KeyError: 

2238# subscription_url = json.loads(subscription_body)[ 

2239# "notification"]["http"]["url"] 

2240# 

2241# # If the number of subscriptions is larger then the limit, 

2242# paginations methods have to be used 

2243# url = self.url + '/v2/subscriptions?limit=' + str(limit) + 

2244# '&options=count' 

2245# response = self.session.get(url, headers=self.get_header()) 

2246# 

2247# sub_count = float(response.headers["Fiware-Total-Count"]) 

2248# response = json.loads(response.text) 

2249# if sub_count >= limit: 

2250# response = self.get_pagination(url=url, headers=self.get_header(), 

2251# limit=limit, count=sub_count) 

2252# response = json.loads(response) 

2253# 

2254# for existing_subscription in response: 

2255# # check whether the exact same subscriptions already exists 

2256# if existing_subscription["subject"] == subscription_subject: 

2257# exists = True 

2258# break 

2259# try: 

2260# existing_url = existing_subscription["notification"][ 

2261# "http"]["url"] 

2262# except KeyError: 

2263# existing_url = existing_subscription["notification"][ 

2264# "httpCustom"]["url"] 

2265# # check whether both subscriptions notify to the same path 

2266# if existing_url != subscription_url: 

2267# continue 

2268# else: 

2269# # iterate over all entities included in the subscription object 

2270# for entity in subscription_subject["entities"]: 

2271# if 'type' in entity.keys(): 

2272# subscription_type = entity['type'] 

2273# else: 

2274# subscription_type = entity['typePattern'] 

2275# if 'id' in entity.keys(): 

2276# subscription_id = entity['id'] 

2277# else: 

2278# subscription_id = entity["idPattern"] 

2279# # iterate over all entities included in the exisiting 

2280# subscriptions 

2281# for existing_entity in existing_subscription["subject"][ 

2282# "entities"]: 

2283# if "type" in entity.keys(): 

2284# type_existing = entity["type"] 

2285# else: 

2286# type_existing = entity["typePattern"] 

2287# if "id" in entity.keys(): 

2288# id_existing = entity["id"] 

2289# else: 

2290# id_existing = entity["idPattern"] 

2291# # as the ID field is non optional, it has to match 

2292# # check whether the type match 

2293# # if the type field is empty, they match all types 

2294# if (type_existing == subscription_type) or\ 

2295# ('*' in subscription_type) or \ 

2296# ('*' in type_existing)\ 

2297# or (type_existing == "") or ( 

2298# subscription_type == ""): 

2299# # check if on of the subscriptions is a pattern, 

2300# or if they both refer to the same id 

2301# # Get the attrs first, to avoid code duplication 

2302# # last thing to compare is the attributes 

2303# # Assumption -> position is the same as the 

2304# entities _list 

2305# # i == j 

2306# i = subscription_subject["entities"].index(entity) 

2307# j = existing_subscription["subject"][ 

2308# "entities"].index(existing_entity) 

2309# try: 

2310# subscription_attrs = subscription_subject[ 

2311# "condition"]["attrs"][i] 

2312# except (KeyError, IndexError): 

2313# subscription_attrs = [] 

2314# try: 

2315# existing_attrs = existing_subscription[ 

2316# "subject"]["condition"]["attrs"][j] 

2317# except (KeyError, IndexError): 

2318# existing_attrs = [] 

2319# 

2320# if (".*" in subscription_id) or ('.*' in 

2321# id_existing) or (subscription_id == id_existing): 

2322# # Attributes have to match, or the have to 

2323# be an empty array 

2324# if (subscription_attrs == existing_attrs) or 

2325# (subscription_attrs == []) or (existing_attrs == []): 

2326# exists = True 

2327# # if they do not match completely or subscribe 

2328# to all ids they have to match up to a certain position 

2329# elif ("*" in subscription_id) or ('*' in 

2330# id_existing): 

2331# regex_existing = id_existing.find('*') 

2332# regex_subscription = 

2333# subscription_id.find('*') 

2334# # slice the strings to compare 

2335# if (id_existing[:regex_existing] in 

2336# subscription_id) or (subscription_id[:regex_subscription] in id_existing) or \ 

2337# (id_existing[regex_existing:] in 

2338# subscription_id) or (subscription_id[regex_subscription:] in id_existing): 

2339# if (subscription_attrs == 

2340# existing_attrs) or (subscription_attrs == []) or (existing_attrs == []): 

2341# exists = True 

2342# else: 

2343# continue 

2344# else: 

2345# continue 

2346# else: 

2347# continue 

2348# else: 

2349# continue 

2350# else: 

2351# continue 

2352# return exists 

2353#