Coverage for filip/clients/ngsi_v2/cb.py: 81%

673 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-11-20 16:54 +0000

1""" 

2Context Broker Module for API Client 

3""" 

4from __future__ import annotations 

5 

6import copy 

7from copy import deepcopy 

8from math import inf 

9from pkg_resources import parse_version 

10from pydantic import PositiveInt, PositiveFloat, AnyHttpUrl 

11from pydantic.type_adapter import TypeAdapter 

12from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union 

13import re 

14import requests 

15from urllib.parse import urljoin 

16import warnings 

17from filip.clients.base_http_client import BaseHttpClient 

18from filip.config import settings 

19from filip.models.base import FiwareHeader, PaginationMethod 

20from filip.utils.simple_ql import QueryString 

21from filip.models.ngsi_v2.context import ( 

22 ActionType, 

23 Command, 

24 ContextEntity, 

25 ContextEntityKeyValues, 

26 ContextAttribute, 

27 NamedCommand, 

28 NamedContextAttribute, 

29 Query, 

30 Update, 

31 PropertyFormat, 

32) 

33from filip.models.ngsi_v2.base import AttrsFormat 

34from filip.models.ngsi_v2.subscriptions import Subscription, Message 

35from filip.models.ngsi_v2.registrations import Registration 

36 

37if TYPE_CHECKING: 

38 from filip.clients.ngsi_v2.iota import IoTAClient 

39 

40 

41class ContextBrokerClient(BaseHttpClient): 

42 """ 

43 Implementation of NGSI Context Broker functionalities, such as creating 

44 entities and subscriptions; retrieving, updating and deleting data. 

45 Further documentation: 

46 https://fiware-orion.readthedocs.io/en/master/ 

47 

48 Api specifications for v2 are located here: 

49 https://telefonicaid.github.io/fiware-orion/api/v2/stable/ 

50 

51 Note: 

52 We use the reference implementation for development. Therefore, some 

53 other brokers may show slightly different behavior! 

54 """ 

55 

56 def __init__( 

57 self, 

58 url: str = None, 

59 *, 

60 session: requests.Session = None, 

61 fiware_header: FiwareHeader = None, 

62 **kwargs, 

63 ): 

64 """ 

65 

66 Args: 

67 url: Url of context broker server 

68 session (requests.Session): 

69 fiware_header (FiwareHeader): fiware service and fiware service path 

70 **kwargs (Optional): Optional arguments that ``request`` takes. 

71 """ 

72 # set service url 

73 url = url or settings.CB_URL 

74 super().__init__( 

75 url=url, session=session, fiware_header=fiware_header, **kwargs 

76 ) 

77 

78 def __pagination( 

79 self, 

80 *, 

81 method: PaginationMethod = PaginationMethod.GET, 

82 url: str, 

83 headers: Dict, 

84 limit: Union[PositiveInt, PositiveFloat] = None, 

85 params: Dict = None, 

86 data: str = None, 

87 ) -> List[Dict]: 

88 """ 

89 NGSIv2 implements a pagination mechanism in order to help clients to 

90 retrieve large sets of resources. This mechanism works for all listing 

91 operations in the API (e.g. GET /v2/entities, GET /v2/subscriptions, 

92 POST /v2/op/query, etc.). This function helps getting datasets that are 

93 larger than the limit for the different GET operations. 

94 

95 https://fiware-orion.readthedocs.io/en/master/user/pagination/index.html 

96 

97 Args: 

98 url: Information about the url, obtained from the original function 

99 headers: The headers from the original function 

100 params: 

101 limit: 

102 

103 Returns: 

104 object: 

105 

106 """ 

107 if limit is None: 

108 limit = inf 

109 if limit > 1000: 

110 params["limit"] = 1000 # maximum items per request 

111 else: 

112 params["limit"] = limit 

113 

114 if self.session: 

115 session = self.session 

116 else: 

117 session = requests.Session() 

118 with session: 

119 res = session.request( 

120 method=method, url=url, params=params, headers=headers, data=data 

121 ) 

122 if res.ok: 

123 items = res.json() 

124 # do pagination 

125 count = int(res.headers["Fiware-Total-Count"]) 

126 

127 while len(items) < limit and len(items) < count: 

128 # Establishing the offset from where entities are retrieved 

129 params["offset"] = len(items) 

130 params["limit"] = min(1000, (limit - len(items))) 

131 res = session.request( 

132 method=method, 

133 url=url, 

134 params=params, 

135 headers=headers, 

136 data=data, 

137 ) 

138 if res.ok: 

139 items.extend(res.json()) 

140 else: 

141 res.raise_for_status() 

142 self.logger.debug("Received: %s", items) 

143 return items 

144 res.raise_for_status() 

145 

146 # MANAGEMENT API 

147 def get_version(self) -> Dict: 

148 """ 

149 Gets version of IoT Agent 

150 Returns: 

151 Dictionary with response 

152 """ 

153 url = urljoin(self.base_url, "version") 

154 try: 

155 res = self.get(url=url, headers=self.headers) 

156 if res.ok: 

157 return res.json() 

158 res.raise_for_status() 

159 except requests.RequestException as err: 

160 self.logger.error(err) 

161 raise 

162 

163 def get_resources(self) -> Dict: 

164 """ 

165 Gets reo 

166 

167 Returns: 

168 Dict 

169 """ 

170 url = urljoin(self.base_url, "v2") 

171 try: 

172 res = self.get(url=url, headers=self.headers) 

173 if res.ok: 

174 return res.json() 

175 res.raise_for_status() 

176 except requests.RequestException as err: 

177 self.logger.error(err) 

178 raise 

179 

180 # STATISTICS API 

181 def get_statistics(self) -> Dict: 

182 """ 

183 Gets statistics of context broker 

184 Returns: 

185 Dictionary with response 

186 """ 

187 url = urljoin(self.base_url, "statistics") 

188 try: 

189 res = self.get(url=url, headers=self.headers) 

190 if res.ok: 

191 return res.json() 

192 res.raise_for_status() 

193 except requests.RequestException as err: 

194 self.logger.error(err) 

195 raise 

196 

197 # CONTEXT MANAGEMENT API ENDPOINTS 

198 # Entity Operations 

199 def post_entity( 

200 self, 

201 entity: Union[ContextEntity, ContextEntityKeyValues], 

202 update: bool = False, 

203 patch: bool = False, 

204 override_attr_metadata: bool = True, 

205 key_values: bool = False, 

206 ): 

207 """ 

208 Function registers an Object with the NGSI Context Broker, 

209 if it already exists it can be automatically updated (overwritten) 

210 if the update bool is True. 

211 First a post request with the entity is tried, if the response code 

212 is 422 the entity is uncrossable, as it already exists there are two 

213 options, either overwrite it, if the attribute have changed 

214 (e.g. at least one new/new values) (update = True) or leave 

215 it the way it is (update=False) 

216 If you only want to manipulate the entities values, you need to set 

217 patch argument. 

218 

219 Args: 

220 entity (ContextEntity/ContextEntityKeyValues): 

221 Context Entity Object 

222 update (bool): 

223 If the response.status_code is 422, whether the override and 

224 existing entity 

225 patch (bool): 

226 If the response.status_code is 422, whether to manipulate the 

227 existing entity. Omitted if update `True`. 

228 override_attr_metadata: 

229 Only applies for patch equal to `True`. 

230 Whether to override or append the attribute's metadata. 

231 `True` for overwrite or `False` for update/append 

232 key_values(bool): 

233 By default False. If set to True, "options=keyValues" will 

234 be included in params of post request. The payload uses 

235 the keyValues simplified entity representation, i.e. 

236 ContextEntityKeyValues. 

237 """ 

238 url = urljoin(self.base_url, "v2/entities") 

239 headers = self.headers.copy() 

240 params = {} 

241 options = [] 

242 if key_values: 

243 assert isinstance(entity, ContextEntityKeyValues) 

244 options.append("keyValues") 

245 else: 

246 assert isinstance(entity, ContextEntity) 

247 if options: 

248 params.update({'options': ",".join(options)}) 

249 try: 

250 res = self.post( 

251 url=url, headers=headers, json=entity.model_dump(exclude_none=True), 

252 params=params, 

253 ) 

254 if res.ok: 

255 self.logger.info("Entity successfully posted!") 

256 return res.headers.get("Location") 

257 res.raise_for_status() 

258 except requests.RequestException as err: 

259 if update and err.response.status_code == 422: 

260 return self.override_entity( 

261 entity=entity, key_values=key_values) 

262 if patch and err.response.status_code == 422: 

263 if not key_values: 

264 return self.patch_entity( 

265 entity=entity, override_attr_metadata=override_attr_metadata 

266 ) 

267 else: 

268 return self._patch_entity_key_values(entity=entity) 

269 msg = f"Could not post entity {entity.id}" 

270 self.log_error(err=err, msg=msg) 

271 raise 

272 

273 def get_entity_list( 

274 self, 

275 *, 

276 entity_ids: List[str] = None, 

277 entity_types: List[str] = None, 

278 id_pattern: str = None, 

279 type_pattern: str = None, 

280 q: Union[str, QueryString] = None, 

281 mq: Union[str, QueryString] = None, 

282 georel: str = None, 

283 geometry: str = None, 

284 coords: str = None, 

285 limit: PositiveInt = inf, 

286 attrs: List[str] = None, 

287 metadata: str = None, 

288 order_by: str = None, 

289 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED, 

290 ) -> List[Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]]: 

291 r""" 

292 Retrieves a list of context entities that match different criteria by 

293 id, type, pattern matching (either id or type) and/or those which 

294 match a query or geographical query (see Simple Query Language and 

295 Geographical Queries). A given entity has to match all the criteria 

296 to be retrieved (i.e., the criteria is combined in a logical AND 

297 way). Note that pattern matching query parameters are incompatible 

298 (i.e. mutually exclusive) with their corresponding exact matching 

299 parameters, i.e. idPattern with id and typePattern with type. 

300 

301 Args: 

302 entity_ids: A comma-separated list of elements. Retrieve entities 

303 whose ID matches one of the elements in the list. 

304 Incompatible with idPattern,e.g. Boe_Idarium 

305 entity_types: comma-separated list of elements. Retrieve entities 

306 whose type matches one of the elements in the list. 

307 Incompatible with typePattern. Example: Room. 

308 id_pattern: A correctly formatted regular expression. Retrieve 

309 entities whose ID matches the regular expression. Incompatible 

310 with id, e.g. ngsi-ld.* or sensor.* 

311 type_pattern: A correctly formatted regular expression. Retrieve 

312 entities whose type matches the regular expression. 

313 Incompatible with type, e.g. room.* 

314 q (SimpleQuery): A query expression, composed of a list of 

315 statements separated by ;, i.e., 

316 q=statement1;statement2;statement3. See Simple Query 

317 Language specification. Example: temperature>40. 

318 mq (SimpleQuery): A query expression for attribute metadata, 

319 composed of a list of statements separated by ;, i.e., 

320 mq=statement1;statement2;statement3. See Simple Query 

321 Language specification. Example: temperature.accuracy<0.9. 

322 georel: Spatial relationship between matching entities and a 

323 reference shape. See Geographical Queries. Example: 'near'. 

324 geometry: Geographical area to which the query is restricted. 

325 See Geographical Queries. Example: point. 

326 coords: List of latitude-longitude pairs of coordinates separated 

327 by ';'. See Geographical Queries. Example: 41.390205, 

328 2.154007;48.8566,2.3522. 

329 limit: Limits the number of entities to be retrieved Example: 20 

330 attrs: Comma-separated list of attribute names whose data are to 

331 be included in the response. The attributes are retrieved in 

332 the order specified by this parameter. If this parameter is 

333 not included, the attributes are retrieved in arbitrary 

334 order. See "Filtering out attributes and metadata" section 

335 for more detail. Example: seatNumber. 

336 metadata: A list of metadata names to include in the response. 

337 See "Filtering out attributes and metadata" section for more 

338 detail. Example: accuracy. 

339 order_by: Criteria for ordering results. See "Ordering Results" 

340 section for details. Example: temperature,!speed. 

341 response_format (AttrsFormat, str): Response Format. Note: That if 

342 'keyValues' or 'values' are used the response model will 

343 change to List[ContextEntityKeyValues] and to List[Dict[str, 

344 Any]], respectively. 

345 Returns: 

346 

347 """ 

348 url = urljoin(self.base_url, "v2/entities/") 

349 headers = self.headers.copy() 

350 params = {} 

351 

352 if entity_ids and id_pattern: 

353 raise ValueError 

354 if entity_types and type_pattern: 

355 raise ValueError 

356 if entity_ids: 

357 if not isinstance(entity_ids, list): 

358 entity_ids = [entity_ids] 

359 params.update({"id": ",".join(entity_ids)}) 

360 if id_pattern: 

361 try: 

362 re.compile(id_pattern) 

363 except re.error as err: 

364 raise ValueError(f"Invalid Pattern: {err}") from err 

365 params.update({"idPattern": id_pattern}) 

366 if entity_types: 

367 if not isinstance(entity_types, list): 

368 entity_types = [entity_types] 

369 params.update({"type": ",".join(entity_types)}) 

370 if type_pattern: 

371 try: 

372 re.compile(type_pattern) 

373 except re.error as err: 

374 raise ValueError(f"Invalid Pattern: {err.msg}") from err 

375 params.update({"typePattern": type_pattern}) 

376 if attrs: 

377 params.update({"attrs": ",".join(attrs)}) 

378 if metadata: 

379 params.update({"metadata": ",".join(metadata)}) 

380 if q: 

381 if isinstance(q, str): 

382 q = QueryString.parse_str(q) 

383 params.update({"q": str(q)}) 

384 if mq: 

385 params.update({"mq": str(mq)}) 

386 if geometry: 

387 params.update({"geometry": geometry}) 

388 if georel: 

389 params.update({"georel": georel}) 

390 if coords: 

391 params.update({"coords": coords}) 

392 if order_by: 

393 params.update({"orderBy": order_by}) 

394 if response_format not in list(AttrsFormat): 

395 raise ValueError(f"Value must be in {list(AttrsFormat)}") 

396 response_format = ",".join(["count", response_format]) 

397 params.update({"options": response_format}) 

398 try: 

399 items = self.__pagination( 

400 method=PaginationMethod.GET, 

401 limit=limit, 

402 url=url, 

403 params=params, 

404 headers=headers, 

405 ) 

406 if AttrsFormat.NORMALIZED in response_format: 

407 adapter = TypeAdapter(List[ContextEntity]) 

408 return adapter.validate_python(items) 

409 if AttrsFormat.KEY_VALUES in response_format: 

410 adapter = TypeAdapter(List[ContextEntityKeyValues]) 

411 return adapter.validate_python(items) 

412 return items 

413 

414 except requests.RequestException as err: 

415 msg = "Could not load entities" 

416 self.log_error(err=err, msg=msg) 

417 raise 

418 

419 def get_entity( 

420 self, 

421 entity_id: str, 

422 entity_type: str = None, 

423 attrs: List[str] = None, 

424 metadata: List[str] = None, 

425 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED, 

426 ) -> Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]: 

427 """ 

428 This operation must return one entity element only, but there may be 

429 more than one entity with the same ID (e.g. entities with same ID but 

430 different types). In such case, an error message is returned, with 

431 the HTTP status code set to 409 Conflict. 

432 

433 Args: 

434 entity_id (String): Id of the entity to be retrieved 

435 entity_type (String): Entity type, to avoid ambiguity in case 

436 there are several entities with the same entity id. 

437 attrs (List of Strings): List of attribute names whose data must be 

438 included in the response. The attributes are retrieved in the 

439 order specified by this parameter. 

440 See "Filtering out attributes and metadata" section for more 

441 detail. If this parameter is not included, the attributes are 

442 retrieved in arbitrary order, and all the attributes of the 

443 entity are included in the response. 

444 Example: temperature, humidity. 

445 metadata (List of Strings): A list of metadata names to include in 

446 the response. See "Filtering out attributes and metadata" 

447 section for more detail. Example: accuracy. 

448 response_format (AttrsFormat, str): Representation format of 

449 response 

450 Returns: 

451 ContextEntity 

452 """ 

453 url = urljoin(self.base_url, f"v2/entities/{entity_id}") 

454 headers = self.headers.copy() 

455 params = {} 

456 if entity_type: 

457 params.update({"type": entity_type}) 

458 if attrs: 

459 params.update({"attrs": ",".join(attrs)}) 

460 if metadata: 

461 params.update({"metadata": ",".join(metadata)}) 

462 if response_format not in list(AttrsFormat): 

463 raise ValueError(f"Value must be in {list(AttrsFormat)}") 

464 params.update({"options": response_format}) 

465 

466 try: 

467 res = self.get(url=url, params=params, headers=headers) 

468 if res.ok: 

469 self.logger.info("Entity successfully retrieved!") 

470 self.logger.debug("Received: %s", res.json()) 

471 if response_format == AttrsFormat.NORMALIZED: 

472 return ContextEntity(**res.json()) 

473 if response_format == AttrsFormat.KEY_VALUES: 

474 return ContextEntityKeyValues(**res.json()) 

475 return res.json() 

476 res.raise_for_status() 

477 except requests.RequestException as err: 

478 msg = f"Could not load entity {entity_id}" 

479 self.log_error(err=err, msg=msg) 

480 raise 

481 

482 def get_entity_attributes( 

483 self, 

484 entity_id: str, 

485 entity_type: str = None, 

486 attrs: List[str] = None, 

487 metadata: List[str] = None, 

488 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED, 

489 ) -> Dict[str, ContextAttribute]: 

490 """ 

491 This request is similar to retrieving the whole entity, however this 

492 one omits the id and type fields. Just like the general request of 

493 getting an entire entity, this operation must return only one entity 

494 element. If more than one entity with the same ID is found (e.g. 

495 entities with same ID but different type), an error message is 

496 returned, with the HTTP status code set to 409 Conflict. 

497 

498 Args: 

499 entity_id (String): Id of the entity to be retrieved 

500 entity_type (String): Entity type, to avoid ambiguity in case 

501 there are several entities with the same entity id. 

502 attrs (List of Strings): List of attribute names whose data must be 

503 included in the response. The attributes are retrieved in the 

504 order specified by this parameter. 

505 See "Filtering out attributes and metadata" section for more 

506 detail. If this parameter is not included, the attributes are 

507 retrieved in arbitrary order, and all the attributes of the 

508 entity are included in the response. Example: temperature, 

509 humidity. 

510 metadata (List of Strings): A list of metadata names to include in 

511 the response. See "Filtering out attributes and metadata" 

512 section for more detail. Example: accuracy. 

513 response_format (AttrsFormat, str): Representation format of 

514 response 

515 Returns: 

516 Dict 

517 """ 

518 url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs") 

519 headers = self.headers.copy() 

520 params = {} 

521 if entity_type: 

522 params.update({"type": entity_type}) 

523 if attrs: 

524 params.update({"attrs": ",".join(attrs)}) 

525 if metadata: 

526 params.update({"metadata": ",".join(metadata)}) 

527 if response_format not in list(AttrsFormat): 

528 raise ValueError(f"Value must be in {list(AttrsFormat)}") 

529 params.update({"options": response_format}) 

530 try: 

531 res = self.get(url=url, params=params, headers=headers) 

532 if res.ok: 

533 if response_format == AttrsFormat.NORMALIZED: 

534 return { 

535 key: ContextAttribute(**values) 

536 for key, values in res.json().items() 

537 } 

538 return res.json() 

539 res.raise_for_status() 

540 except requests.RequestException as err: 

541 msg = f"Could not load attributes from entity {entity_id} !" 

542 self.log_error(err=err, msg=msg) 

543 raise 

544 

545 def update_entity(self, entity: Union[ContextEntity, ContextEntityKeyValues, dict], 

546 append_strict: bool = False, 

547 key_values: bool = False 

548 ): 

549 """ 

550 The request payload is an object representing the attributes to 

551 append or update. 

552 

553 Note: 

554 Update means overwriting the existing entity. If you want to 

555 manipulate you should rather use patch_entity. 

556 

557 Args: 

558 entity (ContextEntity): 

559 append_strict: If `False` the entity attributes are updated (if they 

560 previously exist) or appended (if they don't previously exist) 

561 with the ones in the payload. 

562 If `True` all the attributes in the payload not 

563 previously existing in the entity are appended. In addition 

564 to that, in case some of the attributes in the payload 

565 already exist in the entity, an error is returned. 

566 More precisely this means a strict append procedure. 

567 key_values: By default False. If set to True, the payload uses 

568 the keyValues simplified entity representation, i.e. 

569 ContextEntityKeyValues. 

570 Returns: 

571 None 

572 """ 

573 if key_values: 

574 if isinstance(entity, dict): 

575 entity = copy.deepcopy(entity) 

576 _id = entity.pop("id") 

577 _type = entity.pop("type") 

578 attrs = entity 

579 entity = ContextEntityKeyValues(id=_id, type=_type) 

580 else: 

581 attrs = entity.model_dump(exclude={"id", "type"}) 

582 else: 

583 attrs = entity.get_attributes() 

584 self.update_or_append_entity_attributes( 

585 entity_id=entity.id, 

586 entity_type=entity.type, 

587 attrs=attrs, 

588 append_strict=append_strict, 

589 key_values=key_values, 

590 ) 

591 

592 def update_entity_properties(self, entity: ContextEntity, append_strict: bool = False): 

593 """ 

594 The request payload is an object representing the attributes, of any type 

595 but Relationship, to append or update. 

596 

597 Note: 

598 Update means overwriting the existing entity. If you want to 

599 manipulate you should rather use patch_entity. 

600 

601 Args: 

602 entity (ContextEntity): 

603 append_strict: If `False` the entity attributes are updated (if they 

604 previously exist) or appended (if they don't previously exist) 

605 with the ones in the payload. 

606 If `True` all the attributes in the payload not 

607 previously existing in the entity are appended. In addition 

608 to that, in case some of the attributes in the payload 

609 already exist in the entity, an error is returned. 

610 More precisely this means a strict append procedure. 

611 

612 Returns: 

613 None 

614 """ 

615 self.update_or_append_entity_attributes( 

616 entity_id=entity.id, 

617 entity_type=entity.type, 

618 attrs=entity.get_properties(), 

619 append_strict=append_strict, 

620 ) 

621 

622 def update_entity_relationships(self, entity: ContextEntity, 

623 append_strict: bool = False): 

624 """ 

625 The request payload is an object representing only the attributes, of type 

626 Relationship, to append or update. 

627 

628 Note: 

629 Update means overwriting the existing entity. If you want to 

630 manipulate you should rather use patch_entity. 

631 

632 Args: 

633 entity (ContextEntity): 

634 append_strict: If `False` the entity attributes are updated (if they 

635 previously exist) or appended (if they don't previously exist) 

636 with the ones in the payload. 

637 If `True` all the attributes in the payload not 

638 previously existing in the entity are appended. In addition 

639 to that, in case some of the attributes in the payload 

640 already exist in the entity, an error is returned. 

641 More precisely this means a strict append procedure. 

642 

643 Returns: 

644 None 

645 """ 

646 self.update_or_append_entity_attributes( 

647 entity_id=entity.id, 

648 entity_type=entity.type, 

649 attrs=entity.get_relationships(), 

650 append_strict=append_strict, 

651 ) 

652 

653 def delete_entity( 

654 self, 

655 entity_id: str, 

656 entity_type: str= None, 

657 delete_devices: bool = False, 

658 iota_client: IoTAClient = None, 

659 iota_url: AnyHttpUrl = settings.IOTA_URL, 

660 ) -> None: 

661 """ 

662 Remove a entity from the context broker. No payload is required 

663 or received. 

664 

665 Args: 

666 entity_id: 

667 Id of the entity to be deleted 

668 entity_type: 

669 Entity type, to avoid ambiguity in case there are several 

670 entities with the same entity id. 

671 delete_devices: 

672 If True, also delete all devices that reference this 

673 entity (entity_id as entity_name) 

674 iota_client: 

675 Corresponding IoTA-Client used to access IoTA-Agent 

676 iota_url: 

677 URL of the corresponding IoT-Agent. This will autogenerate 

678 an IoTA-Client, mirroring the information of the 

679 ContextBrokerClient, e.g. FiwareHeader, and other headers 

680 

681 Returns: 

682 None 

683 """ 

684 url = urljoin(self.base_url, f"v2/entities/{entity_id}") 

685 headers = self.headers.copy() 

686 if entity_type: 

687 params = {'type': entity_type} 

688 else: 

689 params = None 

690 try: 

691 res = self.delete(url=url, params=params, headers=headers) 

692 if res.ok: 

693 self.logger.info("Entity '%s' successfully deleted!", entity_id) 

694 else: 

695 res.raise_for_status() 

696 except requests.RequestException as err: 

697 msg = f"Could not delete entity {entity_id} !" 

698 self.log_error(err=err, msg=msg) 

699 raise 

700 

701 if delete_devices: 

702 from filip.clients.ngsi_v2 import IoTAClient 

703 

704 if iota_client: 

705 iota_client_local = deepcopy(iota_client) 

706 else: 

707 warnings.warn( 

708 "No IoTA-Client object provided! " 

709 "Will try to generate one. " 

710 "This usage is not recommended." 

711 ) 

712 

713 iota_client_local = IoTAClient( 

714 url=iota_url, 

715 fiware_header=self.fiware_headers, 

716 headers=self.headers, 

717 ) 

718 

719 for device in iota_client_local.get_device_list( 

720 entity_names=[entity_id]): 

721 if entity_type: 

722 if device.entity_type == entity_type: 

723 iota_client_local.delete_device(device_id=device.device_id) 

724 else: 

725 iota_client_local.delete_device(device_id=device.device_id) 

726 iota_client_local.close() 

727 

728 def delete_entities(self, entities: List[ContextEntity]) -> None: 

729 """ 

730 Remove a list of entities from the context broker. This methode is 

731 more efficient than to call delete_entity() for each entity 

732 

733 Args: 

734 entities: List[ContextEntity]: List of entities to be deleted 

735 

736 Raises: 

737 Exception, if one of the entities is not in the ContextBroker 

738 

739 Returns: 

740 None 

741 """ 

742 

743 # update() delete, deletes all entities without attributes completely, 

744 # and removes the attributes for the other 

745 # The entities are sorted based on the fact if they have 

746 # attributes. 

747 entities_with_attributes: List[ContextEntity] = [] 

748 for entity in entities: 

749 attribute_names = [ 

750 key 

751 for key in entity.model_dump() 

752 if key not in ContextEntity.model_fields 

753 ] 

754 if len(attribute_names) > 0: 

755 entities_with_attributes.append( 

756 ContextEntity(id=entity.id, type=entity.type) 

757 ) 

758 

759 # Post update_delete for those without attribute only once, 

760 # for the other post update_delete again but for the changed entity 

761 # in the ContextBroker (only id and type left) 

762 if len(entities) > 0: 

763 self.update(entities=entities, action_type="delete") 

764 if len(entities_with_attributes) > 0: 

765 self.update(entities=entities_with_attributes, action_type="delete") 

766 

767 def update_or_append_entity_attributes( 

768 self, 

769 entity_id: str, 

770 attrs: Union[List[NamedContextAttribute], 

771 Dict[str, ContextAttribute], 

772 Dict[str, Any]], 

773 entity_type: str = None, 

774 append_strict: bool = False, 

775 forcedUpdate: bool = False, 

776 key_values: bool = False 

777 ): 

778 """ 

779 The request payload is an object representing the attributes to 

780 append or update. This corresponds to a 'POST' request if append is 

781 set to 'False' 

782 

783 Note: 

784 Be careful not to update attributes that are 

785 provided via context registration, e.g. commands. Commands are 

786 removed before sending the request. To avoid breaking things. 

787 

788 Args: 

789 entity_id: Entity id to be updated 

790 entity_type: Entity type, to avoid ambiguity in case there are 

791 several entities with the same entity id. 

792 attrs: List of attributes to update or to append 

793 append_strict: If `False` the entity attributes are updated (if they 

794 previously exist) or appended (if they don't previously exist) 

795 with the ones in the payload. 

796 If `True` all the attributes in the payload not 

797 previously existing in the entity are appended. In addition 

798 to that, in case some of the attributes in the payload 

799 already exist in the entity, an error is returned. 

800 More precisely this means a strict append procedure. 

801 forcedUpdate: Update operation have to trigger any matching 

802 subscription, no matter if there is an actual attribute 

803 update or no instead of the default behavior, which is to 

804 updated only if attribute is effectively updated. 

805 key_values: By default False. If set to True, the payload uses 

806 the keyValues simplified entity representation, i.e. 

807 ContextEntityKeyValues. 

808 Returns: 

809 None 

810 

811 """ 

812 url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs") 

813 headers = self.headers.copy() 

814 params = {} 

815 if entity_type: 

816 params.update({'type': entity_type}) 

817 else: 

818 entity_type = "dummy" 

819 

820 options = [] 

821 if append_strict: 

822 options.append("append") 

823 if forcedUpdate: 

824 options.append("forcedUpdate") 

825 if key_values: 

826 assert isinstance(attrs, dict), "for keyValues attrs has to be a dict" 

827 options.append("keyValues") 

828 if options: 

829 params.update({'options': ",".join(options)}) 

830 

831 if key_values: 

832 entity = ContextEntityKeyValues(id=entity_id, type=entity_type, **attrs) 

833 else: 

834 entity = ContextEntity(id=entity_id, type=entity_type) 

835 entity.add_attributes(attrs) 

836 # exclude commands from the send data, 

837 # as they live in the IoTA-agent 

838 excluded_keys = {"id", "type"} 

839 # excluded_keys.update( 

840 # entity.get_commands(response_format=PropertyFormat.DICT).keys() 

841 # ) 

842 try: 

843 res = self.post( 

844 url=url, 

845 headers=headers, 

846 json=entity.model_dump( 

847 exclude=excluded_keys, 

848 exclude_none=True 

849 ), 

850 params=params, 

851 ) 

852 if res.ok: 

853 self.logger.info("Entity '%s' successfully " "updated!", entity.id) 

854 else: 

855 res.raise_for_status() 

856 except requests.RequestException as err: 

857 msg = f"Could not update or append attributes of entity" f" {entity.id} !" 

858 self.log_error(err=err, msg=msg) 

859 raise 

860 

861 def _patch_entity_key_values(self, 

862 entity: Union[ContextEntityKeyValues, dict],): 

863 """ 

864 The entity are updated with a ContextEntityKeyValues object or a 

865 dictionary contain the simplified entity data. This corresponds to a 

866 'PATCH' request. 

867 Only existing attribute can be updated! 

868 

869 Args: 

870 entity: A ContextEntityKeyValues object or a dictionary contain 

871 the simplified entity data 

872 

873 """ 

874 if isinstance(entity, dict): 

875 entity = ContextEntityKeyValues(**entity) 

876 url = urljoin(self.base_url, f'v2/entities/{entity.id}/attrs') 

877 headers = self.headers.copy() 

878 params = {"type": entity.type, 

879 "options": AttrsFormat.KEY_VALUES.value 

880 } 

881 try: 

882 res = self.patch(url=url, 

883 headers=headers, 

884 json=entity.model_dump(exclude={'id', 'type'}, 

885 exclude_unset=True), 

886 params=params) 

887 if res.ok: 

888 self.logger.info("Entity '%s' successfully " 

889 "updated!", entity.id) 

890 else: 

891 res.raise_for_status() 

892 except requests.RequestException as err: 

893 msg = f"Could not update attributes of entity" \ 

894 f" {entity.id} !" 

895 self.log_error(err=err, msg=msg) 

896 raise 

897 

898 def update_existing_entity_attributes( 

899 self, 

900 entity_id: str, 

901 attrs: Union[List[NamedContextAttribute], 

902 Dict[str, ContextAttribute], 

903 Dict[str, Any]], 

904 entity_type: str = None, 

905 forcedUpdate: bool = False, 

906 override_metadata: bool = False, 

907 key_values: bool = False, 

908 ): 

909 """ 

910 The entity attributes are updated with the ones in the payload. 

911 In addition to that, if one or more attributes in the payload doesn't 

912 exist in the entity, an error is returned. This corresponds to a 

913 'PATCH' request. 

914 

915 Args: 

916 entity_id: Entity id to be updated 

917 entity_type: Entity type, to avoid ambiguity in case there are 

918 several entities with the same entity id. 

919 attrs: List of attributes to update or to append 

920 forcedUpdate: Update operation have to trigger any matching 

921 subscription, no matter if there is an actual attribute 

922 update or no instead of the default behavior, which is to 

923 updated only if attribute is effectively updated. 

924 override_metadata: 

925 Bool,replace the existing metadata with the one provided in 

926 the request 

927 key_values: By default False. If set to True, the payload uses 

928 the keyValues simplified entity representation, i.e. 

929 ContextEntityKeyValues. 

930 Returns: 

931 None 

932 

933 """ 

934 url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs") 

935 headers = self.headers.copy() 

936 if entity_type: 

937 params = {"type": entity_type} 

938 else: 

939 params = None 

940 entity_type = "dummy" 

941 

942 options = [] 

943 if override_metadata: 

944 options.append("overrideMetadata") 

945 if forcedUpdate: 

946 options.append("forcedUpdate") 

947 if key_values: 

948 assert isinstance(attrs, dict), "for keyValues the attrs must be dict" 

949 payload = attrs 

950 options.append("keyValues") 

951 else: 

952 entity = ContextEntity(id=entity_id, type=entity_type) 

953 entity.add_attributes(attrs) 

954 payload = entity.model_dump( 

955 exclude={"id", "type"}, 

956 exclude_none=True 

957 ) 

958 if options: 

959 params.update({'options': ",".join(options)}) 

960 

961 try: 

962 res = self.patch( 

963 url=url, 

964 headers=headers, 

965 json=payload, 

966 params=params, 

967 ) 

968 if res.ok: 

969 self.logger.info("Entity '%s' successfully " "updated!", entity_id) 

970 else: 

971 res.raise_for_status() 

972 except requests.RequestException as err: 

973 msg = f"Could not update attributes of entity" f" {entity_id} !" 

974 self.log_error(err=err, msg=msg) 

975 raise 

976 

977 def override_entity(self, 

978 entity: Union[ContextEntity, ContextEntityKeyValues], 

979 **kwargs 

980 ): 

981 """ 

982 The request payload is an object representing the attributes to 

983 override the existing entity. 

984 

985 Note: 

986 If you want to manipulate you should rather use patch_entity. 

987 

988 Args: 

989 entity (ContextEntity or ContextEntityKeyValues): 

990 Returns: 

991 None 

992 """ 

993 return self.replace_entity_attributes(entity_id=entity.id, 

994 entity_type=entity.type, 

995 attrs=entity.get_attributes(), 

996 **kwargs 

997 ) 

998 

999 def replace_entity_attributes( 

1000 self, 

1001 entity_id: str, 

1002 attrs: Union[List[Union[NamedContextAttribute, 

1003 Dict[str, ContextAttribute]]], 

1004 Dict], 

1005 entity_type: str = None, 

1006 forcedUpdate: bool = False, 

1007 key_values: bool = False, 

1008 ): 

1009 """ 

1010 The attributes previously existing in the entity are removed and 

1011 replaced by the ones in the request. This corresponds to a 'PUT' 

1012 request. 

1013 

1014 Args: 

1015 entity_id: Entity id to be updated 

1016 entity_type: Entity type, to avoid ambiguity in case there are 

1017 several entities with the same entity id. 

1018 attrs: List of attributes to add to the entity or dict of 

1019 attributes in case of key_values=True. 

1020 forcedUpdate: Update operation have to trigger any matching 

1021 subscription, no matter if there is an actual attribute 

1022 update or no instead of the default behavior, which is to 

1023 updated only if attribute is effectively updated. 

1024 key_values(bool): 

1025 By default False. If set to True, "options=keyValues" will 

1026 be included in params of the request. The payload uses 

1027 the keyValues simplified entity representation, i.e. 

1028 ContextEntityKeyValues. 

1029 Returns: 

1030 None 

1031 """ 

1032 url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs") 

1033 headers = self.headers.copy() 

1034 params = {} 

1035 options = [] 

1036 if entity_type: 

1037 params.update({"type": entity_type}) 

1038 else: 

1039 entity_type = "dummy" 

1040 

1041 if forcedUpdate: 

1042 options.append("forcedUpdate") 

1043 

1044 if key_values: 

1045 options.append("keyValues") 

1046 assert isinstance(attrs, dict) 

1047 else: 

1048 entity = ContextEntity(id=entity_id, type=entity_type) 

1049 entity.add_attributes(attrs) 

1050 attrs = entity.model_dump( 

1051 exclude={"id", "type"}, 

1052 exclude_none=True 

1053 ) 

1054 if options: 

1055 params.update({'options': ",".join(options)}) 

1056 

1057 try: 

1058 res = self.put( 

1059 url=url, 

1060 headers=headers, 

1061 json=attrs, 

1062 params=params, 

1063 ) 

1064 if res.ok: 

1065 self.logger.info("Entity '%s' successfully " "updated!", entity_id) 

1066 else: 

1067 res.raise_for_status() 

1068 except requests.RequestException as err: 

1069 msg = f"Could not replace attribute of entity {entity_id} !" 

1070 self.log_error(err=err, msg=msg) 

1071 raise 

1072 

1073 # Attribute operations 

1074 def get_attribute( 

1075 self, 

1076 entity_id: str, 

1077 attr_name: str, 

1078 entity_type: str = None, 

1079 metadata: str = None, 

1080 response_format="", 

1081 ) -> ContextAttribute: 

1082 """ 

1083 Retrieves a specified attribute from an entity. 

1084 

1085 Args: 

1086 entity_id: Id of the entity. Example: Bcn_Welt 

1087 attr_name: Name of the attribute to be retrieved. 

1088 entity_type (Optional): Type of the entity to retrieve 

1089 metadata (Optional): A list of metadata names to include in the 

1090 response. See "Filtering out attributes and metadata" section 

1091 for more detail. 

1092 

1093 Returns: 

1094 The content of the retrieved attribute as ContextAttribute 

1095 

1096 Raises: 

1097 Error 

1098 

1099 """ 

1100 url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs/{attr_name}") 

1101 headers = self.headers.copy() 

1102 params = {} 

1103 if entity_type: 

1104 params.update({"type": entity_type}) 

1105 if metadata: 

1106 params.update({"metadata": ",".join(metadata)}) 

1107 try: 

1108 res = self.get(url=url, params=params, headers=headers) 

1109 if res.ok: 

1110 self.logger.debug("Received: %s", res.json()) 

1111 return ContextAttribute(**res.json()) 

1112 res.raise_for_status() 

1113 except requests.RequestException as err: 

1114 msg = ( 

1115 f"Could not load attribute '{attr_name}' from entity" f"'{entity_id}' " 

1116 ) 

1117 self.log_error(err=err, msg=msg) 

1118 raise 

1119 

1120 def update_entity_attribute(self, 

1121 entity_id: str, 

1122 attr: Union[ContextAttribute, 

1123 NamedContextAttribute], 

1124 *, 

1125 entity_type: str = None, 

1126 attr_name: str = None, 

1127 override_metadata: bool = True, 

1128 forcedUpdate: bool = False): 

1129 """ 

1130 Updates a specified attribute from an entity. 

1131 

1132 Args: 

1133 attr: 

1134 context attribute to update 

1135 entity_id: 

1136 Id of the entity. Example: Bcn_Welt 

1137 entity_type: 

1138 Entity type, to avoid ambiguity in case there are 

1139 several entities with the same entity id. 

1140 forcedUpdate: Update operation have to trigger any matching 

1141 subscription, no matter if there is an actual attribute 

1142 update or no instead of the default behavior, which is to 

1143 updated only if attribute is effectively updated. 

1144 attr_name: 

1145 Name of the attribute to be updated. 

1146 override_metadata: 

1147 Bool, if set to `True` (default) the metadata will be 

1148 overwritten. This is for backwards compatibility reasons. 

1149 If `False` the metadata values will be either updated if 

1150 already existing or append if not. 

1151 See also: 

1152 https://fiware-orion.readthedocs.io/en/master/user/metadata.html 

1153 """ 

1154 headers = self.headers.copy() 

1155 if not isinstance(attr, NamedContextAttribute): 

1156 assert attr_name is not None, ( 

1157 "Missing name for attribute. " 

1158 "attr_name must be present if" 

1159 "attr is of type ContextAttribute" 

1160 ) 

1161 else: 

1162 assert attr_name is None, ( 

1163 "Invalid argument attr_name. Do not set " 

1164 "attr_name if attr is of type " 

1165 "NamedContextAttribute" 

1166 ) 

1167 attr_name = attr.name 

1168 

1169 url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs/{attr_name}") 

1170 params = {} 

1171 if entity_type: 

1172 params.update({"type": entity_type}) 

1173 # set overrideMetadata option (we assure backwards compatibility here) 

1174 options = [] 

1175 if override_metadata: 

1176 options.append("overrideMetadata") 

1177 if forcedUpdate: 

1178 options.append("forcedUpdate") 

1179 if options: 

1180 params.update({'options': ",".join(options)}) 

1181 try: 

1182 res = self.put( 

1183 url=url, 

1184 headers=headers, 

1185 params=params, 

1186 json=attr.model_dump( 

1187 exclude={"name"}, 

1188 exclude_none=True 

1189 ), 

1190 ) 

1191 if res.ok: 

1192 self.logger.info( 

1193 "Attribute '%s' of '%s' " "successfully updated!", 

1194 attr_name, 

1195 entity_id, 

1196 ) 

1197 else: 

1198 res.raise_for_status() 

1199 except requests.RequestException as err: 

1200 msg = ( 

1201 f"Could not update attribute '{attr_name}' of entity" f"'{entity_id}' " 

1202 ) 

1203 self.log_error(err=err, msg=msg) 

1204 raise 

1205 

1206 def delete_entity_attribute( 

1207 self, entity_id: str, attr_name: str, entity_type: str = None 

1208 ) -> None: 

1209 """ 

1210 Removes a specified attribute from an entity. 

1211 

1212 Args: 

1213 entity_id: Id of the entity. 

1214 attr_name: Name of the attribute to be retrieved. 

1215 entity_type: Entity type, to avoid ambiguity in case there are 

1216 several entities with the same entity id. 

1217 Raises: 

1218 Error 

1219 

1220 """ 

1221 url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs/{attr_name}") 

1222 headers = self.headers.copy() 

1223 params = {} 

1224 if entity_type: 

1225 params.update({"type": entity_type}) 

1226 try: 

1227 res = self.delete(url=url, headers=headers) 

1228 if res.ok: 

1229 self.logger.info( 

1230 "Attribute '%s' of '%s' " "successfully deleted!", 

1231 attr_name, 

1232 entity_id, 

1233 ) 

1234 else: 

1235 res.raise_for_status() 

1236 except requests.RequestException as err: 

1237 msg = ( 

1238 f"Could not delete attribute '{attr_name}' of entity '{entity_id}'" 

1239 ) 

1240 self.log_error(err=err, msg=msg) 

1241 raise 

1242 

1243 # Attribute value operations 

1244 def get_attribute_value( 

1245 self, entity_id: str, attr_name: str, entity_type: str = None 

1246 ) -> Any: 

1247 """ 

1248 This operation returns the value property with the value of the 

1249 attribute. 

1250 

1251 Args: 

1252 entity_id: Id of the entity. Example: Bcn_Welt 

1253 attr_name: Name of the attribute to be retrieved. 

1254 Example: temperature. 

1255 entity_type: Entity type, to avoid ambiguity in case there are 

1256 several entities with the same entity id. 

1257 

1258 Returns: 

1259 

1260 """ 

1261 url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs/{attr_name}/value") 

1262 headers = self.headers.copy() 

1263 params = {} 

1264 if entity_type: 

1265 params.update({"type": entity_type}) 

1266 try: 

1267 res = self.get(url=url, params=params, headers=headers) 

1268 if res.ok: 

1269 self.logger.debug("Received: %s", res.json()) 

1270 return res.json() 

1271 res.raise_for_status() 

1272 except requests.RequestException as err: 

1273 msg = ( 

1274 f"Could not load value of attribute '{attr_name}' from " 

1275 f"entity'{entity_id}' " 

1276 ) 

1277 self.log_error(err=err, msg=msg) 

1278 raise 

1279 

1280 def update_attribute_value(self, *, 

1281 entity_id: str, 

1282 attr_name: str, 

1283 value: Any, 

1284 entity_type: str = None, 

1285 forcedUpdate: bool = False 

1286 ): 

1287 """ 

1288 Updates the value of a specified attribute of an entity 

1289 

1290 Args: 

1291 value: update value 

1292 entity_id: Id of the entity. Example: Bcn_Welt 

1293 attr_name: Name of the attribute to be retrieved. 

1294 Example: temperature. 

1295 entity_type: Entity type, to avoid ambiguity in case there are 

1296 several entities with the same entity id. 

1297 forcedUpdate: Update operation have to trigger any matching 

1298 subscription, no matter if there is an actual attribute 

1299 update or no instead of the default behavior, which is to 

1300 updated only if attribute is effectively updated. 

1301 Returns: 

1302 

1303 """ 

1304 url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs/{attr_name}/value") 

1305 headers = self.headers.copy() 

1306 params = {} 

1307 if entity_type: 

1308 params.update({'type': entity_type}) 

1309 options = [] 

1310 if forcedUpdate: 

1311 options.append("forcedUpdate") 

1312 if options: 

1313 params.update({'options': ",".join(options)}) 

1314 try: 

1315 if not isinstance(value, (dict, list)): 

1316 headers.update({"Content-Type": "text/plain"}) 

1317 if isinstance(value, str): 

1318 value = f"{value}" 

1319 res = self.put(url=url, headers=headers, json=value, params=params) 

1320 else: 

1321 res = self.put(url=url, headers=headers, json=value, params=params) 

1322 if res.ok: 

1323 self.logger.info( 

1324 "Attribute '%s' of '%s' " "successfully updated!", 

1325 attr_name, 

1326 entity_id, 

1327 ) 

1328 else: 

1329 res.raise_for_status() 

1330 except requests.RequestException as err: 

1331 msg = ( 

1332 f"Could not update value of attribute '{attr_name}' from " 

1333 f"entity '{entity_id}' " 

1334 ) 

1335 self.log_error(err=err, msg=msg) 

1336 raise 

1337 

1338 # Types Operations 

1339 def get_entity_types( 

1340 self, *, limit: int = None, offset: int = None, options: str = None 

1341 ) -> List[Dict[str, Any]]: 

1342 """ 

1343 

1344 Args: 

1345 limit: Limit the number of types to be retrieved. 

1346 offset: Skip a number of records. 

1347 options: Options dictionary. Allowed: count, values 

1348 

1349 Returns: 

1350 

1351 """ 

1352 url = urljoin(self.base_url, "v2/types") 

1353 headers = self.headers.copy() 

1354 params = {} 

1355 if limit: 

1356 params.update({"limit": limit}) 

1357 if offset: 

1358 params.update({"offset": offset}) 

1359 if options: 

1360 params.update({"options": options}) 

1361 try: 

1362 res = self.get(url=url, params=params, headers=headers) 

1363 if res.ok: 

1364 self.logger.debug("Received: %s", res.json()) 

1365 return res.json() 

1366 res.raise_for_status() 

1367 except requests.RequestException as err: 

1368 msg = "Could not load entity types!" 

1369 self.log_error(err=err, msg=msg) 

1370 raise 

1371 

1372 def get_entity_type(self, entity_type: str) -> Dict[str, Any]: 

1373 """ 

1374 

1375 Args: 

1376 entity_type: Entity Type. Example: Room 

1377 

1378 Returns: 

1379 

1380 """ 

1381 url = urljoin(self.base_url, f"v2/types/{entity_type}") 

1382 headers = self.headers.copy() 

1383 params = {} 

1384 try: 

1385 res = self.get(url=url, params=params, headers=headers) 

1386 if res.ok: 

1387 self.logger.debug("Received: %s", res.json()) 

1388 return res.json() 

1389 res.raise_for_status() 

1390 except requests.RequestException as err: 

1391 msg = f"Could not load entities of type" f"'{entity_type}' " 

1392 self.log_error(err=err, msg=msg) 

1393 raise 

1394 

1395 # SUBSCRIPTION API ENDPOINTS 

1396 def get_subscription_list(self, limit: PositiveInt = inf) -> List[Subscription]: 

1397 """ 

1398 Returns a list of all the subscriptions present in the system. 

1399 Args: 

1400 limit: Limit the number of subscriptions to be retrieved 

1401 Returns: 

1402 list of subscriptions 

1403 """ 

1404 url = urljoin(self.base_url, "v2/subscriptions/") 

1405 headers = self.headers.copy() 

1406 params = {} 

1407 

1408 # We always use the 'count' option to check weather pagination is 

1409 # required 

1410 params.update({"options": "count"}) 

1411 try: 

1412 items = self.__pagination( 

1413 limit=limit, url=url, params=params, headers=headers 

1414 ) 

1415 adapter = TypeAdapter(List[Subscription]) 

1416 return adapter.validate_python(items) 

1417 except requests.RequestException as err: 

1418 msg = "Could not load subscriptions!" 

1419 self.log_error(err=err, msg=msg) 

1420 raise 

1421 

1422 def post_subscription( 

1423 self, 

1424 subscription: Subscription, 

1425 update: bool = False, 

1426 skip_initial_notification: bool = False, 

1427 ) -> str: 

1428 """ 

1429 Creates a new subscription. The subscription is represented by a 

1430 Subscription object defined in filip.cb.models. 

1431 

1432 If the subscription already exists, the adding is prevented and the id 

1433 of the existing subscription is returned. 

1434 

1435 A subscription is deemed as already existing if there exists a 

1436 subscription with the exact same subject and notification fields. All 

1437 optional fields are not considered. 

1438 

1439 Args: 

1440 subscription: Subscription 

1441 update: True - If the subscription already exists, update it 

1442 False- If the subscription already exists, throw warning 

1443 skip_initial_notification: True - Initial Notifications will be 

1444 sent to recipient containing the whole data. This is 

1445 deprecated and removed from version 3.0 of the context broker. 

1446 False - skip the initial notification 

1447 Returns: 

1448 str: Id of the (created) subscription 

1449 

1450 """ 

1451 existing_subscriptions = self.get_subscription_list() 

1452 

1453 sub_dict = subscription.model_dump(include={'subject', 

1454 'notification'}) 

1455 for ex_sub in existing_subscriptions: 

1456 if self._subscription_dicts_are_equal( 

1457 sub_dict, 

1458 ex_sub.model_dump(include={'subject', 'notification'}) 

1459 ): 

1460 self.logger.info("Subscription already exists") 

1461 if update: 

1462 self.logger.info("Updated subscription") 

1463 subscription.id = ex_sub.id 

1464 self.update_subscription(subscription) 

1465 else: 

1466 warnings.warn( 

1467 f"Subscription existed already with the id" f" {ex_sub.id}" 

1468 ) 

1469 return ex_sub.id 

1470 

1471 params = {} 

1472 if skip_initial_notification: 

1473 version = self.get_version()["orion"]["version"] 

1474 if parse_version(version) <= parse_version("3.1"): 

1475 params.update({"options": "skipInitialNotification"}) 

1476 else: 

1477 pass 

1478 warnings.warn( 

1479 f"Skip initial notifications is a deprecated " 

1480 f"feature of older versions <=3.1 of the context " 

1481 f"broker. The Context Broker that you requesting has " 

1482 f"version: {version}. For newer versions we " 

1483 f"automatically skip this option. Consider " 

1484 f"refactoring and updating your services", 

1485 DeprecationWarning, 

1486 ) 

1487 

1488 url = urljoin(self.base_url, "v2/subscriptions") 

1489 headers = self.headers.copy() 

1490 headers.update({"Content-Type": "application/json"}) 

1491 try: 

1492 res = self.post( 

1493 url=url, 

1494 headers=headers, 

1495 data=subscription.model_dump_json(exclude={"id"}, exclude_none=True), 

1496 params=params, 

1497 ) 

1498 if res.ok: 

1499 self.logger.info("Subscription successfully created!") 

1500 return res.headers["Location"].split("/")[-1] 

1501 res.raise_for_status() 

1502 except requests.RequestException as err: 

1503 msg = "Could not send subscription!" 

1504 self.log_error(err=err, msg=msg) 

1505 raise 

1506 

1507 def get_subscription(self, subscription_id: str) -> Subscription: 

1508 """ 

1509 Retrieves a subscription from 

1510 Args: 

1511 subscription_id: id of the subscription 

1512 

1513 Returns: 

1514 

1515 """ 

1516 url = urljoin(self.base_url, f"v2/subscriptions/{subscription_id}") 

1517 headers = self.headers.copy() 

1518 try: 

1519 res = self.get(url=url, headers=headers) 

1520 if res.ok: 

1521 self.logger.debug("Received: %s", res.json()) 

1522 return Subscription(**res.json()) 

1523 res.raise_for_status() 

1524 except requests.RequestException as err: 

1525 msg = f"Could not load subscription {subscription_id}" 

1526 self.log_error(err=err, msg=msg) 

1527 raise 

1528 

1529 def update_subscription( 

1530 self, subscription: Subscription, skip_initial_notification: bool = False 

1531 ): 

1532 """ 

1533 Only the fields included in the request are updated in the subscription. 

1534 

1535 Args: 

1536 subscription: Subscription to update 

1537 skip_initial_notification: True - Initial Notifications will be 

1538 sent to recipient containing the whole data. This is 

1539 deprecated and removed from version 3.0 of the context broker. 

1540 False - skip the initial notification 

1541 

1542 Returns: 

1543 None 

1544 """ 

1545 params = {} 

1546 if skip_initial_notification: 

1547 version = self.get_version()["orion"]["version"] 

1548 if parse_version(version) <= parse_version("3.1"): 

1549 params.update({"options": "skipInitialNotification"}) 

1550 else: 

1551 pass 

1552 warnings.warn( 

1553 f"Skip initial notifications is a deprecated " 

1554 f"feature of older versions <3.1 of the context " 

1555 f"broker. The Context Broker that you requesting has " 

1556 f"version: {version}. For newer versions we " 

1557 f"automatically skip this option. Consider " 

1558 f"refactoring and updating your services", 

1559 DeprecationWarning, 

1560 ) 

1561 

1562 url = urljoin(self.base_url, f"v2/subscriptions/{subscription.id}") 

1563 headers = self.headers.copy() 

1564 headers.update({"Content-Type": "application/json"}) 

1565 try: 

1566 res = self.patch( 

1567 url=url, 

1568 headers=headers, 

1569 data=subscription.model_dump_json( 

1570 exclude={"id"}, 

1571 exclude_none=True 

1572 ), 

1573 ) 

1574 if res.ok: 

1575 self.logger.info("Subscription successfully updated!") 

1576 else: 

1577 res.raise_for_status() 

1578 except requests.RequestException as err: 

1579 msg = f"Could not update subscription {subscription.id}" 

1580 self.log_error(err=err, msg=msg) 

1581 raise 

1582 

1583 def delete_subscription(self, subscription_id: str) -> None: 

1584 """ 

1585 Deletes a subscription from a Context Broker 

1586 Args: 

1587 subscription_id: id of the subscription 

1588 """ 

1589 url = urljoin(self.base_url, f"v2/subscriptions/{subscription_id}") 

1590 headers = self.headers.copy() 

1591 try: 

1592 res = self.delete(url=url, headers=headers) 

1593 if res.ok: 

1594 self.logger.info( 

1595 f"Subscription '{subscription_id}' " f"successfully deleted!" 

1596 ) 

1597 else: 

1598 res.raise_for_status() 

1599 except requests.RequestException as err: 

1600 msg = f"Could not delete subscription {subscription_id}" 

1601 self.log_error(err=err, msg=msg) 

1602 raise 

1603 

1604 # Registration API 

1605 def get_registration_list(self, *, limit: PositiveInt = None) -> List[Registration]: 

1606 """ 

1607 Lists all the context provider registrations present in the system. 

1608 

1609 Args: 

1610 limit: Limit the number of registrations to be retrieved 

1611 Returns: 

1612 

1613 """ 

1614 url = urljoin(self.base_url, "v2/registrations/") 

1615 headers = self.headers.copy() 

1616 params = {} 

1617 

1618 # We always use the 'count' option to check weather pagination is 

1619 # required 

1620 params.update({"options": "count"}) 

1621 try: 

1622 items = self.__pagination( 

1623 limit=limit, url=url, params=params, headers=headers 

1624 ) 

1625 adapter = TypeAdapter(List[Registration]) 

1626 return adapter.validate_python(items) 

1627 except requests.RequestException as err: 

1628 msg = "Could not load registrations!" 

1629 self.log_error(err=err, msg=msg) 

1630 raise 

1631 

1632 def post_registration(self, registration: Registration): 

1633 """ 

1634 Creates a new context provider registration. This is typically used 

1635 for binding context sources as providers of certain data. The 

1636 registration is represented by cb.models.Registration 

1637 

1638 Args: 

1639 registration (Registration): 

1640 

1641 Returns: 

1642 

1643 """ 

1644 url = urljoin(self.base_url, "v2/registrations") 

1645 headers = self.headers.copy() 

1646 headers.update({"Content-Type": "application/json"}) 

1647 try: 

1648 res = self.post( 

1649 url=url, 

1650 headers=headers, 

1651 data=registration.model_dump_json(exclude={"id"}, exclude_none=True), 

1652 ) 

1653 if res.ok: 

1654 self.logger.info("Registration successfully created!") 

1655 return res.headers["Location"].split("/")[-1] 

1656 res.raise_for_status() 

1657 except requests.RequestException as err: 

1658 msg = f"Could not send registration {registration.id}!" 

1659 self.log_error(err=err, msg=msg) 

1660 raise 

1661 

1662 def get_registration(self, registration_id: str) -> Registration: 

1663 """ 

1664 Retrieves a registration from context broker by id 

1665 

1666 Args: 

1667 registration_id: id of the registration 

1668 

1669 Returns: 

1670 Registration 

1671 """ 

1672 url = urljoin(self.base_url, f"v2/registrations/{registration_id}") 

1673 headers = self.headers.copy() 

1674 try: 

1675 res = self.get(url=url, headers=headers) 

1676 if res.ok: 

1677 self.logger.debug("Received: %s", res.json()) 

1678 return Registration(**res.json()) 

1679 res.raise_for_status() 

1680 except requests.RequestException as err: 

1681 msg = f"Could not load registration {registration_id} !" 

1682 self.log_error(err=err, msg=msg) 

1683 raise 

1684 

1685 def update_registration(self, registration: Registration): 

1686 """ 

1687 Only the fields included in the request are updated in the registration. 

1688 

1689 Args: 

1690 registration: Registration to update 

1691 Returns: 

1692 

1693 """ 

1694 url = urljoin(self.base_url, f"v2/registrations/{registration.id}") 

1695 headers = self.headers.copy() 

1696 headers.update({"Content-Type": "application/json"}) 

1697 try: 

1698 res = self.patch( 

1699 url=url, 

1700 headers=headers, 

1701 data=registration.model_dump_json( 

1702 exclude={"id"}, 

1703 exclude_none=True 

1704 ), 

1705 ) 

1706 if res.ok: 

1707 self.logger.info("Registration successfully updated!") 

1708 else: 

1709 res.raise_for_status() 

1710 except requests.RequestException as err: 

1711 msg = f"Could not update registration {registration.id} !" 

1712 self.log_error(err=err, msg=msg) 

1713 raise 

1714 

1715 def delete_registration(self, registration_id: str) -> None: 

1716 """ 

1717 Deletes a subscription from a Context Broker 

1718 Args: 

1719 registration_id: id of the subscription 

1720 """ 

1721 url = urljoin(self.base_url, f"v2/registrations/{registration_id}") 

1722 headers = self.headers.copy() 

1723 try: 

1724 res = self.delete(url=url, headers=headers) 

1725 if res.ok: 

1726 self.logger.info( 

1727 "Registration '%s' " "successfully deleted!", registration_id 

1728 ) 

1729 res.raise_for_status() 

1730 except requests.RequestException as err: 

1731 msg = f"Could not delete registration {registration_id} !" 

1732 self.log_error(err=err, msg=msg) 

1733 raise 

1734 

1735 # Batch operation API 

1736 def update(self, 

1737 *, 

1738 entities: List[Union[ContextEntity, ContextEntityKeyValues]], 

1739 action_type: Union[ActionType, str], 

1740 update_format: str = None, 

1741 forcedUpdate: bool = False, 

1742 override_metadata: bool = False, 

1743 ) -> None: 

1744 """ 

1745 This operation allows to create, update and/or delete several entities 

1746 in a single batch operation. 

1747 

1748 This operation is split in as many individual operations as entities 

1749 in the entities vector, so the actionType is executed for each one of 

1750 them. Depending on the actionType, a mapping with regular non-batch 

1751 operations can be done: 

1752 

1753 append: maps to POST /v2/entities (if the entity does not already exist) 

1754 or POST /v2/entities/<id>/attrs (if the entity already exists). 

1755 

1756 appendStrict: maps to POST /v2/entities (if the entity does not 

1757 already exist) or POST /v2/entities/<id>/attrs?options=append (if the 

1758 entity already exists). 

1759 

1760 update: maps to PATCH /v2/entities/<id>/attrs. 

1761 

1762 delete: maps to DELETE /v2/entities/<id>/attrs/<attrName> on every 

1763 attribute included in the entity or to DELETE /v2/entities/<id> if 

1764 no attribute were included in the entity. 

1765 

1766 replace: maps to PUT /v2/entities/<id>/attrs. 

1767 

1768 Args: 

1769 entities: "an array of entities, each entity specified using the " 

1770 "JSON entity representation format " 

1771 action_type (Update): "actionType, to specify the kind of update 

1772 action to do: either append, appendStrict, update, delete, 

1773 or replace. " 

1774 update_format (str): Optional 'keyValues' 

1775 forcedUpdate: Update operation have to trigger any matching 

1776 subscription, no matter if there is an actual attribute 

1777 update or no instead of the default behavior, which is to 

1778 updated only if attribute is effectively updated. 

1779 override_metadata: 

1780 Bool, replace the existing metadata with the one provided in 

1781 the request 

1782 Returns: 

1783 

1784 """ 

1785 

1786 url = urljoin(self.base_url, "v2/op/update") 

1787 headers = self.headers.copy() 

1788 headers.update({"Content-Type": "application/json"}) 

1789 params = {} 

1790 options = [] 

1791 if override_metadata: 

1792 options.append("overrideMetadata") 

1793 if forcedUpdate: 

1794 options.append("forcedUpdate") 

1795 if update_format: 

1796 assert ( 

1797 update_format == "keyValues" 

1798 ), "Only 'keyValues' is allowed as update format" 

1799 options.append("keyValues") 

1800 if options: 

1801 params.update({'options': ",".join(options)}) 

1802 update = Update(actionType=action_type, entities=entities) 

1803 try: 

1804 res = self.post( 

1805 url=url, 

1806 headers=headers, 

1807 params=params, 

1808 json=update.model_dump(by_alias=True), 

1809 ) 

1810 if res.ok: 

1811 self.logger.info("Update operation '%s' succeeded!", action_type) 

1812 else: 

1813 res.raise_for_status() 

1814 except requests.RequestException as err: 

1815 msg = f"Update operation '{action_type}' failed!" 

1816 self.log_error(err=err, msg=msg) 

1817 raise 

1818 

1819 def query( 

1820 self, 

1821 *, 

1822 query: Query, 

1823 limit: PositiveInt = None, 

1824 order_by: str = None, 

1825 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED, 

1826 ) -> List[Any]: 

1827 """ 

1828 Generate api query 

1829 Args: 

1830 query (Query): 

1831 limit (PositiveInt): 

1832 order_by (str): 

1833 response_format (AttrsFormat, str): 

1834 Returns: 

1835 The response payload is an Array containing one object per matching 

1836 entity, or an empty array [] if no entities are found. The entities 

1837 follow the JSON entity representation format (described in the 

1838 section "JSON Entity Representation"). 

1839 """ 

1840 url = urljoin(self.base_url, "v2/op/query") 

1841 headers = self.headers.copy() 

1842 headers.update({"Content-Type": "application/json"}) 

1843 params = {"options": "count"} 

1844 

1845 if response_format: 

1846 if response_format not in list(AttrsFormat): 

1847 raise ValueError(f"Value must be in {list(AttrsFormat)}") 

1848 params["options"] = ",".join([response_format, "count"]) 

1849 try: 

1850 items = self.__pagination( 

1851 method=PaginationMethod.POST, 

1852 url=url, 

1853 headers=headers, 

1854 params=params, 

1855 data=query.model_dump_json(exclude_none=True), 

1856 limit=limit, 

1857 ) 

1858 if response_format == AttrsFormat.NORMALIZED: 

1859 adapter = TypeAdapter(List[ContextEntity]) 

1860 return adapter.validate_python(items) 

1861 if response_format == AttrsFormat.KEY_VALUES: 

1862 adapter = TypeAdapter(List[ContextEntityKeyValues]) 

1863 return adapter.validate_python(items) 

1864 return items 

1865 except requests.RequestException as err: 

1866 msg = "Query operation failed!" 

1867 self.log_error(err=err, msg=msg) 

1868 raise 

1869 

1870 def notify(self, message: Message) -> None: 

1871 """ 

1872 This operation is intended to consume a notification payload so that 

1873 all the entity data included by such notification is persisted, 

1874 overwriting if necessary. This operation is useful when one NGSIv2 

1875 endpoint is subscribed to another NGSIv2 endpoint (federation 

1876 scenarios). The request payload must be an NGSIv2 notification 

1877 payload. The behaviour must be exactly the same as 'update' 

1878 with 'action_type' equal to append. 

1879 

1880 Args: 

1881 message: Notification message 

1882 

1883 Returns: 

1884 None 

1885 """ 

1886 url = urljoin(self.base_url, "v2/op/notify") 

1887 headers = self.headers.copy() 

1888 headers.update({"Content-Type": "application/json"}) 

1889 params = {} 

1890 try: 

1891 res = self.post( 

1892 url=url, 

1893 headers=headers, 

1894 params=params, 

1895 data=message.model_dump_json(by_alias=True), 

1896 ) 

1897 if res.ok: 

1898 self.logger.info("Notification message sent!") 

1899 else: 

1900 res.raise_for_status() 

1901 except requests.RequestException as err: 

1902 msg = ( 

1903 f"Sending notifcation message failed! \n " 

1904 f"{message.model_dump_json(indent=2)}" 

1905 ) 

1906 self.log_error(err=err, msg=msg) 

1907 raise 

1908 

1909 def post_command( 

1910 self, 

1911 *, 

1912 entity_id: str, 

1913 command: Union[Command, NamedCommand, Dict], 

1914 entity_type: str = None, 

1915 command_name: str = None, 

1916 ) -> None: 

1917 """ 

1918 Post a command to a context entity this corresponds to 'PATCH' of the 

1919 specified command attribute. 

1920 

1921 Args: 

1922 entity_id: Entity identifier 

1923 command: Command 

1924 entity_type: Entity type 

1925 command_name: Name of the command in the entity 

1926 

1927 Returns: 

1928 None 

1929 """ 

1930 if command_name: 

1931 assert isinstance(command, (Command, dict)) 

1932 if isinstance(command, dict): 

1933 command = Command(**command) 

1934 command = {command_name: command.model_dump()} 

1935 else: 

1936 assert isinstance(command, (NamedCommand, dict)) 

1937 if isinstance(command, dict): 

1938 command = NamedCommand(**command) 

1939 

1940 self.update_existing_entity_attributes( 

1941 entity_id=entity_id, entity_type=entity_type, attrs=[command] 

1942 ) 

1943 

1944 def does_entity_exist(self, entity_id: str, entity_type: str) -> bool: 

1945 """ 

1946 Test if an entity with given id and type is present in the CB 

1947 

1948 Args: 

1949 entity_id: Entity id 

1950 entity_type: Entity type 

1951 

1952 Returns: 

1953 bool; True if entity exists 

1954 

1955 Raises: 

1956 RequestException, if any error occurs (e.g: No Connection), 

1957 except that the entity is not found 

1958 """ 

1959 url = urljoin(self.base_url, f"v2/entities/{entity_id}") 

1960 headers = self.headers.copy() 

1961 params = {"type": entity_type} 

1962 

1963 try: 

1964 res = self.get(url=url, params=params, headers=headers) 

1965 if res.ok: 

1966 return True 

1967 res.raise_for_status() 

1968 except requests.RequestException as err: 

1969 if err.response is None or not err.response.status_code == 404: 

1970 raise 

1971 return False 

1972 

1973 def patch_entity(self, 

1974 entity: ContextEntity, 

1975 old_entity: Optional[ContextEntity] = None, 

1976 override_attr_metadata: bool = True) -> None: 

1977 """ 

1978 Takes a given entity and updates the state in the CB to match it. 

1979 It is an extended equivalent to the HTTP method PATCH, which applies 

1980 partial modifications to a resource. 

1981 

1982 Args: 

1983 entity: Entity to update 

1984 old_entity: OPTIONAL, if given only the differences between the 

1985 old_entity and entity are updated in the CB. 

1986 Other changes made to the entity in CB, can be kept. 

1987 If type or id was changed, the old_entity will be 

1988 deleted. 

1989 override_attr_metadata: 

1990 Whether to override or append the attributes metadata. 

1991 `True` for overwrite or `False` for update/append 

1992 

1993 Returns: 

1994 None 

1995 """ 

1996 

1997 new_entity = entity 

1998 

1999 if old_entity is None: 

2000 # If no old entity_was provided we use the current state to compare 

2001 # the entity to 

2002 if self.does_entity_exist( 

2003 entity_id=new_entity.id, entity_type=new_entity.type 

2004 ): 

2005 old_entity = self.get_entity( 

2006 entity_id=new_entity.id, entity_type=new_entity.type 

2007 ) 

2008 else: 

2009 # the entity is new, post and finish 

2010 self.post_entity(new_entity, update=False) 

2011 return 

2012 

2013 else: 

2014 # An old_entity was provided 

2015 # check if the old_entity (still) exists else recall methode 

2016 # and discard old_entity 

2017 if not self.does_entity_exist( 

2018 entity_id=old_entity.id, entity_type=old_entity.type 

2019 ): 

2020 self.patch_entity( 

2021 new_entity, override_attr_metadata=override_attr_metadata 

2022 ) 

2023 return 

2024 

2025 # if type or id was changed, the old_entity needs to be deleted 

2026 # and the new_entity created 

2027 # In this case we will lose the current state of the entity 

2028 if old_entity.id != new_entity.id or old_entity.type != new_entity.type: 

2029 self.delete_entity(entity_id=old_entity.id, entity_type=old_entity.type) 

2030 

2031 if not self.does_entity_exist( 

2032 entity_id=new_entity.id, entity_type=new_entity.type 

2033 ): 

2034 self.post_entity(entity=new_entity, update=False) 

2035 return 

2036 

2037 # At this point we know that we need to patch only the attributes of 

2038 # the entity 

2039 # Check the differences between the attributes of old and new entity 

2040 # Delete the removed attributes, create the new ones, 

2041 # and update the existing if necessary 

2042 old_attributes = old_entity.get_attributes() 

2043 new_attributes = new_entity.get_attributes() 

2044 

2045 # Manage attributes that existed before 

2046 for old_attr in old_attributes: 

2047 # commands do not exist in the ContextEntity and are only 

2048 # registrations to the corresponding device. Operations as 

2049 # delete will fail as it does not technically exist 

2050 corresponding_new_attr = None 

2051 for new_attr in new_attributes: 

2052 if new_attr.name == old_attr.name: 

2053 corresponding_new_attr = new_attr 

2054 

2055 if corresponding_new_attr is None: 

2056 # Attribute no longer exists, delete it 

2057 try: 

2058 self.delete_entity_attribute( 

2059 entity_id=new_entity.id, 

2060 entity_type=new_entity.type, 

2061 attr_name=old_attr.name, 

2062 ) 

2063 except requests.RequestException as err: 

2064 # if the attribute is provided by a registration the 

2065 # deletion will fail 

2066 if not err.response.status_code == 404: 

2067 raise 

2068 else: 

2069 # Check if attributed changed in any way, if yes update 

2070 # else do nothing and keep current state 

2071 if old_attr != corresponding_new_attr: 

2072 try: 

2073 self.update_entity_attribute( 

2074 entity_id=new_entity.id, 

2075 entity_type=new_entity.type, 

2076 attr=corresponding_new_attr, 

2077 override_metadata=override_attr_metadata, 

2078 ) 

2079 except requests.RequestException as err: 

2080 # if the attribute is provided by a registration the 

2081 # update will fail 

2082 if not err.response.status_code == 404: 

2083 raise 

2084 

2085 # Create new attributes 

2086 update_entity = ContextEntity(id=entity.id, type=entity.type) 

2087 update_needed = False 

2088 for new_attr in new_attributes: 

2089 # commands do not exist in the ContextEntity and are only 

2090 # registrations to the corresponding device. Operations as 

2091 # delete will fail as it does not technically exists 

2092 attr_existed = False 

2093 for old_attr in old_attributes: 

2094 if new_attr.name == old_attr.name: 

2095 attr_existed = True 

2096 

2097 if not attr_existed: 

2098 update_needed = True 

2099 update_entity.add_attributes([new_attr]) 

2100 

2101 if update_needed: 

2102 self.update_entity(update_entity) 

2103 

2104 def _subscription_dicts_are_equal(self, first: dict, second: dict): 

2105 """ 

2106 Check if two dictionaries and all sub-dictionaries are equal. 

2107 Logs a warning if the keys are not equal, but ignores the 

2108 comparison of such keys. 

2109 

2110 Args: 

2111 first dict: Dictionary of first subscription 

2112 second dict: Dictionary of second subscription 

2113 

2114 Returns: 

2115 True if equal, else False 

2116 """ 

2117 

2118 def _value_is_not_none(value): 

2119 """ 

2120 Recursive function to check if a value equals none. 

2121 If the value is a dict and any value of the dict is not none, 

2122 the value is not none. 

2123 If the value is a list and any item is not none, the value is not none. 

2124 If it's neither dict nore list, bool is used. 

2125 """ 

2126 if isinstance(value, dict): 

2127 return any([_value_is_not_none(value=_v) 

2128 for _v in value.values()]) 

2129 if isinstance(value, list): 

2130 return any([_value_is_not_none(value=_v)for _v in value]) 

2131 else: 

2132 return bool(value) 

2133 if first.keys() != second.keys(): 

2134 warnings.warn( 

2135 "Subscriptions contain a different set of fields. " 

2136 "Only comparing to new fields of the new one." 

2137 ) 

2138 for k, v in first.items(): 

2139 ex_value = second.get(k, None) 

2140 if isinstance(v, dict) and isinstance(ex_value, dict): 

2141 equal = self._subscription_dicts_are_equal(v, ex_value) 

2142 if equal: 

2143 continue 

2144 else: 

2145 return False 

2146 if v != ex_value: 

2147 self.logger.debug(f"Not equal fields for key {k}: ({v}, {ex_value})") 

2148 if not _value_is_not_none(v) and not _value_is_not_none(ex_value) or k == "timesSent": 

2149 continue 

2150 return False 

2151 return True 

2152 

2153 

2154# 

2155# 

2156# def check_duplicate_subscription(self, subscription_body, limit: int = 20): 

2157# """ 

2158# Function compares the subject of the subscription body, on whether a subscription 

2159# already exists for a device / entity. 

2160# :param subscription_body: the body of the new subscripton 

2161# :param limit: pagination parameter, to set the number of 

2162# subscriptions bodies the get request should grab 

2163# :return: exists, boolean -> True, if such a subscription allready 

2164# exists 

2165# """ 

2166# exists = False 

2167# subscription_subject = json.loads(subscription_body)["subject"] 

2168# # Exact keys depend on subscription body 

2169# try: 

2170# subscription_url = json.loads(subscription_body)[ 

2171# "notification"]["httpCustom"]["url"] 

2172# except KeyError: 

2173# subscription_url = json.loads(subscription_body)[ 

2174# "notification"]["http"]["url"] 

2175# 

2176# # If the number of subscriptions is larger then the limit, 

2177# paginations methods have to be used 

2178# url = self.url + '/v2/subscriptions?limit=' + str(limit) + 

2179# '&options=count' 

2180# response = self.session.get(url, headers=self.get_header()) 

2181# 

2182# sub_count = float(response.headers["Fiware-Total-Count"]) 

2183# response = json.loads(response.text) 

2184# if sub_count >= limit: 

2185# response = self.get_pagination(url=url, headers=self.get_header(), 

2186# limit=limit, count=sub_count) 

2187# response = json.loads(response) 

2188# 

2189# for existing_subscription in response: 

2190# # check whether the exact same subscriptions already exists 

2191# if existing_subscription["subject"] == subscription_subject: 

2192# exists = True 

2193# break 

2194# try: 

2195# existing_url = existing_subscription["notification"][ 

2196# "http"]["url"] 

2197# except KeyError: 

2198# existing_url = existing_subscription["notification"][ 

2199# "httpCustom"]["url"] 

2200# # check whether both subscriptions notify to the same path 

2201# if existing_url != subscription_url: 

2202# continue 

2203# else: 

2204# # iterate over all entities included in the subscription object 

2205# for entity in subscription_subject["entities"]: 

2206# if 'type' in entity.keys(): 

2207# subscription_type = entity['type'] 

2208# else: 

2209# subscription_type = entity['typePattern'] 

2210# if 'id' in entity.keys(): 

2211# subscription_id = entity['id'] 

2212# else: 

2213# subscription_id = entity["idPattern"] 

2214# # iterate over all entities included in the exisiting 

2215# subscriptions 

2216# for existing_entity in existing_subscription["subject"][ 

2217# "entities"]: 

2218# if "type" in entity.keys(): 

2219# type_existing = entity["type"] 

2220# else: 

2221# type_existing = entity["typePattern"] 

2222# if "id" in entity.keys(): 

2223# id_existing = entity["id"] 

2224# else: 

2225# id_existing = entity["idPattern"] 

2226# # as the ID field is non optional, it has to match 

2227# # check whether the type match 

2228# # if the type field is empty, they match all types 

2229# if (type_existing == subscription_type) or\ 

2230# ('*' in subscription_type) or \ 

2231# ('*' in type_existing)\ 

2232# or (type_existing == "") or ( 

2233# subscription_type == ""): 

2234# # check if on of the subscriptions is a pattern, 

2235# or if they both refer to the same id 

2236# # Get the attrs first, to avoid code duplication 

2237# # last thing to compare is the attributes 

2238# # Assumption -> position is the same as the 

2239# entities _list 

2240# # i == j 

2241# i = subscription_subject["entities"].index(entity) 

2242# j = existing_subscription["subject"][ 

2243# "entities"].index(existing_entity) 

2244# try: 

2245# subscription_attrs = subscription_subject[ 

2246# "condition"]["attrs"][i] 

2247# except (KeyError, IndexError): 

2248# subscription_attrs = [] 

2249# try: 

2250# existing_attrs = existing_subscription[ 

2251# "subject"]["condition"]["attrs"][j] 

2252# except (KeyError, IndexError): 

2253# existing_attrs = [] 

2254# 

2255# if (".*" in subscription_id) or ('.*' in 

2256# id_existing) or (subscription_id == id_existing): 

2257# # Attributes have to match, or the have to 

2258# be an empty array 

2259# if (subscription_attrs == existing_attrs) or 

2260# (subscription_attrs == []) or (existing_attrs == []): 

2261# exists = True 

2262# # if they do not match completely or subscribe 

2263# to all ids they have to match up to a certain position 

2264# elif ("*" in subscription_id) or ('*' in 

2265# id_existing): 

2266# regex_existing = id_existing.find('*') 

2267# regex_subscription = 

2268# subscription_id.find('*') 

2269# # slice the strings to compare 

2270# if (id_existing[:regex_existing] in 

2271# subscription_id) or (subscription_id[:regex_subscription] in id_existing) or \ 

2272# (id_existing[regex_existing:] in 

2273# subscription_id) or (subscription_id[regex_subscription:] in id_existing): 

2274# if (subscription_attrs == 

2275# existing_attrs) or (subscription_attrs == []) or (existing_attrs == []): 

2276# exists = True 

2277# else: 

2278# continue 

2279# else: 

2280# continue 

2281# else: 

2282# continue 

2283# else: 

2284# continue 

2285# else: 

2286# continue 

2287# return exists 

2288#