Coverage for filip/clients/ngsi_v2/cb.py: 82%

703 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-03-10 13:43 +0000

1""" 

2Context Broker Module for API Client 

3""" 

4 

5from __future__ import annotations 

6 

7import copy 

8from copy import deepcopy 

9from enum import Enum 

10from math import inf 

11from pkg_resources import parse_version 

12from pydantic import PositiveInt, PositiveFloat, AnyHttpUrl, ValidationError 

13from pydantic.type_adapter import TypeAdapter 

14from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union 

15import re 

16import requests 

17from urllib.parse import urljoin 

18import warnings 

19from filip.clients.base_http_client import BaseHttpClient, NgsiURLVersion 

20from filip.config import settings 

21from filip.models.base import FiwareHeader, PaginationMethod, DataType 

22from filip.utils.simple_ql import QueryString 

23from filip.models.ngsi_v2.context import ( 

24 ActionType, 

25 Command, 

26 ContextEntity, 

27 ContextEntityKeyValues, 

28 ContextAttribute, 

29 NamedCommand, 

30 NamedContextAttribute, 

31 Query, 

32 Update, 

33 PropertyFormat, 

34 ContextEntityList, 

35 ContextEntityKeyValuesList, 

36) 

37from filip.models.ngsi_v2.base import AttrsFormat 

38from filip.models.ngsi_v2.subscriptions import Subscription, Message 

39from filip.models.ngsi_v2.registrations import Registration 

40 

41if TYPE_CHECKING: 

42 from filip.clients.ngsi_v2.iota import IoTAClient 

43 

44 

45class ContextBrokerClient(BaseHttpClient): 

46 """ 

47 Implementation of NGSI Context Broker functionalities, such as creating 

48 entities and subscriptions; retrieving, updating and deleting data. 

49 Further documentation: 

50 https://fiware-orion.readthedocs.io/en/master/ 

51 

52 Api specifications for v2 are located here: 

53 https://telefonicaid.github.io/fiware-orion/api/v2/stable/ 

54 

55 Note: 

56 We use the reference implementation for development. Therefore, some 

57 other brokers may show slightly different behavior! 

58 """ 

59 

60 def __init__( 

61 self, 

62 url: str = None, 

63 *, 

64 session: requests.Session = None, 

65 fiware_header: FiwareHeader = None, 

66 **kwargs, 

67 ): 

68 """ 

69 

70 Args: 

71 url: Url of context broker server 

72 session (requests.Session): 

73 fiware_header (FiwareHeader): fiware service and fiware service path 

74 **kwargs (Optional): Optional arguments that ``request`` takes. 

75 """ 

76 # set service url 

77 url = url or settings.CB_URL 

78 self._url_version = NgsiURLVersion.v2_url.value 

79 super().__init__( 

80 url=url, session=session, fiware_header=fiware_header, **kwargs 

81 ) 

82 

83 def __pagination( 

84 self, 

85 *, 

86 method: PaginationMethod = PaginationMethod.GET, 

87 url: str, 

88 headers: Dict, 

89 limit: Union[PositiveInt, PositiveFloat] = None, 

90 params: Dict = None, 

91 data: str = None, 

92 ) -> List[Dict]: 

93 """ 

94 NGSIv2 implements a pagination mechanism in order to help clients to 

95 retrieve large sets of resources. This mechanism works for all listing 

96 operations in the API (e.g. GET /v2/entities, GET /v2/subscriptions, 

97 POST /v2/op/query, etc.). This function helps getting datasets that are 

98 larger than the limit for the different GET operations. 

99 

100 https://fiware-orion.readthedocs.io/en/master/user/pagination/index.html 

101 

102 Args: 

103 url: Information about the url, obtained from the original function 

104 headers: The headers from the original function 

105 params: 

106 limit: 

107 

108 Returns: 

109 object: 

110 

111 """ 

112 if limit is None: 

113 limit = inf 

114 if limit > 1000: 

115 params["limit"] = 1000 # maximum items per request 

116 else: 

117 params["limit"] = limit 

118 

119 if self.session: 

120 session = self.session 

121 else: 

122 session = requests.Session() 

123 with session: 

124 res = session.request( 

125 method=method, url=url, params=params, headers=headers, data=data 

126 ) 

127 if res.ok: 

128 items = res.json() 

129 # do pagination 

130 count = int(res.headers["Fiware-Total-Count"]) 

131 

132 while len(items) < limit and len(items) < count: 

133 # Establishing the offset from where entities are retrieved 

134 params["offset"] = len(items) 

135 params["limit"] = min(1000, (limit - len(items))) 

136 res = session.request( 

137 method=method, 

138 url=url, 

139 params=params, 

140 headers=headers, 

141 data=data, 

142 ) 

143 if res.ok: 

144 items.extend(res.json()) 

145 else: 

146 res.raise_for_status() 

147 self.logger.debug("Received: %s", items) 

148 return items 

149 res.raise_for_status() 

150 

151 # MANAGEMENT API 

152 def get_version(self) -> Dict: 

153 """ 

154 Gets version of IoT Agent 

155 Returns: 

156 Dictionary with response 

157 """ 

158 url = urljoin(self.base_url, "version") 

159 try: 

160 res = self.get(url=url, headers=self.headers) 

161 if res.ok: 

162 return res.json() 

163 res.raise_for_status() 

164 except requests.RequestException as err: 

165 self.logger.error(err) 

166 raise 

167 

168 def get_resources(self) -> Dict: 

169 """ 

170 Gets reo 

171 

172 Returns: 

173 Dict 

174 """ 

175 url = urljoin(self.base_url, self._url_version) 

176 try: 

177 res = self.get(url=url, headers=self.headers) 

178 if res.ok: 

179 return res.json() 

180 res.raise_for_status() 

181 except requests.RequestException as err: 

182 self.logger.error(err) 

183 raise 

184 

185 # STATISTICS API 

186 def get_statistics(self) -> Dict: 

187 """ 

188 Gets statistics of context broker 

189 Returns: 

190 Dictionary with response 

191 """ 

192 url = urljoin(self.base_url, "statistics") 

193 try: 

194 res = self.get(url=url, headers=self.headers) 

195 if res.ok: 

196 return res.json() 

197 res.raise_for_status() 

198 except requests.RequestException as err: 

199 self.logger.error(err) 

200 raise 

201 

202 # CONTEXT MANAGEMENT API ENDPOINTS 

203 # Entity Operations 

204 def post_entity( 

205 self, 

206 entity: Union[ContextEntity, ContextEntityKeyValues], 

207 update: bool = False, 

208 patch: bool = False, 

209 override_attr_metadata: bool = True, 

210 key_values: bool = False, 

211 ): 

212 """ 

213 Function registers an Object with the NGSI Context Broker, 

214 if it already exists it can be automatically updated (overwritten) 

215 if the update bool is True. 

216 First a post request with the entity is tried, if the response code 

217 is 422 the entity is uncrossable, as it already exists there are two 

218 options, either overwrite it, if the attribute have changed 

219 (e.g. at least one new/new values) (update = True) or leave 

220 it the way it is (update=False) 

221 If you only want to manipulate the entities values, you need to set 

222 patch argument. 

223 

224 Args: 

225 entity (ContextEntity/ContextEntityKeyValues): 

226 Context Entity Object 

227 update (bool): 

228 If the response.status_code is 422, whether the override and 

229 existing entity 

230 patch (bool): 

231 If the response.status_code is 422, whether to manipulate the 

232 existing entity. Omitted if update `True`. 

233 override_attr_metadata: 

234 Only applies for patch equal to `True`. 

235 Whether to override or append the attribute's metadata. 

236 `True` for overwrite or `False` for update/append 

237 key_values(bool): 

238 By default False. If set to True, "options=keyValues" will 

239 be included in params of post request. The payload uses 

240 the keyValues simplified entity representation, i.e. 

241 ContextEntityKeyValues. 

242 """ 

243 url = urljoin(self.base_url, f"{self._url_version}/entities") 

244 headers = self.headers.copy() 

245 params = {} 

246 options = [] 

247 if key_values: 

248 assert isinstance(entity, ContextEntityKeyValues) 

249 options.append("keyValues") 

250 else: 

251 assert isinstance(entity, ContextEntity) 

252 if options: 

253 params.update({"options": ",".join(options)}) 

254 try: 

255 res = self.post( 

256 url=url, 

257 headers=headers, 

258 json=entity.model_dump(exclude_none=True), 

259 params=params, 

260 ) 

261 if res.ok: 

262 self.logger.info("Entity successfully posted!") 

263 return res.headers.get("Location") 

264 res.raise_for_status() 

265 except requests.RequestException as err: 

266 if err.response is not None: 

267 if update and err.response.status_code == 422: 

268 return self.override_entity(entity=entity, key_values=key_values) 

269 if patch and err.response.status_code == 422: 

270 if not key_values: 

271 return self.patch_entity( 

272 entity=entity, override_attr_metadata=override_attr_metadata 

273 ) 

274 else: 

275 return self._patch_entity_key_values(entity=entity) 

276 msg = f"Could not post entity {entity.id}" 

277 self.log_error(err=err, msg=msg) 

278 raise 

279 

280 def get_entity_list( 

281 self, 

282 *, 

283 entity_ids: List[str] = None, 

284 entity_types: List[str] = None, 

285 id_pattern: str = None, 

286 type_pattern: str = None, 

287 q: Union[str, QueryString] = None, 

288 mq: Union[str, QueryString] = None, 

289 georel: str = None, 

290 geometry: str = None, 

291 coords: str = None, 

292 limit: PositiveInt = inf, 

293 attrs: List[str] = None, 

294 metadata: str = None, 

295 order_by: str = None, 

296 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED, 

297 ) -> List[Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]]: 

298 r""" 

299 Retrieves a list of context entities that match different criteria by 

300 id, type, pattern matching (either id or type) and/or those which 

301 match a query or geographical query (see Simple Query Language and 

302 Geographical Queries). A given entity has to match all the criteria 

303 to be retrieved (i.e., the criteria is combined in a logical AND 

304 way). Note that pattern matching query parameters are incompatible 

305 (i.e. mutually exclusive) with their corresponding exact matching 

306 parameters, i.e. idPattern with id and typePattern with type. 

307 

308 Args: 

309 entity_ids: A comma-separated list of elements. Retrieve entities 

310 whose ID matches one of the elements in the list. 

311 Incompatible with idPattern,e.g. Boe_Idarium 

312 entity_types: comma-separated list of elements. Retrieve entities 

313 whose type matches one of the elements in the list. 

314 Incompatible with typePattern. Example: Room. 

315 id_pattern: A correctly formatted regular expression. Retrieve 

316 entities whose ID matches the regular expression. Incompatible 

317 with id, e.g. ngsi-ld.* or sensor.* 

318 type_pattern: A correctly formatted regular expression. Retrieve 

319 entities whose type matches the regular expression. 

320 Incompatible with type, e.g. room.* 

321 q (SimpleQuery): A query expression, composed of a list of 

322 statements separated by ;, i.e., 

323 q=statement1;statement2;statement3. See Simple Query 

324 Language specification. Example: temperature>40. 

325 mq (SimpleQuery): A query expression for attribute metadata, 

326 composed of a list of statements separated by ;, i.e., 

327 mq=statement1;statement2;statement3. See Simple Query 

328 Language specification. Example: temperature.accuracy<0.9. 

329 georel: Spatial relationship between matching entities and a 

330 reference shape. See Geographical Queries. Example: 'near'. 

331 geometry: Geographical area to which the query is restricted. 

332 See Geographical Queries. Example: point. 

333 coords: List of latitude-longitude pairs of coordinates separated 

334 by ';'. See Geographical Queries. Example: 41.390205, 

335 2.154007;48.8566,2.3522. 

336 limit: Limits the number of entities to be retrieved Example: 20 

337 attrs: Comma-separated list of attribute names whose data are to 

338 be included in the response. The attributes are retrieved in 

339 the order specified by this parameter. If this parameter is 

340 not included, the attributes are retrieved in arbitrary 

341 order. See "Filtering out attributes and metadata" section 

342 for more detail. Example: seatNumber. 

343 metadata: A list of metadata names to include in the response. 

344 See "Filtering out attributes and metadata" section for more 

345 detail. Example: accuracy. 

346 order_by: Criteria for ordering results. See "Ordering Results" 

347 section for details. Example: temperature,!speed. 

348 response_format (AttrsFormat, str): Response Format. Note: That if 

349 'keyValues' or 'values' are used the response model will 

350 change to List[ContextEntityKeyValues] and to List[Dict[str, 

351 Any]], respectively. 

352 Returns: 

353 

354 """ 

355 url = urljoin(self.base_url, f"{self._url_version}/entities/") 

356 headers = self.headers.copy() 

357 params = {} 

358 

359 if entity_ids and id_pattern: 

360 raise ValueError 

361 if entity_types and type_pattern: 

362 raise ValueError 

363 if entity_ids: 

364 if not isinstance(entity_ids, list): 

365 entity_ids = [entity_ids] 

366 params.update({"id": ",".join(entity_ids)}) 

367 if id_pattern: 

368 try: 

369 re.compile(id_pattern) 

370 except re.error as err: 

371 raise ValueError(f"Invalid Pattern: {err}") from err 

372 params.update({"idPattern": id_pattern}) 

373 if entity_types: 

374 if not isinstance(entity_types, list): 

375 entity_types = [entity_types] 

376 params.update({"type": ",".join(entity_types)}) 

377 if type_pattern: 

378 try: 

379 re.compile(type_pattern) 

380 except re.error as err: 

381 raise ValueError(f"Invalid Pattern: {err.msg}") from err 

382 params.update({"typePattern": type_pattern}) 

383 if attrs: 

384 params.update({"attrs": ",".join(attrs)}) 

385 if metadata: 

386 params.update({"metadata": ",".join(metadata)}) 

387 if q: 

388 if isinstance(q, str): 

389 q = QueryString.parse_str(q) 

390 params.update({"q": str(q)}) 

391 if mq: 

392 params.update({"mq": str(mq)}) 

393 if geometry: 

394 params.update({"geometry": geometry}) 

395 if georel: 

396 params.update({"georel": georel}) 

397 if coords: 

398 params.update({"coords": coords}) 

399 if order_by: 

400 params.update({"orderBy": order_by}) 

401 if response_format not in list(AttrsFormat): 

402 raise ValueError(f"Value must be in {list(AttrsFormat)}") 

403 response_format = ",".join(["count", response_format]) 

404 params.update({"options": response_format}) 

405 try: 

406 items = self.__pagination( 

407 method=PaginationMethod.GET, 

408 limit=limit, 

409 url=url, 

410 params=params, 

411 headers=headers, 

412 ) 

413 if AttrsFormat.NORMALIZED in response_format: 

414 return ContextEntityList.model_validate({"entities": items}).entities 

415 elif AttrsFormat.KEY_VALUES in response_format: 

416 return ContextEntityKeyValuesList.model_validate( 

417 {"entities": items} 

418 ).entities 

419 return items # in case of VALUES as response_format 

420 except requests.RequestException as err: 

421 msg = "Could not load entities" 

422 self.log_error(err=err, msg=msg) 

423 raise 

424 

425 def get_entity( 

426 self, 

427 entity_id: str, 

428 entity_type: str = None, 

429 attrs: List[str] = None, 

430 metadata: List[str] = None, 

431 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED, 

432 ) -> Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]: 

433 """ 

434 This operation must return one entity element only, but there may be 

435 more than one entity with the same ID (e.g. entities with same ID but 

436 different types). In such case, an error message is returned, with 

437 the HTTP status code set to 409 Conflict. 

438 

439 Args: 

440 entity_id (String): Id of the entity to be retrieved 

441 entity_type (String): Entity type, to avoid ambiguity in case 

442 there are several entities with the same entity id. 

443 attrs (List of Strings): List of attribute names whose data must be 

444 included in the response. The attributes are retrieved in the 

445 order specified by this parameter. 

446 See "Filtering out attributes and metadata" section for more 

447 detail. If this parameter is not included, the attributes are 

448 retrieved in arbitrary order, and all the attributes of the 

449 entity are included in the response. 

450 Example: temperature, humidity. 

451 metadata (List of Strings): A list of metadata names to include in 

452 the response. See "Filtering out attributes and metadata" 

453 section for more detail. Example: accuracy. 

454 response_format (AttrsFormat, str): Representation format of 

455 response 

456 Returns: 

457 ContextEntity 

458 """ 

459 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}") 

460 headers = self.headers.copy() 

461 params = {} 

462 if entity_type: 

463 params.update({"type": entity_type}) 

464 if attrs: 

465 params.update({"attrs": ",".join(attrs)}) 

466 if metadata: 

467 params.update({"metadata": ",".join(metadata)}) 

468 if response_format not in list(AttrsFormat): 

469 raise ValueError(f"Value must be in {list(AttrsFormat)}") 

470 params.update({"options": response_format}) 

471 

472 try: 

473 res = self.get(url=url, params=params, headers=headers) 

474 if res.ok: 

475 self.logger.info("Entity successfully retrieved!") 

476 self.logger.debug("Received: %s", res.json()) 

477 if response_format == AttrsFormat.NORMALIZED: 

478 return ContextEntity(**res.json()) 

479 if response_format == AttrsFormat.KEY_VALUES: 

480 return ContextEntityKeyValues(**res.json()) 

481 return res.json() 

482 res.raise_for_status() 

483 except requests.RequestException as err: 

484 msg = f"Could not load entity {entity_id}" 

485 self.log_error(err=err, msg=msg) 

486 raise 

487 

488 def get_entity_attributes( 

489 self, 

490 entity_id: str, 

491 entity_type: str = None, 

492 attrs: List[str] = None, 

493 metadata: List[str] = None, 

494 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED, 

495 ) -> Dict[str, ContextAttribute]: 

496 """ 

497 This request is similar to retrieving the whole entity, however this 

498 one omits the id and type fields. Just like the general request of 

499 getting an entire entity, this operation must return only one entity 

500 element. If more than one entity with the same ID is found (e.g. 

501 entities with same ID but different type), an error message is 

502 returned, with the HTTP status code set to 409 Conflict. 

503 

504 Args: 

505 entity_id (String): Id of the entity to be retrieved 

506 entity_type (String): Entity type, to avoid ambiguity in case 

507 there are several entities with the same entity id. 

508 attrs (List of Strings): List of attribute names whose data must be 

509 included in the response. The attributes are retrieved in the 

510 order specified by this parameter. 

511 See "Filtering out attributes and metadata" section for more 

512 detail. If this parameter is not included, the attributes are 

513 retrieved in arbitrary order, and all the attributes of the 

514 entity are included in the response. Example: temperature, 

515 humidity. 

516 metadata (List of Strings): A list of metadata names to include in 

517 the response. See "Filtering out attributes and metadata" 

518 section for more detail. Example: accuracy. 

519 response_format (AttrsFormat, str): Representation format of 

520 response 

521 Returns: 

522 Dict 

523 """ 

524 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs") 

525 headers = self.headers.copy() 

526 params = {} 

527 if entity_type: 

528 params.update({"type": entity_type}) 

529 if attrs: 

530 params.update({"attrs": ",".join(attrs)}) 

531 if metadata: 

532 params.update({"metadata": ",".join(metadata)}) 

533 if response_format not in list(AttrsFormat): 

534 raise ValueError(f"Value must be in {list(AttrsFormat)}") 

535 params.update({"options": response_format}) 

536 try: 

537 res = self.get(url=url, params=params, headers=headers) 

538 if res.ok: 

539 if response_format == AttrsFormat.NORMALIZED: 

540 return { 

541 key: ContextAttribute(**values) 

542 for key, values in res.json().items() 

543 } 

544 return res.json() 

545 res.raise_for_status() 

546 except requests.RequestException as err: 

547 msg = f"Could not load attributes from entity {entity_id} !" 

548 self.log_error(err=err, msg=msg) 

549 raise 

550 

551 def update_entity( 

552 self, 

553 entity: Union[ContextEntity, ContextEntityKeyValues, dict], 

554 append_strict: bool = False, 

555 key_values: bool = False, 

556 ): 

557 """ 

558 The request payload is an object representing the attributes to 

559 append or update. 

560 

561 Note: 

562 Update means overwriting the existing entity. If you want to 

563 manipulate you should rather use patch_entity. 

564 

565 Args: 

566 entity (ContextEntity): 

567 append_strict: If `False` the entity attributes are updated (if they 

568 previously exist) or appended (if they don't previously exist) 

569 with the ones in the payload. 

570 If `True` all the attributes in the payload not 

571 previously existing in the entity are appended. In addition 

572 to that, in case some of the attributes in the payload 

573 already exist in the entity, an error is returned. 

574 More precisely this means a strict append procedure. 

575 key_values: By default False. If set to True, the payload uses 

576 the keyValues simplified entity representation, i.e. 

577 ContextEntityKeyValues. 

578 Returns: 

579 None 

580 """ 

581 if key_values: 

582 if isinstance(entity, dict): 

583 entity = copy.deepcopy(entity) 

584 _id = entity.pop("id") 

585 _type = entity.pop("type") 

586 attrs = entity 

587 entity = ContextEntityKeyValues(id=_id, type=_type) 

588 else: 

589 attrs = entity.model_dump(exclude={"id", "type"}) 

590 else: 

591 attrs = entity.get_attributes() 

592 self.update_or_append_entity_attributes( 

593 entity_id=entity.id, 

594 entity_type=entity.type, 

595 attrs=attrs, 

596 append_strict=append_strict, 

597 key_values=key_values, 

598 ) 

599 

600 def update_entity_properties( 

601 self, entity: ContextEntity, append_strict: bool = False 

602 ): 

603 """ 

604 The request payload is an object representing the attributes, of any type 

605 but Relationship, to append or update. 

606 

607 Note: 

608 Update means overwriting the existing entity. If you want to 

609 manipulate you should rather use patch_entity. 

610 

611 Args: 

612 entity (ContextEntity): 

613 append_strict: If `False` the entity attributes are updated (if they 

614 previously exist) or appended (if they don't previously exist) 

615 with the ones in the payload. 

616 If `True` all the attributes in the payload not 

617 previously existing in the entity are appended. In addition 

618 to that, in case some of the attributes in the payload 

619 already exist in the entity, an error is returned. 

620 More precisely this means a strict append procedure. 

621 

622 Returns: 

623 None 

624 """ 

625 self.update_or_append_entity_attributes( 

626 entity_id=entity.id, 

627 entity_type=entity.type, 

628 attrs=entity.get_properties(), 

629 append_strict=append_strict, 

630 ) 

631 

632 def update_entity_relationships( 

633 self, entity: ContextEntity, append_strict: bool = False 

634 ): 

635 """ 

636 The request payload is an object representing only the attributes, of type 

637 Relationship, to append or update. 

638 

639 Note: 

640 Update means overwriting the existing entity. If you want to 

641 manipulate you should rather use patch_entity. 

642 

643 Args: 

644 entity (ContextEntity): 

645 append_strict: If `False` the entity attributes are updated (if they 

646 previously exist) or appended (if they don't previously exist) 

647 with the ones in the payload. 

648 If `True` all the attributes in the payload not 

649 previously existing in the entity are appended. In addition 

650 to that, in case some of the attributes in the payload 

651 already exist in the entity, an error is returned. 

652 More precisely this means a strict append procedure. 

653 

654 Returns: 

655 None 

656 """ 

657 self.update_or_append_entity_attributes( 

658 entity_id=entity.id, 

659 entity_type=entity.type, 

660 attrs=entity.get_relationships(), 

661 append_strict=append_strict, 

662 ) 

663 

664 def delete_entity( 

665 self, 

666 entity_id: str, 

667 entity_type: str = None, 

668 delete_devices: bool = False, 

669 iota_client: IoTAClient = None, 

670 iota_url: AnyHttpUrl = settings.IOTA_URL, 

671 ) -> None: 

672 """ 

673 Remove a entity from the context broker. No payload is required 

674 or received. 

675 

676 Args: 

677 entity_id: 

678 Id of the entity to be deleted 

679 entity_type: 

680 Entity type, to avoid ambiguity in case there are several 

681 entities with the same entity id. 

682 delete_devices: 

683 If True, also delete all devices that reference this 

684 entity (entity_id as entity_name) 

685 iota_client: 

686 Corresponding IoTA-Client used to access IoTA-Agent 

687 iota_url: 

688 URL of the corresponding IoT-Agent. This will autogenerate 

689 an IoTA-Client, mirroring the information of the 

690 ContextBrokerClient, e.g. FiwareHeader, and other headers 

691 

692 Returns: 

693 None 

694 """ 

695 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}") 

696 headers = self.headers.copy() 

697 if entity_type: 

698 params = {"type": entity_type} 

699 else: 

700 params = None 

701 try: 

702 res = self.delete(url=url, params=params, headers=headers) 

703 if res.ok: 

704 self.logger.info("Entity '%s' successfully deleted!", entity_id) 

705 else: 

706 res.raise_for_status() 

707 except requests.RequestException as err: 

708 msg = f"Could not delete entity {entity_id} !" 

709 self.log_error(err=err, msg=msg) 

710 raise 

711 

712 if delete_devices: 

713 from filip.clients.ngsi_v2 import IoTAClient 

714 

715 if iota_client: 

716 iota_client_local = deepcopy(iota_client) 

717 else: 

718 warnings.warn( 

719 "No IoTA-Client object provided! " 

720 "Will try to generate one. " 

721 "This usage is not recommended." 

722 ) 

723 

724 iota_client_local = IoTAClient( 

725 url=iota_url, 

726 fiware_header=self.fiware_headers, 

727 headers=self.headers, 

728 ) 

729 

730 for device in iota_client_local.get_device_list(entity_names=[entity_id]): 

731 if entity_type: 

732 if device.entity_type == entity_type: 

733 iota_client_local.delete_device(device_id=device.device_id) 

734 else: 

735 iota_client_local.delete_device(device_id=device.device_id) 

736 iota_client_local.close() 

737 

738 def delete_entities(self, entities: List[ContextEntity]) -> None: 

739 """ 

740 Remove a list of entities from the context broker. This methode is 

741 more efficient than to call delete_entity() for each entity 

742 

743 Args: 

744 entities: List[ContextEntity]: List of entities to be deleted 

745 

746 Raises: 

747 Exception, if one of the entities is not in the ContextBroker 

748 

749 Returns: 

750 None 

751 """ 

752 

753 # update() delete, deletes all entities without attributes completely, 

754 # and removes the attributes for the other 

755 # The entities are sorted based on the fact if they have 

756 # attributes. 

757 entities_with_attributes: List[ContextEntity] = [] 

758 for entity in entities: 

759 attribute_names = [ 

760 key 

761 for key in entity.model_dump() 

762 if key not in ContextEntity.model_fields 

763 ] 

764 if len(attribute_names) > 0: 

765 entities_with_attributes.append( 

766 ContextEntity(id=entity.id, type=entity.type) 

767 ) 

768 

769 # Post update_delete for those without attribute only once, 

770 # for the other post update_delete again but for the changed entity 

771 # in the ContextBroker (only id and type left) 

772 if len(entities) > 0: 

773 self.update(entities=entities, action_type="delete") 

774 if len(entities_with_attributes) > 0: 

775 self.update(entities=entities_with_attributes, action_type="delete") 

776 

777 def update_or_append_entity_attributes( 

778 self, 

779 entity_id: str, 

780 attrs: Union[ 

781 List[NamedContextAttribute], Dict[str, ContextAttribute], Dict[str, Any] 

782 ], 

783 entity_type: str = None, 

784 append_strict: bool = False, 

785 forcedUpdate: bool = False, 

786 key_values: bool = False, 

787 ): 

788 """ 

789 The request payload is an object representing the attributes to 

790 append or update. This corresponds to a 'POST' request if append is 

791 set to 'False' 

792 

793 Note: 

794 Be careful not to update attributes that are 

795 provided via context registration, e.g. commands. Commands are 

796 removed before sending the request. To avoid breaking things. 

797 

798 Args: 

799 entity_id: Entity id to be updated 

800 entity_type: Entity type, to avoid ambiguity in case there are 

801 several entities with the same entity id. 

802 attrs: List of attributes to update or to append 

803 append_strict: If `False` the entity attributes are updated (if they 

804 previously exist) or appended (if they don't previously exist) 

805 with the ones in the payload. 

806 If `True` all the attributes in the payload not 

807 previously existing in the entity are appended. In addition 

808 to that, in case some of the attributes in the payload 

809 already exist in the entity, an error is returned. 

810 More precisely this means a strict append procedure. 

811 forcedUpdate: Update operation have to trigger any matching 

812 subscription, no matter if there is an actual attribute 

813 update or no instead of the default behavior, which is to 

814 updated only if attribute is effectively updated. 

815 key_values: By default False. If set to True, the payload uses 

816 the keyValues simplified entity representation, i.e. 

817 ContextEntityKeyValues. 

818 Returns: 

819 None 

820 

821 """ 

822 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs") 

823 headers = self.headers.copy() 

824 params = {} 

825 if entity_type: 

826 params.update({"type": entity_type}) 

827 else: 

828 entity_type = "dummy" 

829 

830 options = [] 

831 if append_strict: 

832 options.append("append") 

833 if forcedUpdate: 

834 options.append("forcedUpdate") 

835 if key_values: 

836 assert isinstance(attrs, dict), "for keyValues attrs has to be a dict" 

837 options.append("keyValues") 

838 if options: 

839 params.update({"options": ",".join(options)}) 

840 

841 if key_values: 

842 entity = ContextEntityKeyValues(id=entity_id, type=entity_type, **attrs) 

843 else: 

844 entity = ContextEntity(id=entity_id, type=entity_type) 

845 entity.add_attributes(attrs) 

846 # exclude commands from the send data, 

847 # as they live in the IoTA-agent 

848 excluded_keys = {"id", "type"} 

849 # excluded_keys.update( 

850 # entity.get_commands(response_format=PropertyFormat.DICT).keys() 

851 # ) 

852 try: 

853 res = self.post( 

854 url=url, 

855 headers=headers, 

856 json=entity.model_dump(exclude=excluded_keys, exclude_none=True), 

857 params=params, 

858 ) 

859 if res.ok: 

860 self.logger.info("Entity '%s' successfully " "updated!", entity.id) 

861 else: 

862 res.raise_for_status() 

863 except requests.RequestException as err: 

864 msg = f"Could not update or append attributes of entity" f" {entity.id} !" 

865 self.log_error(err=err, msg=msg) 

866 raise 

867 

868 def _patch_entity_key_values( 

869 self, 

870 entity: Union[ContextEntityKeyValues, dict], 

871 ): 

872 """ 

873 The entity are updated with a ContextEntityKeyValues object or a 

874 dictionary contain the simplified entity data. This corresponds to a 

875 'PATCH' request. 

876 Only existing attribute can be updated! 

877 

878 Args: 

879 entity: A ContextEntityKeyValues object or a dictionary contain 

880 the simplified entity data 

881 

882 """ 

883 if isinstance(entity, dict): 

884 entity = ContextEntityKeyValues(**entity) 

885 url = urljoin(self.base_url, f"v2/entities/{entity.id}/attrs") 

886 headers = self.headers.copy() 

887 params = {"type": entity.type, "options": AttrsFormat.KEY_VALUES.value} 

888 try: 

889 res = self.patch( 

890 url=url, 

891 headers=headers, 

892 json=entity.model_dump(exclude={"id", "type"}, exclude_unset=True), 

893 params=params, 

894 ) 

895 if res.ok: 

896 self.logger.info("Entity '%s' successfully " "updated!", entity.id) 

897 else: 

898 res.raise_for_status() 

899 except requests.RequestException as err: 

900 msg = f"Could not update attributes of entity" f" {entity.id} !" 

901 self.log_error(err=err, msg=msg) 

902 raise 

903 

904 def update_existing_entity_attributes( 

905 self, 

906 entity_id: str, 

907 attrs: Union[ 

908 List[NamedContextAttribute], Dict[str, ContextAttribute], Dict[str, Any] 

909 ], 

910 entity_type: str = None, 

911 forcedUpdate: bool = False, 

912 override_metadata: bool = False, 

913 key_values: bool = False, 

914 ): 

915 """ 

916 The entity attributes are updated with the ones in the payload. 

917 In addition to that, if one or more attributes in the payload doesn't 

918 exist in the entity, an error is returned. This corresponds to a 

919 'PATCH' request. 

920 

921 Args: 

922 entity_id: Entity id to be updated 

923 entity_type: Entity type, to avoid ambiguity in case there are 

924 several entities with the same entity id. 

925 attrs: List of attributes to update or to append 

926 forcedUpdate: Update operation have to trigger any matching 

927 subscription, no matter if there is an actual attribute 

928 update or no instead of the default behavior, which is to 

929 updated only if attribute is effectively updated. 

930 override_metadata: 

931 Bool,replace the existing metadata with the one provided in 

932 the request 

933 key_values: By default False. If set to True, the payload uses 

934 the keyValues simplified entity representation, i.e. 

935 ContextEntityKeyValues. 

936 Returns: 

937 None 

938 

939 """ 

940 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs") 

941 headers = self.headers.copy() 

942 if entity_type: 

943 params = {"type": entity_type} 

944 else: 

945 params = None 

946 entity_type = "dummy" 

947 

948 options = [] 

949 if override_metadata: 

950 options.append("overrideMetadata") 

951 if forcedUpdate: 

952 options.append("forcedUpdate") 

953 if key_values: 

954 assert isinstance(attrs, dict), "for keyValues the attrs must be dict" 

955 payload = attrs 

956 options.append("keyValues") 

957 else: 

958 entity = ContextEntity(id=entity_id, type=entity_type) 

959 entity.add_attributes(attrs) 

960 payload = entity.model_dump(exclude={"id", "type"}, exclude_none=True) 

961 if options: 

962 params.update({"options": ",".join(options)}) 

963 

964 try: 

965 res = self.patch( 

966 url=url, 

967 headers=headers, 

968 json=payload, 

969 params=params, 

970 ) 

971 if res.ok: 

972 self.logger.info("Entity '%s' successfully " "updated!", entity_id) 

973 else: 

974 res.raise_for_status() 

975 except requests.RequestException as err: 

976 msg = f"Could not update attributes of entity" f" {entity_id} !" 

977 self.log_error(err=err, msg=msg) 

978 raise 

979 

980 def override_entity( 

981 self, entity: Union[ContextEntity, ContextEntityKeyValues], **kwargs 

982 ): 

983 """ 

984 The request payload is an object representing the attributes to 

985 override the existing entity. 

986 

987 Note: 

988 If you want to manipulate you should rather use patch_entity. 

989 

990 Args: 

991 entity (ContextEntity or ContextEntityKeyValues): 

992 Returns: 

993 None 

994 """ 

995 return self.replace_entity_attributes( 

996 entity_id=entity.id, 

997 entity_type=entity.type, 

998 attrs=entity.get_attributes(), 

999 **kwargs, 

1000 ) 

1001 

1002 def replace_entity_attributes( 

1003 self, 

1004 entity_id: str, 

1005 attrs: Union[ 

1006 List[Union[NamedContextAttribute, Dict[str, ContextAttribute]]], Dict 

1007 ], 

1008 entity_type: str = None, 

1009 forcedUpdate: bool = False, 

1010 key_values: bool = False, 

1011 ): 

1012 """ 

1013 The attributes previously existing in the entity are removed and 

1014 replaced by the ones in the request. This corresponds to a 'PUT' 

1015 request. 

1016 

1017 Args: 

1018 entity_id: Entity id to be updated 

1019 entity_type: Entity type, to avoid ambiguity in case there are 

1020 several entities with the same entity id. 

1021 attrs: List of attributes to add to the entity or dict of 

1022 attributes in case of key_values=True. 

1023 forcedUpdate: Update operation have to trigger any matching 

1024 subscription, no matter if there is an actual attribute 

1025 update or no instead of the default behavior, which is to 

1026 updated only if attribute is effectively updated. 

1027 key_values(bool): 

1028 By default False. If set to True, "options=keyValues" will 

1029 be included in params of the request. The payload uses 

1030 the keyValues simplified entity representation, i.e. 

1031 ContextEntityKeyValues. 

1032 Returns: 

1033 None 

1034 """ 

1035 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs") 

1036 headers = self.headers.copy() 

1037 params = {} 

1038 options = [] 

1039 if entity_type: 

1040 params.update({"type": entity_type}) 

1041 else: 

1042 entity_type = "dummy" 

1043 

1044 if forcedUpdate: 

1045 options.append("forcedUpdate") 

1046 

1047 if key_values: 

1048 options.append("keyValues") 

1049 assert isinstance(attrs, dict) 

1050 else: 

1051 entity = ContextEntity(id=entity_id, type=entity_type) 

1052 entity.add_attributes(attrs) 

1053 attrs = entity.model_dump(exclude={"id", "type"}, exclude_none=True) 

1054 if options: 

1055 params.update({"options": ",".join(options)}) 

1056 

1057 try: 

1058 res = self.put( 

1059 url=url, 

1060 headers=headers, 

1061 json=attrs, 

1062 params=params, 

1063 ) 

1064 if res.ok: 

1065 self.logger.info("Entity '%s' successfully " "updated!", entity_id) 

1066 else: 

1067 res.raise_for_status() 

1068 except requests.RequestException as err: 

1069 msg = f"Could not replace attribute of entity {entity_id} !" 

1070 self.log_error(err=err, msg=msg) 

1071 raise 

1072 

1073 # Attribute operations 

1074 def get_attribute( 

1075 self, 

1076 entity_id: str, 

1077 attr_name: str, 

1078 entity_type: str = None, 

1079 metadata: str = None, 

1080 response_format="", 

1081 ) -> ContextAttribute: 

1082 """ 

1083 Retrieves a specified attribute from an entity. 

1084 

1085 Args: 

1086 entity_id: Id of the entity. Example: Bcn_Welt 

1087 attr_name: Name of the attribute to be retrieved. 

1088 entity_type (Optional): Type of the entity to retrieve 

1089 metadata (Optional): A list of metadata names to include in the 

1090 response. See "Filtering out attributes and metadata" section 

1091 for more detail. 

1092 

1093 Returns: 

1094 The content of the retrieved attribute as ContextAttribute 

1095 

1096 Raises: 

1097 Error 

1098 

1099 """ 

1100 url = urljoin( 

1101 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}" 

1102 ) 

1103 headers = self.headers.copy() 

1104 params = {} 

1105 if entity_type: 

1106 params.update({"type": entity_type}) 

1107 if metadata: 

1108 params.update({"metadata": ",".join(metadata)}) 

1109 try: 

1110 res = self.get(url=url, params=params, headers=headers) 

1111 if res.ok: 

1112 self.logger.debug("Received: %s", res.json()) 

1113 return ContextAttribute(**res.json()) 

1114 res.raise_for_status() 

1115 except requests.RequestException as err: 

1116 msg = ( 

1117 f"Could not load attribute '{attr_name}' from entity" f"'{entity_id}' " 

1118 ) 

1119 self.log_error(err=err, msg=msg) 

1120 raise 

1121 

1122 def update_entity_attribute( 

1123 self, 

1124 entity_id: str, 

1125 attr: Union[ContextAttribute, NamedContextAttribute], 

1126 *, 

1127 entity_type: str = None, 

1128 attr_name: str = None, 

1129 override_metadata: bool = True, 

1130 forcedUpdate: bool = False, 

1131 ): 

1132 """ 

1133 Updates a specified attribute from an entity. 

1134 

1135 Args: 

1136 attr: 

1137 context attribute to update 

1138 entity_id: 

1139 Id of the entity. Example: Bcn_Welt 

1140 entity_type: 

1141 Entity type, to avoid ambiguity in case there are 

1142 several entities with the same entity id. 

1143 forcedUpdate: Update operation have to trigger any matching 

1144 subscription, no matter if there is an actual attribute 

1145 update or no instead of the default behavior, which is to 

1146 updated only if attribute is effectively updated. 

1147 attr_name: 

1148 Name of the attribute to be updated. 

1149 override_metadata: 

1150 Bool, if set to `True` (default) the metadata will be 

1151 overwritten. This is for backwards compatibility reasons. 

1152 If `False` the metadata values will be either updated if 

1153 already existing or append if not. 

1154 See also: 

1155 https://fiware-orion.readthedocs.io/en/master/user/metadata.html 

1156 """ 

1157 headers = self.headers.copy() 

1158 if not isinstance(attr, NamedContextAttribute): 

1159 assert attr_name is not None, ( 

1160 "Missing name for attribute. " 

1161 "attr_name must be present if" 

1162 "attr is of type ContextAttribute" 

1163 ) 

1164 else: 

1165 assert attr_name is None, ( 

1166 "Invalid argument attr_name. Do not set " 

1167 "attr_name if attr is of type " 

1168 "NamedContextAttribute" 

1169 ) 

1170 attr_name = attr.name 

1171 

1172 url = urljoin( 

1173 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}" 

1174 ) 

1175 params = {} 

1176 if entity_type: 

1177 params.update({"type": entity_type}) 

1178 # set overrideMetadata option (we assure backwards compatibility here) 

1179 options = [] 

1180 if override_metadata: 

1181 options.append("overrideMetadata") 

1182 if forcedUpdate: 

1183 options.append("forcedUpdate") 

1184 if options: 

1185 params.update({"options": ",".join(options)}) 

1186 try: 

1187 res = self.put( 

1188 url=url, 

1189 headers=headers, 

1190 params=params, 

1191 json=attr.model_dump(exclude={"name"}, exclude_none=True), 

1192 ) 

1193 if res.ok: 

1194 self.logger.info( 

1195 "Attribute '%s' of '%s' " "successfully updated!", 

1196 attr_name, 

1197 entity_id, 

1198 ) 

1199 else: 

1200 res.raise_for_status() 

1201 except requests.RequestException as err: 

1202 msg = ( 

1203 f"Could not update attribute '{attr_name}' of entity" f"'{entity_id}' " 

1204 ) 

1205 self.log_error(err=err, msg=msg) 

1206 raise 

1207 

1208 def delete_entity_attribute( 

1209 self, entity_id: str, attr_name: str, entity_type: str = None 

1210 ) -> None: 

1211 """ 

1212 Removes a specified attribute from an entity. 

1213 

1214 Args: 

1215 entity_id: Id of the entity. 

1216 attr_name: Name of the attribute to be retrieved. 

1217 entity_type: Entity type, to avoid ambiguity in case there are 

1218 several entities with the same entity id. 

1219 Raises: 

1220 Error 

1221 

1222 """ 

1223 url = urljoin( 

1224 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}" 

1225 ) 

1226 headers = self.headers.copy() 

1227 params = {} 

1228 if entity_type: 

1229 params.update({"type": entity_type}) 

1230 try: 

1231 res = self.delete(url=url, headers=headers) 

1232 if res.ok: 

1233 self.logger.info( 

1234 "Attribute '%s' of '%s' " "successfully deleted!", 

1235 attr_name, 

1236 entity_id, 

1237 ) 

1238 else: 

1239 res.raise_for_status() 

1240 except requests.RequestException as err: 

1241 msg = f"Could not delete attribute '{attr_name}' of entity '{entity_id}'" 

1242 self.log_error(err=err, msg=msg) 

1243 raise 

1244 

1245 # Attribute value operations 

1246 def get_attribute_value( 

1247 self, entity_id: str, attr_name: str, entity_type: str = None 

1248 ) -> Any: 

1249 """ 

1250 This operation returns the value property with the value of the 

1251 attribute. 

1252 

1253 Args: 

1254 entity_id: Id of the entity. Example: Bcn_Welt 

1255 attr_name: Name of the attribute to be retrieved. 

1256 Example: temperature. 

1257 entity_type: Entity type, to avoid ambiguity in case there are 

1258 several entities with the same entity id. 

1259 

1260 Returns: 

1261 

1262 """ 

1263 url = urljoin( 

1264 self.base_url, 

1265 f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}/value", 

1266 ) 

1267 headers = self.headers.copy() 

1268 params = {} 

1269 if entity_type: 

1270 params.update({"type": entity_type}) 

1271 try: 

1272 res = self.get(url=url, params=params, headers=headers) 

1273 if res.ok: 

1274 self.logger.debug("Received: %s", res.json()) 

1275 return res.json() 

1276 res.raise_for_status() 

1277 except requests.RequestException as err: 

1278 msg = ( 

1279 f"Could not load value of attribute '{attr_name}' from " 

1280 f"entity'{entity_id}' " 

1281 ) 

1282 self.log_error(err=err, msg=msg) 

1283 raise 

1284 

1285 def update_attribute_value( 

1286 self, 

1287 *, 

1288 entity_id: str, 

1289 attr_name: str, 

1290 value: Any, 

1291 entity_type: str = None, 

1292 forcedUpdate: bool = False, 

1293 ): 

1294 """ 

1295 Updates the value of a specified attribute of an entity 

1296 

1297 Args: 

1298 value: update value 

1299 entity_id: Id of the entity. Example: Bcn_Welt 

1300 attr_name: Name of the attribute to be retrieved. 

1301 Example: temperature. 

1302 entity_type: Entity type, to avoid ambiguity in case there are 

1303 several entities with the same entity id. 

1304 forcedUpdate: Update operation have to trigger any matching 

1305 subscription, no matter if there is an actual attribute 

1306 update or no instead of the default behavior, which is to 

1307 updated only if attribute is effectively updated. 

1308 Returns: 

1309 

1310 """ 

1311 url = urljoin( 

1312 self.base_url, 

1313 f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}/value", 

1314 ) 

1315 headers = self.headers.copy() 

1316 params = {} 

1317 if entity_type: 

1318 params.update({"type": entity_type}) 

1319 options = [] 

1320 if forcedUpdate: 

1321 options.append("forcedUpdate") 

1322 if options: 

1323 params.update({"options": ",".join(options)}) 

1324 try: 

1325 if not isinstance(value, (dict, list)): 

1326 headers.update({"Content-Type": "text/plain"}) 

1327 if isinstance(value, str): 

1328 value = f"{value}" 

1329 res = self.put(url=url, headers=headers, json=value, params=params) 

1330 else: 

1331 res = self.put(url=url, headers=headers, json=value, params=params) 

1332 if res.ok: 

1333 self.logger.info( 

1334 "Attribute '%s' of '%s' " "successfully updated!", 

1335 attr_name, 

1336 entity_id, 

1337 ) 

1338 else: 

1339 res.raise_for_status() 

1340 except requests.RequestException as err: 

1341 msg = ( 

1342 f"Could not update value of attribute '{attr_name}' from " 

1343 f"entity '{entity_id}' " 

1344 ) 

1345 self.log_error(err=err, msg=msg) 

1346 raise 

1347 

1348 # Types Operations 

1349 def get_entity_types( 

1350 self, *, limit: int = None, offset: int = None, options: str = None 

1351 ) -> List[Dict[str, Any]]: 

1352 """ 

1353 

1354 Args: 

1355 limit: Limit the number of types to be retrieved. 

1356 offset: Skip a number of records. 

1357 options: Options dictionary. Allowed: count, values 

1358 

1359 Returns: 

1360 

1361 """ 

1362 url = urljoin(self.base_url, f"{self._url_version}/types") 

1363 headers = self.headers.copy() 

1364 params = {} 

1365 if limit: 

1366 params.update({"limit": limit}) 

1367 if offset: 

1368 params.update({"offset": offset}) 

1369 if options: 

1370 params.update({"options": options}) 

1371 try: 

1372 res = self.get(url=url, params=params, headers=headers) 

1373 if res.ok: 

1374 self.logger.debug("Received: %s", res.json()) 

1375 return res.json() 

1376 res.raise_for_status() 

1377 except requests.RequestException as err: 

1378 msg = "Could not load entity types!" 

1379 self.log_error(err=err, msg=msg) 

1380 raise 

1381 

1382 def get_entity_type(self, entity_type: str) -> Dict[str, Any]: 

1383 """ 

1384 

1385 Args: 

1386 entity_type: Entity Type. Example: Room 

1387 

1388 Returns: 

1389 

1390 """ 

1391 url = urljoin(self.base_url, f"{self._url_version}/types/{entity_type}") 

1392 headers = self.headers.copy() 

1393 params = {} 

1394 try: 

1395 res = self.get(url=url, params=params, headers=headers) 

1396 if res.ok: 

1397 self.logger.debug("Received: %s", res.json()) 

1398 return res.json() 

1399 res.raise_for_status() 

1400 except requests.RequestException as err: 

1401 msg = f"Could not load entities of type" f"'{entity_type}' " 

1402 self.log_error(err=err, msg=msg) 

1403 raise 

1404 

1405 # SUBSCRIPTION API ENDPOINTS 

1406 def get_subscription_list(self, limit: PositiveInt = inf) -> List[Subscription]: 

1407 """ 

1408 Returns a list of all the subscriptions present in the system. 

1409 Args: 

1410 limit: Limit the number of subscriptions to be retrieved 

1411 Returns: 

1412 list of subscriptions 

1413 """ 

1414 url = urljoin(self.base_url, f"{self._url_version}/subscriptions/") 

1415 headers = self.headers.copy() 

1416 params = {} 

1417 

1418 # We always use the 'count' option to check weather pagination is 

1419 # required 

1420 params.update({"options": "count"}) 

1421 try: 

1422 items = self.__pagination( 

1423 limit=limit, url=url, params=params, headers=headers 

1424 ) 

1425 adapter = TypeAdapter(List[Subscription]) 

1426 return adapter.validate_python(items) 

1427 except requests.RequestException as err: 

1428 msg = "Could not load subscriptions!" 

1429 self.log_error(err=err, msg=msg) 

1430 raise 

1431 

1432 def post_subscription( 

1433 self, 

1434 subscription: Subscription, 

1435 update: bool = False, 

1436 skip_initial_notification: bool = False, 

1437 ) -> str: 

1438 """ 

1439 Creates a new subscription. The subscription is represented by a 

1440 Subscription object defined in filip.cb.models. 

1441 

1442 If the subscription already exists, the adding is prevented and the id 

1443 of the existing subscription is returned. 

1444 

1445 A subscription is deemed as already existing if there exists a 

1446 subscription with the exact same subject and notification fields. All 

1447 optional fields are not considered. 

1448 

1449 Args: 

1450 subscription: Subscription 

1451 update: True - If the subscription already exists, update it 

1452 False- If the subscription already exists, throw warning 

1453 skip_initial_notification: True - Initial Notifications will be 

1454 sent to recipient containing the whole data. This is 

1455 deprecated and removed from version 3.0 of the context broker. 

1456 False - skip the initial notification 

1457 Returns: 

1458 str: Id of the (created) subscription 

1459 

1460 """ 

1461 existing_subscriptions = self.get_subscription_list() 

1462 

1463 sub_dict = subscription.model_dump(include={"subject", "notification"}) 

1464 for ex_sub in existing_subscriptions: 

1465 if self._subscription_dicts_are_equal( 

1466 sub_dict, ex_sub.model_dump(include={"subject", "notification"}) 

1467 ): 

1468 self.logger.info("Subscription already exists") 

1469 if update: 

1470 self.logger.info("Updated subscription") 

1471 subscription.id = ex_sub.id 

1472 self.update_subscription(subscription) 

1473 else: 

1474 warnings.warn( 

1475 f"Subscription existed already with the id" f" {ex_sub.id}" 

1476 ) 

1477 return ex_sub.id 

1478 

1479 params = {} 

1480 if skip_initial_notification: 

1481 version = self.get_version()["orion"]["version"] 

1482 if parse_version(version) <= parse_version("3.1"): 

1483 params.update({"options": "skipInitialNotification"}) 

1484 else: 

1485 pass 

1486 warnings.warn( 

1487 f"Skip initial notifications is a deprecated " 

1488 f"feature of older versions <=3.1 of the context " 

1489 f"broker. The Context Broker that you requesting has " 

1490 f"version: {version}. For newer versions we " 

1491 f"automatically skip this option. Consider " 

1492 f"refactoring and updating your services", 

1493 DeprecationWarning, 

1494 ) 

1495 

1496 url = urljoin(self.base_url, "v2/subscriptions") 

1497 headers = self.headers.copy() 

1498 headers.update({"Content-Type": "application/json"}) 

1499 try: 

1500 res = self.post( 

1501 url=url, 

1502 headers=headers, 

1503 data=subscription.model_dump_json(exclude={"id"}, exclude_none=True), 

1504 params=params, 

1505 ) 

1506 if res.ok: 

1507 self.logger.info("Subscription successfully created!") 

1508 return res.headers["Location"].split("/")[-1] 

1509 res.raise_for_status() 

1510 except requests.RequestException as err: 

1511 msg = "Could not send subscription!" 

1512 self.log_error(err=err, msg=msg) 

1513 raise 

1514 

1515 def get_subscription(self, subscription_id: str) -> Subscription: 

1516 """ 

1517 Retrieves a subscription from 

1518 Args: 

1519 subscription_id: id of the subscription 

1520 

1521 Returns: 

1522 

1523 """ 

1524 url = urljoin( 

1525 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}" 

1526 ) 

1527 headers = self.headers.copy() 

1528 try: 

1529 res = self.get(url=url, headers=headers) 

1530 if res.ok: 

1531 self.logger.debug("Received: %s", res.json()) 

1532 return Subscription(**res.json()) 

1533 res.raise_for_status() 

1534 except requests.RequestException as err: 

1535 msg = f"Could not load subscription {subscription_id}" 

1536 self.log_error(err=err, msg=msg) 

1537 raise 

1538 

1539 def update_subscription( 

1540 self, subscription: Subscription, skip_initial_notification: bool = False 

1541 ): 

1542 """ 

1543 Only the fields included in the request are updated in the subscription. 

1544 

1545 Args: 

1546 subscription: Subscription to update 

1547 skip_initial_notification: True - Initial Notifications will be 

1548 sent to recipient containing the whole data. This is 

1549 deprecated and removed from version 3.0 of the context broker. 

1550 False - skip the initial notification 

1551 

1552 Returns: 

1553 None 

1554 """ 

1555 params = {} 

1556 if skip_initial_notification: 

1557 version = self.get_version()["orion"]["version"] 

1558 if parse_version(version) <= parse_version("3.1"): 

1559 params.update({"options": "skipInitialNotification"}) 

1560 else: 

1561 pass 

1562 warnings.warn( 

1563 f"Skip initial notifications is a deprecated " 

1564 f"feature of older versions <3.1 of the context " 

1565 f"broker. The Context Broker that you requesting has " 

1566 f"version: {version}. For newer versions we " 

1567 f"automatically skip this option. Consider " 

1568 f"refactoring and updating your services", 

1569 DeprecationWarning, 

1570 ) 

1571 

1572 url = urljoin( 

1573 self.base_url, f"{self._url_version}/subscriptions/{subscription.id}" 

1574 ) 

1575 headers = self.headers.copy() 

1576 headers.update({"Content-Type": "application/json"}) 

1577 try: 

1578 res = self.patch( 

1579 url=url, 

1580 headers=headers, 

1581 data=subscription.model_dump_json(exclude={"id"}, exclude_none=True), 

1582 ) 

1583 if res.ok: 

1584 self.logger.info("Subscription successfully updated!") 

1585 else: 

1586 res.raise_for_status() 

1587 except requests.RequestException as err: 

1588 msg = f"Could not update subscription {subscription.id}" 

1589 self.log_error(err=err, msg=msg) 

1590 raise 

1591 

1592 def delete_subscription(self, subscription_id: str) -> None: 

1593 """ 

1594 Deletes a subscription from a Context Broker 

1595 Args: 

1596 subscription_id: id of the subscription 

1597 """ 

1598 url = urljoin( 

1599 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}" 

1600 ) 

1601 headers = self.headers.copy() 

1602 try: 

1603 res = self.delete(url=url, headers=headers) 

1604 if res.ok: 

1605 self.logger.info( 

1606 f"Subscription '{subscription_id}' " f"successfully deleted!" 

1607 ) 

1608 else: 

1609 res.raise_for_status() 

1610 except requests.RequestException as err: 

1611 msg = f"Could not delete subscription {subscription_id}" 

1612 self.log_error(err=err, msg=msg) 

1613 raise 

1614 

1615 # Registration API 

1616 def get_registration_list(self, *, limit: PositiveInt = None) -> List[Registration]: 

1617 """ 

1618 Lists all the context provider registrations present in the system. 

1619 

1620 Args: 

1621 limit: Limit the number of registrations to be retrieved 

1622 Returns: 

1623 

1624 """ 

1625 url = urljoin(self.base_url, f"{self._url_version}/registrations/") 

1626 headers = self.headers.copy() 

1627 params = {} 

1628 

1629 # We always use the 'count' option to check weather pagination is 

1630 # required 

1631 params.update({"options": "count"}) 

1632 try: 

1633 items = self.__pagination( 

1634 limit=limit, url=url, params=params, headers=headers 

1635 ) 

1636 adapter = TypeAdapter(List[Registration]) 

1637 return adapter.validate_python(items) 

1638 except requests.RequestException as err: 

1639 msg = "Could not load registrations!" 

1640 self.log_error(err=err, msg=msg) 

1641 raise 

1642 

1643 def post_registration(self, registration: Registration): 

1644 """ 

1645 Creates a new context provider registration. This is typically used 

1646 for binding context sources as providers of certain data. The 

1647 registration is represented by cb.models.Registration 

1648 

1649 Args: 

1650 registration (Registration): 

1651 

1652 Returns: 

1653 

1654 """ 

1655 url = urljoin(self.base_url, f"{self._url_version}/registrations") 

1656 headers = self.headers.copy() 

1657 headers.update({"Content-Type": "application/json"}) 

1658 try: 

1659 res = self.post( 

1660 url=url, 

1661 headers=headers, 

1662 data=registration.model_dump_json(exclude={"id"}, exclude_none=True), 

1663 ) 

1664 if res.ok: 

1665 self.logger.info("Registration successfully created!") 

1666 return res.headers["Location"].split("/")[-1] 

1667 res.raise_for_status() 

1668 except requests.RequestException as err: 

1669 msg = f"Could not send registration {registration.id}!" 

1670 self.log_error(err=err, msg=msg) 

1671 raise 

1672 

1673 def get_registration(self, registration_id: str) -> Registration: 

1674 """ 

1675 Retrieves a registration from context broker by id 

1676 

1677 Args: 

1678 registration_id: id of the registration 

1679 

1680 Returns: 

1681 Registration 

1682 """ 

1683 url = urljoin( 

1684 self.base_url, f"{self._url_version}/registrations/{registration_id}" 

1685 ) 

1686 headers = self.headers.copy() 

1687 try: 

1688 res = self.get(url=url, headers=headers) 

1689 if res.ok: 

1690 self.logger.debug("Received: %s", res.json()) 

1691 return Registration(**res.json()) 

1692 res.raise_for_status() 

1693 except requests.RequestException as err: 

1694 msg = f"Could not load registration {registration_id} !" 

1695 self.log_error(err=err, msg=msg) 

1696 raise 

1697 

1698 def add_valid_relationships( 

1699 self, entities: List[ContextEntity] 

1700 ) -> List[ContextEntity]: 

1701 """ 

1702 Validate all attributes in the given entities. If the attribute value points to 

1703 an existing entity, it is assumed that this attribute is a relationship, and it 

1704 will be assigned with the attribute type "relationship" 

1705 

1706 Args: 

1707 entities: list of entities that need to be validated. 

1708 

1709 Returns: 

1710 updated entities 

1711 """ 

1712 updated_entities = [] 

1713 for entity in entities[:]: 

1714 for attr_name, attr_value in entity.model_dump( 

1715 exclude={"id", "type"} 

1716 ).items(): 

1717 if isinstance(attr_value, dict): 

1718 if self.validate_relationship(attr_value): 

1719 entity.update_attribute( 

1720 { 

1721 attr_name: ContextAttribute( 

1722 **{ 

1723 "type": DataType.RELATIONSHIP, 

1724 "value": attr_value.get("value"), 

1725 } 

1726 ) 

1727 } 

1728 ) 

1729 updated_entities.append(entity) 

1730 return updated_entities 

1731 

1732 def remove_invalid_relationships( 

1733 self, entities: List[ContextEntity], hard_remove: bool = True 

1734 ) -> List[ContextEntity]: 

1735 """ 

1736 Removes invalid relationships from the entities. An invalid relationship 

1737 is a relationship that has no destination entity. 

1738 

1739 Args: 

1740 entities: list of entities that need to be validated. 

1741 hard_remove: If True, invalid relationships will be deleted. 

1742 If False, invalid relationships will be changed to Text 

1743 attributes. 

1744 

1745 Returns: 

1746 updated entities 

1747 """ 

1748 updated_entities = [] 

1749 for entity in entities[:]: 

1750 for relationship in entity.get_relationships(): 

1751 if not self.validate_relationship(relationship): 

1752 if hard_remove: 

1753 entity.delete_attributes(attrs=[relationship]) 

1754 else: 

1755 # change the attribute type to "Text" 

1756 entity.update_attribute( 

1757 attrs=[ 

1758 NamedContextAttribute( 

1759 name=relationship.name, 

1760 type=DataType.TEXT, 

1761 value=relationship.value, 

1762 ) 

1763 ] 

1764 ) 

1765 updated_entities.append(entity) 

1766 return updated_entities 

1767 

1768 def validate_relationship( 

1769 self, relationship: Union[NamedContextAttribute, ContextAttribute, Dict] 

1770 ) -> bool: 

1771 """ 

1772 Validates a relationship. A relationship is valid if it points to an existing 

1773 entity. Otherwise, it is considered invalid 

1774 

1775 Args: 

1776 relationship: relationship to validate 

1777 Returns 

1778 True if the relationship is valid, False otherwise 

1779 """ 

1780 if isinstance(relationship, NamedContextAttribute) or isinstance( 

1781 relationship, ContextAttribute 

1782 ): 

1783 destination_id = relationship.value 

1784 elif isinstance(relationship, dict): 

1785 destination_id = relationship.get("value") 

1786 if destination_id is None: 

1787 raise ValueError( 

1788 "Invalid relationship dictionary format\n" 

1789 "Expected format: {" 

1790 f'"type": "{DataType.RELATIONSHIP.value}", ' 

1791 '"value" "entity_id"}' 

1792 ) 

1793 else: 

1794 raise ValueError("Invalid relationship type.") 

1795 try: 

1796 destination_entity = self.get_entity(entity_id=destination_id) 

1797 return destination_entity.id == destination_id 

1798 except requests.RequestException as err: 

1799 if err.response.status_code == 404: 

1800 return False 

1801 

1802 def update_registration(self, registration: Registration): 

1803 """ 

1804 Only the fields included in the request are updated in the registration. 

1805 

1806 Args: 

1807 registration: Registration to update 

1808 Returns: 

1809 

1810 """ 

1811 url = urljoin( 

1812 self.base_url, f"{self._url_version}/registrations/{registration.id}" 

1813 ) 

1814 headers = self.headers.copy() 

1815 headers.update({"Content-Type": "application/json"}) 

1816 try: 

1817 res = self.patch( 

1818 url=url, 

1819 headers=headers, 

1820 data=registration.model_dump_json(exclude={"id"}, exclude_none=True), 

1821 ) 

1822 if res.ok: 

1823 self.logger.info("Registration successfully updated!") 

1824 else: 

1825 res.raise_for_status() 

1826 except requests.RequestException as err: 

1827 msg = f"Could not update registration {registration.id} !" 

1828 self.log_error(err=err, msg=msg) 

1829 raise 

1830 

1831 def delete_registration(self, registration_id: str) -> None: 

1832 """ 

1833 Deletes a subscription from a Context Broker 

1834 Args: 

1835 registration_id: id of the subscription 

1836 """ 

1837 url = urljoin( 

1838 self.base_url, f"{self._url_version}/registrations/{registration_id}" 

1839 ) 

1840 headers = self.headers.copy() 

1841 try: 

1842 res = self.delete(url=url, headers=headers) 

1843 if res.ok: 

1844 self.logger.info( 

1845 "Registration '%s' " "successfully deleted!", registration_id 

1846 ) 

1847 res.raise_for_status() 

1848 except requests.RequestException as err: 

1849 msg = f"Could not delete registration {registration_id} !" 

1850 self.log_error(err=err, msg=msg) 

1851 raise 

1852 

1853 # Batch operation API 

1854 def update( 

1855 self, 

1856 *, 

1857 entities: List[Union[ContextEntity, ContextEntityKeyValues]], 

1858 action_type: Union[ActionType, str], 

1859 update_format: str = None, 

1860 forcedUpdate: bool = False, 

1861 override_metadata: bool = False, 

1862 ) -> None: 

1863 """ 

1864 This operation allows to create, update and/or delete several entities 

1865 in a single batch operation. 

1866 

1867 This operation is split in as many individual operations as entities 

1868 in the entities vector, so the actionType is executed for each one of 

1869 them. Depending on the actionType, a mapping with regular non-batch 

1870 operations can be done: 

1871 

1872 append: maps to POST /v2/entities (if the entity does not already exist) 

1873 or POST /v2/entities/<id>/attrs (if the entity already exists). 

1874 

1875 appendStrict: maps to POST /v2/entities (if the entity does not 

1876 already exist) or POST /v2/entities/<id>/attrs?options=append (if the 

1877 entity already exists). 

1878 

1879 update: maps to PATCH /v2/entities/<id>/attrs. 

1880 

1881 delete: maps to DELETE /v2/entities/<id>/attrs/<attrName> on every 

1882 attribute included in the entity or to DELETE /v2/entities/<id> if 

1883 no attribute were included in the entity. 

1884 

1885 replace: maps to PUT /v2/entities/<id>/attrs. 

1886 

1887 Args: 

1888 entities: "an array of entities, each entity specified using the " 

1889 "JSON entity representation format " 

1890 action_type (Update): "actionType, to specify the kind of update 

1891 action to do: either append, appendStrict, update, delete, 

1892 or replace. " 

1893 update_format (str): Optional 'keyValues' 

1894 forcedUpdate: Update operation have to trigger any matching 

1895 subscription, no matter if there is an actual attribute 

1896 update or no instead of the default behavior, which is to 

1897 updated only if attribute is effectively updated. 

1898 override_metadata: 

1899 Bool, replace the existing metadata with the one provided in 

1900 the request 

1901 Returns: 

1902 

1903 """ 

1904 

1905 url = urljoin(self.base_url, f"{self._url_version}/op/update") 

1906 headers = self.headers.copy() 

1907 headers.update({"Content-Type": "application/json"}) 

1908 params = {} 

1909 options = [] 

1910 if override_metadata: 

1911 options.append("overrideMetadata") 

1912 if forcedUpdate: 

1913 options.append("forcedUpdate") 

1914 if update_format: 

1915 assert ( 

1916 update_format == AttrsFormat.KEY_VALUES.value 

1917 ), "Only 'keyValues' is allowed as update format" 

1918 options.append("keyValues") 

1919 if options: 

1920 params.update({"options": ",".join(options)}) 

1921 update = Update(actionType=action_type, entities=entities) 

1922 try: 

1923 res = self.post( 

1924 url=url, 

1925 headers=headers, 

1926 params=params, 

1927 json=update.model_dump(by_alias=True), 

1928 ) 

1929 if res.ok: 

1930 self.logger.info("Update operation '%s' succeeded!", action_type) 

1931 else: 

1932 res.raise_for_status() 

1933 except requests.RequestException as err: 

1934 msg = f"Update operation '{action_type}' failed!" 

1935 self.log_error(err=err, msg=msg) 

1936 raise 

1937 

1938 def query( 

1939 self, 

1940 *, 

1941 query: Query, 

1942 limit: PositiveInt = None, 

1943 order_by: str = None, 

1944 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED, 

1945 ) -> List[Any]: 

1946 """ 

1947 Generate api query 

1948 Args: 

1949 query (Query): 

1950 limit (PositiveInt): 

1951 order_by (str): 

1952 response_format (AttrsFormat, str): 

1953 Returns: 

1954 The response payload is an Array containing one object per matching 

1955 entity, or an empty array [] if no entities are found. The entities 

1956 follow the JSON entity representation format (described in the 

1957 section "JSON Entity Representation"). 

1958 """ 

1959 url = urljoin(self.base_url, f"{self._url_version}/op/query") 

1960 headers = self.headers.copy() 

1961 headers.update({"Content-Type": "application/json"}) 

1962 params = {"options": "count"} 

1963 

1964 if response_format: 

1965 if response_format not in list(AttrsFormat): 

1966 raise ValueError(f"Value must be in {list(AttrsFormat)}") 

1967 params["options"] = ",".join([response_format, "count"]) 

1968 try: 

1969 items = self.__pagination( 

1970 method=PaginationMethod.POST, 

1971 url=url, 

1972 headers=headers, 

1973 params=params, 

1974 data=query.model_dump_json(exclude_none=True), 

1975 limit=limit, 

1976 ) 

1977 if response_format == AttrsFormat.NORMALIZED: 

1978 adapter = TypeAdapter(List[ContextEntity]) 

1979 return adapter.validate_python(items) 

1980 if response_format == AttrsFormat.KEY_VALUES: 

1981 adapter = TypeAdapter(List[ContextEntityKeyValues]) 

1982 return adapter.validate_python(items) 

1983 return items 

1984 except requests.RequestException as err: 

1985 msg = "Query operation failed!" 

1986 self.log_error(err=err, msg=msg) 

1987 raise 

1988 

1989 def notify(self, message: Message) -> None: 

1990 """ 

1991 This operation is intended to consume a notification payload so that 

1992 all the entity data included by such notification is persisted, 

1993 overwriting if necessary. This operation is useful when one NGSIv2 

1994 endpoint is subscribed to another NGSIv2 endpoint (federation 

1995 scenarios). The request payload must be an NGSIv2 notification 

1996 payload. The behaviour must be exactly the same as 'update' 

1997 with 'action_type' equal to append. 

1998 

1999 Args: 

2000 message: Notification message 

2001 

2002 Returns: 

2003 None 

2004 """ 

2005 url = urljoin(self.base_url, "v2/op/notify") 

2006 headers = self.headers.copy() 

2007 headers.update({"Content-Type": "application/json"}) 

2008 params = {} 

2009 try: 

2010 res = self.post( 

2011 url=url, 

2012 headers=headers, 

2013 params=params, 

2014 data=message.model_dump_json(by_alias=True), 

2015 ) 

2016 if res.ok: 

2017 self.logger.info("Notification message sent!") 

2018 else: 

2019 res.raise_for_status() 

2020 except requests.RequestException as err: 

2021 msg = ( 

2022 f"Sending notifcation message failed! \n " 

2023 f"{message.model_dump_json(indent=2)}" 

2024 ) 

2025 self.log_error(err=err, msg=msg) 

2026 raise 

2027 

2028 def post_command( 

2029 self, 

2030 *, 

2031 entity_id: str, 

2032 command: Union[Command, NamedCommand, Dict], 

2033 entity_type: str = None, 

2034 command_name: str = None, 

2035 ) -> None: 

2036 """ 

2037 Post a command to a context entity this corresponds to 'PATCH' of the 

2038 specified command attribute. 

2039 

2040 Args: 

2041 entity_id: Entity identifier 

2042 command: Command 

2043 entity_type: Entity type 

2044 command_name: Name of the command in the entity 

2045 

2046 Returns: 

2047 None 

2048 """ 

2049 if command_name: 

2050 assert isinstance(command, (Command, dict)) 

2051 if isinstance(command, dict): 

2052 command = Command(**command) 

2053 command = {command_name: command.model_dump()} 

2054 else: 

2055 assert isinstance(command, (NamedCommand, dict)) 

2056 if isinstance(command, dict): 

2057 command = NamedCommand(**command) 

2058 

2059 self.update_existing_entity_attributes( 

2060 entity_id=entity_id, entity_type=entity_type, attrs=[command] 

2061 ) 

2062 

2063 def does_entity_exist(self, entity_id: str, entity_type: str) -> bool: 

2064 """ 

2065 Test if an entity with given id and type is present in the CB 

2066 

2067 Args: 

2068 entity_id: Entity id 

2069 entity_type: Entity type 

2070 

2071 Returns: 

2072 bool; True if entity exists 

2073 

2074 Raises: 

2075 RequestException, if any error occurs (e.g: No Connection), 

2076 except that the entity is not found 

2077 """ 

2078 url = urljoin(self.base_url, f"v2/entities/{entity_id}") 

2079 headers = self.headers.copy() 

2080 params = {"type": entity_type} 

2081 

2082 try: 

2083 res = self.get(url=url, params=params, headers=headers) 

2084 if res.ok: 

2085 return True 

2086 res.raise_for_status() 

2087 except requests.RequestException as err: 

2088 if err.response is None or not err.response.status_code == 404: 

2089 self.log_error(err=err, msg="Checking entity existence failed!") 

2090 raise 

2091 return False 

2092 

2093 def patch_entity( 

2094 self, 

2095 entity: ContextEntity, 

2096 old_entity: Optional[ContextEntity] = None, 

2097 override_attr_metadata: bool = True, 

2098 ) -> None: 

2099 """ 

2100 Takes a given entity and updates the state in the CB to match it. 

2101 It is an extended equivalent to the HTTP method PATCH, which applies 

2102 partial modifications to a resource. 

2103 

2104 Args: 

2105 entity: Entity to update 

2106 old_entity: OPTIONAL, if given only the differences between the 

2107 old_entity and entity are updated in the CB. 

2108 Other changes made to the entity in CB, can be kept. 

2109 If type or id was changed, the old_entity will be 

2110 deleted. 

2111 override_attr_metadata: 

2112 Whether to override or append the attributes metadata. 

2113 `True` for overwrite or `False` for update/append 

2114 

2115 Returns: 

2116 None 

2117 """ 

2118 

2119 new_entity = entity 

2120 

2121 if old_entity is None: 

2122 # If no old entity_was provided we use the current state to compare 

2123 # the entity to 

2124 if self.does_entity_exist( 

2125 entity_id=new_entity.id, entity_type=new_entity.type 

2126 ): 

2127 old_entity = self.get_entity( 

2128 entity_id=new_entity.id, entity_type=new_entity.type 

2129 ) 

2130 else: 

2131 # the entity is new, post and finish 

2132 self.post_entity(new_entity, update=False) 

2133 return 

2134 

2135 else: 

2136 # An old_entity was provided 

2137 # check if the old_entity (still) exists else recall methode 

2138 # and discard old_entity 

2139 if not self.does_entity_exist( 

2140 entity_id=old_entity.id, entity_type=old_entity.type 

2141 ): 

2142 self.patch_entity( 

2143 new_entity, override_attr_metadata=override_attr_metadata 

2144 ) 

2145 return 

2146 

2147 # if type or id was changed, the old_entity needs to be deleted 

2148 # and the new_entity created 

2149 # In this case we will lose the current state of the entity 

2150 if old_entity.id != new_entity.id or old_entity.type != new_entity.type: 

2151 self.delete_entity(entity_id=old_entity.id, entity_type=old_entity.type) 

2152 

2153 if not self.does_entity_exist( 

2154 entity_id=new_entity.id, entity_type=new_entity.type 

2155 ): 

2156 self.post_entity(entity=new_entity, update=False) 

2157 return 

2158 

2159 # At this point we know that we need to patch only the attributes of 

2160 # the entity 

2161 # Check the differences between the attributes of old and new entity 

2162 # Delete the removed attributes, create the new ones, 

2163 # and update the existing if necessary 

2164 old_attributes = old_entity.get_attributes() 

2165 new_attributes = new_entity.get_attributes() 

2166 

2167 # Manage attributes that existed before 

2168 for old_attr in old_attributes: 

2169 # commands do not exist in the ContextEntity and are only 

2170 # registrations to the corresponding device. Operations as 

2171 # delete will fail as it does not technically exist 

2172 corresponding_new_attr = None 

2173 for new_attr in new_attributes: 

2174 if new_attr.name == old_attr.name: 

2175 corresponding_new_attr = new_attr 

2176 

2177 if corresponding_new_attr is None: 

2178 # Attribute no longer exists, delete it 

2179 try: 

2180 self.delete_entity_attribute( 

2181 entity_id=new_entity.id, 

2182 entity_type=new_entity.type, 

2183 attr_name=old_attr.name, 

2184 ) 

2185 except requests.RequestException as err: 

2186 msg = ( 

2187 f"Failed to delete attribute {old_attr.name} of " 

2188 f"entity {new_entity.id}." 

2189 ) 

2190 if err.response is not None and err.response.status_code == 404: 

2191 # if the attribute is provided by a registration the 

2192 # deletion will fail 

2193 msg += ( 

2194 f" The attribute is probably provided " 

2195 f"by a registration." 

2196 ) 

2197 self.log_error(err=err, msg=msg) 

2198 else: 

2199 self.log_error(err=err, msg=msg) 

2200 raise 

2201 else: 

2202 # Check if attributed changed in any way, if yes update 

2203 # else do nothing and keep current state 

2204 if old_attr != corresponding_new_attr: 

2205 try: 

2206 self.update_entity_attribute( 

2207 entity_id=new_entity.id, 

2208 entity_type=new_entity.type, 

2209 attr=corresponding_new_attr, 

2210 override_metadata=override_attr_metadata, 

2211 ) 

2212 except requests.RequestException as err: 

2213 msg = ( 

2214 f"Failed to update attribute {old_attr.name} of " 

2215 f"entity {new_entity.id}." 

2216 ) 

2217 if err.response is not None and err.response.status_code == 404: 

2218 # if the attribute is provided by a registration the 

2219 # update will fail 

2220 msg += ( 

2221 f" The attribute is probably provided " 

2222 f"by a registration." 

2223 ) 

2224 self.log_error(err=err, msg=msg) 

2225 raise 

2226 

2227 # Create new attributes 

2228 update_entity = ContextEntity(id=entity.id, type=entity.type) 

2229 update_needed = False 

2230 for new_attr in new_attributes: 

2231 # commands do not exist in the ContextEntity and are only 

2232 # registrations to the corresponding device. Operations as 

2233 # delete will fail as it does not technically exists 

2234 attr_existed = False 

2235 for old_attr in old_attributes: 

2236 if new_attr.name == old_attr.name: 

2237 attr_existed = True 

2238 

2239 if not attr_existed: 

2240 update_needed = True 

2241 update_entity.add_attributes([new_attr]) 

2242 

2243 if update_needed: 

2244 self.update_entity(update_entity) 

2245 

2246 def _subscription_dicts_are_equal(self, first: dict, second: dict): 

2247 """ 

2248 Check if two dictionaries and all sub-dictionaries are equal. 

2249 Logs a warning if the keys are not equal, but ignores the 

2250 comparison of such keys. 

2251 

2252 Args: 

2253 first dict: Dictionary of first subscription 

2254 second dict: Dictionary of second subscription 

2255 

2256 Returns: 

2257 True if equal, else False 

2258 """ 

2259 

2260 def _value_is_not_none(value): 

2261 """ 

2262 Recursive function to check if a value equals none. 

2263 If the value is a dict and any value of the dict is not none, 

2264 the value is not none. 

2265 If the value is a list and any item is not none, the value is not none. 

2266 If it's neither dict nore list, bool is used. 

2267 """ 

2268 if isinstance(value, dict): 

2269 return any([_value_is_not_none(value=_v) for _v in value.values()]) 

2270 if isinstance(value, list): 

2271 return any([_value_is_not_none(value=_v) for _v in value]) 

2272 else: 

2273 return bool(value) 

2274 

2275 if first.keys() != second.keys(): 

2276 warnings.warn( 

2277 "Subscriptions contain a different set of fields. " 

2278 "Only comparing to new fields of the new one." 

2279 ) 

2280 for k, v in first.items(): 

2281 ex_value = second.get(k, None) 

2282 if isinstance(v, dict) and isinstance(ex_value, dict): 

2283 equal = self._subscription_dicts_are_equal(v, ex_value) 

2284 if equal: 

2285 continue 

2286 else: 

2287 return False 

2288 if v != ex_value: 

2289 self.logger.debug(f"Not equal fields for key {k}: ({v}, {ex_value})") 

2290 if ( 

2291 not _value_is_not_none(v) 

2292 and not _value_is_not_none(ex_value) 

2293 or k == "timesSent" 

2294 ): 

2295 continue 

2296 return False 

2297 return True 

2298 

2299 

2300# 

2301# 

2302# def check_duplicate_subscription(self, subscription_body, limit: int = 20): 

2303# """ 

2304# Function compares the subject of the subscription body, on whether a subscription 

2305# already exists for a device / entity. 

2306# :param subscription_body: the body of the new subscripton 

2307# :param limit: pagination parameter, to set the number of 

2308# subscriptions bodies the get request should grab 

2309# :return: exists, boolean -> True, if such a subscription allready 

2310# exists 

2311# """ 

2312# exists = False 

2313# subscription_subject = json.loads(subscription_body)["subject"] 

2314# # Exact keys depend on subscription body 

2315# try: 

2316# subscription_url = json.loads(subscription_body)[ 

2317# "notification"]["httpCustom"]["url"] 

2318# except KeyError: 

2319# subscription_url = json.loads(subscription_body)[ 

2320# "notification"]["http"]["url"] 

2321# 

2322# # If the number of subscriptions is larger then the limit, 

2323# paginations methods have to be used 

2324# url = self.url + '/v2/subscriptions?limit=' + str(limit) + 

2325# '&options=count' 

2326# response = self.session.get(url, headers=self.get_header()) 

2327# 

2328# sub_count = float(response.headers["Fiware-Total-Count"]) 

2329# response = json.loads(response.text) 

2330# if sub_count >= limit: 

2331# response = self.get_pagination(url=url, headers=self.get_header(), 

2332# limit=limit, count=sub_count) 

2333# response = json.loads(response) 

2334# 

2335# for existing_subscription in response: 

2336# # check whether the exact same subscriptions already exists 

2337# if existing_subscription["subject"] == subscription_subject: 

2338# exists = True 

2339# break 

2340# try: 

2341# existing_url = existing_subscription["notification"][ 

2342# "http"]["url"] 

2343# except KeyError: 

2344# existing_url = existing_subscription["notification"][ 

2345# "httpCustom"]["url"] 

2346# # check whether both subscriptions notify to the same path 

2347# if existing_url != subscription_url: 

2348# continue 

2349# else: 

2350# # iterate over all entities included in the subscription object 

2351# for entity in subscription_subject["entities"]: 

2352# if 'type' in entity.keys(): 

2353# subscription_type = entity['type'] 

2354# else: 

2355# subscription_type = entity['typePattern'] 

2356# if 'id' in entity.keys(): 

2357# subscription_id = entity['id'] 

2358# else: 

2359# subscription_id = entity["idPattern"] 

2360# # iterate over all entities included in the exisiting 

2361# subscriptions 

2362# for existing_entity in existing_subscription["subject"][ 

2363# "entities"]: 

2364# if "type" in entity.keys(): 

2365# type_existing = entity["type"] 

2366# else: 

2367# type_existing = entity["typePattern"] 

2368# if "id" in entity.keys(): 

2369# id_existing = entity["id"] 

2370# else: 

2371# id_existing = entity["idPattern"] 

2372# # as the ID field is non optional, it has to match 

2373# # check whether the type match 

2374# # if the type field is empty, they match all types 

2375# if (type_existing == subscription_type) or\ 

2376# ('*' in subscription_type) or \ 

2377# ('*' in type_existing)\ 

2378# or (type_existing == "") or ( 

2379# subscription_type == ""): 

2380# # check if on of the subscriptions is a pattern, 

2381# or if they both refer to the same id 

2382# # Get the attrs first, to avoid code duplication 

2383# # last thing to compare is the attributes 

2384# # Assumption -> position is the same as the 

2385# entities _list 

2386# # i == j 

2387# i = subscription_subject["entities"].index(entity) 

2388# j = existing_subscription["subject"][ 

2389# "entities"].index(existing_entity) 

2390# try: 

2391# subscription_attrs = subscription_subject[ 

2392# "condition"]["attrs"][i] 

2393# except (KeyError, IndexError): 

2394# subscription_attrs = [] 

2395# try: 

2396# existing_attrs = existing_subscription[ 

2397# "subject"]["condition"]["attrs"][j] 

2398# except (KeyError, IndexError): 

2399# existing_attrs = [] 

2400# 

2401# if (".*" in subscription_id) or ('.*' in 

2402# id_existing) or (subscription_id == id_existing): 

2403# # Attributes have to match, or the have to 

2404# be an empty array 

2405# if (subscription_attrs == existing_attrs) or 

2406# (subscription_attrs == []) or (existing_attrs == []): 

2407# exists = True 

2408# # if they do not match completely or subscribe 

2409# to all ids they have to match up to a certain position 

2410# elif ("*" in subscription_id) or ('*' in 

2411# id_existing): 

2412# regex_existing = id_existing.find('*') 

2413# regex_subscription = 

2414# subscription_id.find('*') 

2415# # slice the strings to compare 

2416# if (id_existing[:regex_existing] in 

2417# subscription_id) or (subscription_id[:regex_subscription] in id_existing) or \ 

2418# (id_existing[regex_existing:] in 

2419# subscription_id) or (subscription_id[regex_subscription:] in id_existing): 

2420# if (subscription_attrs == 

2421# existing_attrs) or (subscription_attrs == []) or (existing_attrs == []): 

2422# exists = True 

2423# else: 

2424# continue 

2425# else: 

2426# continue 

2427# else: 

2428# continue 

2429# else: 

2430# continue 

2431# else: 

2432# continue 

2433# return exists 

2434#