Coverage for filip/clients/ngsi_v2/cb.py: 81%

724 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-04-17 14:42 +0000

1""" 

2Context Broker Module for API Client 

3""" 

4 

5from __future__ import annotations 

6 

7import copy 

8from copy import deepcopy 

9from enum import Enum 

10from math import inf 

11from pkg_resources import parse_version 

12from pydantic import PositiveInt, PositiveFloat, AnyHttpUrl, ValidationError 

13from pydantic.type_adapter import TypeAdapter 

14from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union 

15import re 

16import requests 

17from urllib.parse import urljoin 

18import warnings 

19from filip.clients.base_http_client import BaseHttpClient, NgsiURLVersion 

20from filip.config import settings 

21from filip.models.base import FiwareHeader, PaginationMethod, DataType 

22from filip.utils.simple_ql import QueryString 

23from filip.models.ngsi_v2.context import ( 

24 ActionType, 

25 Command, 

26 ContextEntity, 

27 ContextEntityKeyValues, 

28 ContextAttribute, 

29 NamedCommand, 

30 NamedContextAttribute, 

31 Query, 

32 Update, 

33 PropertyFormat, 

34 ContextEntityList, 

35 ContextEntityKeyValuesList, 

36 ContextEntityValidationList, 

37 ContextEntityKeyValuesValidationList, 

38) 

39from filip.models.ngsi_v2.base import AttrsFormat 

40from filip.models.ngsi_v2.subscriptions import Subscription, Message 

41from filip.models.ngsi_v2.registrations import Registration 

42 

43if TYPE_CHECKING: 

44 from filip.clients.ngsi_v2.iota import IoTAClient 

45 

46 

47class ContextBrokerClient(BaseHttpClient): 

48 """ 

49 Implementation of NGSI Context Broker functionalities, such as creating 

50 entities and subscriptions; retrieving, updating and deleting data. 

51 Further documentation: 

52 https://fiware-orion.readthedocs.io/en/master/ 

53 

54 Api specifications for v2 are located here: 

55 https://telefonicaid.github.io/fiware-orion/api/v2/stable/ 

56 

57 Note: 

58 We use the reference implementation for development. Therefore, some 

59 other brokers may show slightly different behavior! 

60 """ 

61 

62 def __init__( 

63 self, 

64 url: str = None, 

65 *, 

66 session: requests.Session = None, 

67 fiware_header: FiwareHeader = None, 

68 **kwargs, 

69 ): 

70 """ 

71 

72 Args: 

73 url: Url of context broker server 

74 session (requests.Session): 

75 fiware_header (FiwareHeader): fiware service and fiware service path 

76 **kwargs (Optional): Optional arguments that ``request`` takes. 

77 """ 

78 # set service url 

79 url = url or settings.CB_URL 

80 self._url_version = NgsiURLVersion.v2_url.value 

81 super().__init__( 

82 url=url, session=session, fiware_header=fiware_header, **kwargs 

83 ) 

84 

85 def __pagination( 

86 self, 

87 *, 

88 method: PaginationMethod = PaginationMethod.GET, 

89 url: str, 

90 headers: Dict, 

91 limit: Union[PositiveInt, PositiveFloat] = None, 

92 params: Dict = None, 

93 data: str = None, 

94 ) -> List[Dict]: 

95 """ 

96 NGSIv2 implements a pagination mechanism in order to help clients to 

97 retrieve large sets of resources. This mechanism works for all listing 

98 operations in the API (e.g. GET /v2/entities, GET /v2/subscriptions, 

99 POST /v2/op/query, etc.). This function helps getting datasets that are 

100 larger than the limit for the different GET operations. 

101 

102 https://fiware-orion.readthedocs.io/en/master/user/pagination/index.html 

103 

104 Args: 

105 url: Information about the url, obtained from the original function 

106 headers: The headers from the original function 

107 params: 

108 limit: 

109 

110 Returns: 

111 object: 

112 

113 """ 

114 if limit is None: 

115 limit = inf 

116 if limit > 1000: 

117 params["limit"] = 1000 # maximum items per request 

118 else: 

119 params["limit"] = limit 

120 

121 if self.session: 

122 session = self.session 

123 else: 

124 session = requests.Session() 

125 with session: 

126 res = session.request( 

127 method=method, url=url, params=params, headers=headers, data=data 

128 ) 

129 if res.ok: 

130 items = res.json() 

131 # do pagination 

132 count = int(res.headers["Fiware-Total-Count"]) 

133 

134 while len(items) < limit and len(items) < count: 

135 # Establishing the offset from where entities are retrieved 

136 params["offset"] = len(items) 

137 params["limit"] = min(1000, (limit - len(items))) 

138 res = session.request( 

139 method=method, 

140 url=url, 

141 params=params, 

142 headers=headers, 

143 data=data, 

144 ) 

145 if res.ok: 

146 items.extend(res.json()) 

147 else: 

148 res.raise_for_status() 

149 self.logger.debug("Received: %s", items) 

150 return items 

151 res.raise_for_status() 

152 

153 # MANAGEMENT API 

154 def get_version(self) -> Dict: 

155 """ 

156 Gets version of IoT Agent 

157 Returns: 

158 Dictionary with response 

159 """ 

160 url = urljoin(self.base_url, "version") 

161 try: 

162 res = self.get(url=url, headers=self.headers) 

163 if res.ok: 

164 return res.json() 

165 res.raise_for_status() 

166 except requests.RequestException as err: 

167 self.logger.error(err) 

168 raise 

169 

170 def get_resources(self) -> Dict: 

171 """ 

172 Gets reo 

173 

174 Returns: 

175 Dict 

176 """ 

177 url = urljoin(self.base_url, self._url_version) 

178 try: 

179 res = self.get(url=url, headers=self.headers) 

180 if res.ok: 

181 return res.json() 

182 res.raise_for_status() 

183 except requests.RequestException as err: 

184 self.logger.error(err) 

185 raise 

186 

187 # STATISTICS API 

188 def get_statistics(self) -> Dict: 

189 """ 

190 Gets statistics of context broker 

191 Returns: 

192 Dictionary with response 

193 """ 

194 url = urljoin(self.base_url, "statistics") 

195 try: 

196 res = self.get(url=url, headers=self.headers) 

197 if res.ok: 

198 return res.json() 

199 res.raise_for_status() 

200 except requests.RequestException as err: 

201 self.logger.error(err) 

202 raise 

203 

204 # CONTEXT MANAGEMENT API ENDPOINTS 

205 # Entity Operations 

206 def post_entity( 

207 self, 

208 entity: Union[ContextEntity, ContextEntityKeyValues], 

209 update: bool = False, 

210 patch: bool = False, 

211 override_attr_metadata: bool = True, 

212 key_values: bool = False, 

213 ): 

214 """ 

215 Function registers an Object with the NGSI Context Broker, 

216 if it already exists it can be automatically updated (overwritten) 

217 if the update bool is True. 

218 First a post request with the entity is tried, if the response code 

219 is 422 the entity is uncrossable, as it already exists there are two 

220 options, either overwrite it, if the attribute have changed 

221 (e.g. at least one new/new values) (update = True) or leave 

222 it the way it is (update=False) 

223 If you only want to manipulate the entities values, you need to set 

224 patch argument. 

225 

226 Args: 

227 entity (ContextEntity/ContextEntityKeyValues): 

228 Context Entity Object 

229 update (bool): 

230 If the response.status_code is 422, whether the override and 

231 existing entity 

232 patch (bool): 

233 If the response.status_code is 422, whether to manipulate the 

234 existing entity. Omitted if update `True`. 

235 override_attr_metadata: 

236 Only applies for patch equal to `True`. 

237 Whether to override or append the attribute's metadata. 

238 `True` for overwrite or `False` for update/append 

239 key_values(bool): 

240 By default False. If set to True, "options=keyValues" will 

241 be included in params of post request. The payload uses 

242 the keyValues simplified entity representation, i.e. 

243 ContextEntityKeyValues. 

244 """ 

245 url = urljoin(self.base_url, f"{self._url_version}/entities") 

246 headers = self.headers.copy() 

247 params = {} 

248 options = [] 

249 if key_values: 

250 assert isinstance(entity, ContextEntityKeyValues) 

251 options.append("keyValues") 

252 else: 

253 assert isinstance(entity, ContextEntity) 

254 if options: 

255 params.update({"options": ",".join(options)}) 

256 try: 

257 res = self.post( 

258 url=url, 

259 headers=headers, 

260 json=entity.model_dump(exclude_none=True), 

261 params=params, 

262 ) 

263 if res.ok: 

264 self.logger.info("Entity successfully posted!") 

265 return res.headers.get("Location") 

266 res.raise_for_status() 

267 except requests.RequestException as err: 

268 if err.response is not None: 

269 if update and err.response.status_code == 422: 

270 return self.override_entity(entity=entity, key_values=key_values) 

271 if patch and err.response.status_code == 422: 

272 if not key_values: 

273 return self.patch_entity( 

274 entity=entity, override_attr_metadata=override_attr_metadata 

275 ) 

276 else: 

277 return self._patch_entity_key_values(entity=entity) 

278 msg = f"Could not post entity {entity.id}" 

279 self.log_error(err=err, msg=msg) 

280 raise 

281 

282 def get_entity_list( 

283 self, 

284 *, 

285 entity_ids: List[str] = None, 

286 entity_types: List[str] = None, 

287 id_pattern: str = None, 

288 type_pattern: str = None, 

289 q: Union[str, QueryString] = None, 

290 mq: Union[str, QueryString] = None, 

291 georel: str = None, 

292 geometry: str = None, 

293 coords: str = None, 

294 limit: PositiveInt = inf, 

295 attrs: List[str] = None, 

296 metadata: str = None, 

297 order_by: str = None, 

298 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED, 

299 include_invalid: bool = False, 

300 ) -> Union[ 

301 List[Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]], 

302 ContextEntityValidationList, 

303 ContextEntityKeyValuesValidationList, 

304 ]: 

305 r""" 

306 Retrieves a list of context entities that match different criteria by 

307 id, type, pattern matching (either id or type) and/or those which 

308 match a query or geographical query (see Simple Query Language and 

309 Geographical Queries). A given entity has to match all the criteria 

310 to be retrieved (i.e., the criteria is combined in a logical AND 

311 way). Note that pattern matching query parameters are incompatible 

312 (i.e. mutually exclusive) with their corresponding exact matching 

313 parameters, i.e. idPattern with id and typePattern with type. 

314 

315 Args: 

316 entity_ids: A comma-separated list of elements. Retrieve entities 

317 whose ID matches one of the elements in the list. 

318 Incompatible with idPattern,e.g. Boe_Idarium 

319 entity_types: comma-separated list of elements. Retrieve entities 

320 whose type matches one of the elements in the list. 

321 Incompatible with typePattern. Example: Room. 

322 id_pattern: A correctly formatted regular expression. Retrieve 

323 entities whose ID matches the regular expression. Incompatible 

324 with id, e.g. ngsi-ld.* or sensor.* 

325 type_pattern: A correctly formatted regular expression. Retrieve 

326 entities whose type matches the regular expression. 

327 Incompatible with type, e.g. room.* 

328 q (SimpleQuery): A query expression, composed of a list of 

329 statements separated by ;, i.e., 

330 q=statement1;statement2;statement3. See Simple Query 

331 Language specification. Example: temperature>40. 

332 mq (SimpleQuery): A query expression for attribute metadata, 

333 composed of a list of statements separated by ;, i.e., 

334 mq=statement1;statement2;statement3. See Simple Query 

335 Language specification. Example: temperature.accuracy<0.9. 

336 georel: Spatial relationship between matching entities and a 

337 reference shape. See Geographical Queries. Example: 'near'. 

338 geometry: Geographical area to which the query is restricted. 

339 See Geographical Queries. Example: point. 

340 coords: List of latitude-longitude pairs of coordinates separated 

341 by ';'. See Geographical Queries. Example: 41.390205, 

342 2.154007;48.8566,2.3522. 

343 limit: Limits the number of entities to be retrieved Example: 20 

344 attrs: Comma-separated list of attribute names whose data are to 

345 be included in the response. The attributes are retrieved in 

346 the order specified by this parameter. If this parameter is 

347 not included, the attributes are retrieved in arbitrary 

348 order. See "Filtering out attributes and metadata" section 

349 for more detail. Example: seatNumber. 

350 metadata: A list of metadata names to include in the response. 

351 See "Filtering out attributes and metadata" section for more 

352 detail. Example: accuracy. 

353 order_by: Criteria for ordering results. See "Ordering Results" 

354 section for details. Example: temperature,!speed. 

355 response_format (AttrsFormat, str): Response Format. Note: That if 

356 'keyValues' or 'values' are used the response model will 

357 change to List[ContextEntityKeyValues] and to List[Dict[str, 

358 Any]], respectively. 

359 include_invalid: Specify if the returned list should also contain a list of invalid entity IDs or not. 

360 Returns: 

361 

362 """ 

363 url = urljoin(self.base_url, f"{self._url_version}/entities/") 

364 headers = self.headers.copy() 

365 params = {} 

366 

367 if entity_ids and id_pattern: 

368 raise ValueError 

369 if entity_types and type_pattern: 

370 raise ValueError 

371 if entity_ids: 

372 if not isinstance(entity_ids, list): 

373 entity_ids = [entity_ids] 

374 params.update({"id": ",".join(entity_ids)}) 

375 if id_pattern: 

376 try: 

377 re.compile(id_pattern) 

378 except re.error as err: 

379 raise ValueError(f"Invalid Pattern: {err}") from err 

380 params.update({"idPattern": id_pattern}) 

381 if entity_types: 

382 if not isinstance(entity_types, list): 

383 entity_types = [entity_types] 

384 params.update({"type": ",".join(entity_types)}) 

385 if type_pattern: 

386 try: 

387 re.compile(type_pattern) 

388 except re.error as err: 

389 raise ValueError(f"Invalid Pattern: {err.msg}") from err 

390 params.update({"typePattern": type_pattern}) 

391 if attrs: 

392 params.update({"attrs": ",".join(attrs)}) 

393 if metadata: 

394 params.update({"metadata": ",".join(metadata)}) 

395 if q: 

396 if isinstance(q, str): 

397 q = QueryString.parse_str(q) 

398 params.update({"q": str(q)}) 

399 if mq: 

400 params.update({"mq": str(mq)}) 

401 if geometry: 

402 params.update({"geometry": geometry}) 

403 if georel: 

404 params.update({"georel": georel}) 

405 if coords: 

406 params.update({"coords": coords}) 

407 if order_by: 

408 params.update({"orderBy": order_by}) 

409 if response_format not in list(AttrsFormat): 

410 raise ValueError(f"Value must be in {list(AttrsFormat)}") 

411 response_format = ",".join(["count", response_format]) 

412 params.update({"options": response_format}) 

413 try: 

414 items = self.__pagination( 

415 method=PaginationMethod.GET, 

416 limit=limit, 

417 url=url, 

418 params=params, 

419 headers=headers, 

420 ) 

421 if include_invalid: 

422 valid_entities = [] 

423 invalid_entities = [] 

424 

425 if AttrsFormat.NORMALIZED in response_format: 

426 adapter = TypeAdapter(ContextEntity) 

427 

428 for entity in items: 

429 try: 

430 valid_entity = adapter.validate_python(entity) 

431 valid_entities.append(valid_entity) 

432 except ValidationError: 

433 invalid_entities.append(entity.get("id")) 

434 

435 return ContextEntityValidationList.model_validate( 

436 { 

437 "entities": valid_entities, 

438 "invalid_entities": invalid_entities, 

439 } 

440 ) 

441 elif AttrsFormat.KEY_VALUES in response_format: 

442 adapter = TypeAdapter(ContextEntityKeyValues) 

443 

444 for entity in items: 

445 try: 

446 valid_entity = adapter.validate_python(entity) 

447 valid_entities.append(valid_entity) 

448 except ValidationError: 

449 invalid_entities.append(entity.get("id")) 

450 

451 return ContextEntityKeyValuesValidationList.model_validate( 

452 { 

453 "entities": valid_entities, 

454 "invalid_entities": invalid_entities, 

455 } 

456 ) 

457 else: 

458 return items 

459 else: 

460 if AttrsFormat.NORMALIZED in response_format: 

461 return ContextEntityList.model_validate( 

462 {"entities": items} 

463 ).entities 

464 elif AttrsFormat.KEY_VALUES in response_format: 

465 return ContextEntityKeyValuesList.model_validate( 

466 {"entities": items} 

467 ).entities 

468 return items # in case of VALUES as response_format 

469 

470 except requests.RequestException as err: 

471 msg = "Could not load entities" 

472 self.log_error(err=err, msg=msg) 

473 raise 

474 

475 def get_entity( 

476 self, 

477 entity_id: str, 

478 entity_type: str = None, 

479 attrs: List[str] = None, 

480 metadata: List[str] = None, 

481 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED, 

482 ) -> Union[ContextEntity, ContextEntityKeyValues, Dict[str, Any]]: 

483 """ 

484 This operation must return one entity element only, but there may be 

485 more than one entity with the same ID (e.g. entities with same ID but 

486 different types). In such case, an error message is returned, with 

487 the HTTP status code set to 409 Conflict. 

488 

489 Args: 

490 entity_id (String): Id of the entity to be retrieved 

491 entity_type (String): Entity type, to avoid ambiguity in case 

492 there are several entities with the same entity id. 

493 attrs (List of Strings): List of attribute names whose data must be 

494 included in the response. The attributes are retrieved in the 

495 order specified by this parameter. 

496 See "Filtering out attributes and metadata" section for more 

497 detail. If this parameter is not included, the attributes are 

498 retrieved in arbitrary order, and all the attributes of the 

499 entity are included in the response. 

500 Example: temperature, humidity. 

501 metadata (List of Strings): A list of metadata names to include in 

502 the response. See "Filtering out attributes and metadata" 

503 section for more detail. Example: accuracy. 

504 response_format (AttrsFormat, str): Representation format of 

505 response 

506 Returns: 

507 ContextEntity 

508 """ 

509 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}") 

510 headers = self.headers.copy() 

511 params = {} 

512 if entity_type: 

513 params.update({"type": entity_type}) 

514 if attrs: 

515 params.update({"attrs": ",".join(attrs)}) 

516 if metadata: 

517 params.update({"metadata": ",".join(metadata)}) 

518 if response_format not in list(AttrsFormat): 

519 raise ValueError(f"Value must be in {list(AttrsFormat)}") 

520 params.update({"options": response_format}) 

521 

522 try: 

523 res = self.get(url=url, params=params, headers=headers) 

524 if res.ok: 

525 self.logger.info("Entity successfully retrieved!") 

526 self.logger.debug("Received: %s", res.json()) 

527 if response_format == AttrsFormat.NORMALIZED: 

528 return ContextEntity(**res.json()) 

529 if response_format == AttrsFormat.KEY_VALUES: 

530 return ContextEntityKeyValues(**res.json()) 

531 return res.json() 

532 res.raise_for_status() 

533 except requests.RequestException as err: 

534 msg = f"Could not load entity {entity_id}" 

535 self.log_error(err=err, msg=msg) 

536 raise 

537 

538 def get_entity_attributes( 

539 self, 

540 entity_id: str, 

541 entity_type: str = None, 

542 attrs: List[str] = None, 

543 metadata: List[str] = None, 

544 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED, 

545 ) -> Dict[str, ContextAttribute]: 

546 """ 

547 This request is similar to retrieving the whole entity, however this 

548 one omits the id and type fields. Just like the general request of 

549 getting an entire entity, this operation must return only one entity 

550 element. If more than one entity with the same ID is found (e.g. 

551 entities with same ID but different type), an error message is 

552 returned, with the HTTP status code set to 409 Conflict. 

553 

554 Args: 

555 entity_id (String): Id of the entity to be retrieved 

556 entity_type (String): Entity type, to avoid ambiguity in case 

557 there are several entities with the same entity id. 

558 attrs (List of Strings): List of attribute names whose data must be 

559 included in the response. The attributes are retrieved in the 

560 order specified by this parameter. 

561 See "Filtering out attributes and metadata" section for more 

562 detail. If this parameter is not included, the attributes are 

563 retrieved in arbitrary order, and all the attributes of the 

564 entity are included in the response. Example: temperature, 

565 humidity. 

566 metadata (List of Strings): A list of metadata names to include in 

567 the response. See "Filtering out attributes and metadata" 

568 section for more detail. Example: accuracy. 

569 response_format (AttrsFormat, str): Representation format of 

570 response 

571 Returns: 

572 Dict 

573 """ 

574 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs") 

575 headers = self.headers.copy() 

576 params = {} 

577 if entity_type: 

578 params.update({"type": entity_type}) 

579 if attrs: 

580 params.update({"attrs": ",".join(attrs)}) 

581 if metadata: 

582 params.update({"metadata": ",".join(metadata)}) 

583 if response_format not in list(AttrsFormat): 

584 raise ValueError(f"Value must be in {list(AttrsFormat)}") 

585 params.update({"options": response_format}) 

586 try: 

587 res = self.get(url=url, params=params, headers=headers) 

588 if res.ok: 

589 if response_format == AttrsFormat.NORMALIZED: 

590 return { 

591 key: ContextAttribute(**values) 

592 for key, values in res.json().items() 

593 } 

594 return res.json() 

595 res.raise_for_status() 

596 except requests.RequestException as err: 

597 msg = f"Could not load attributes from entity {entity_id} !" 

598 self.log_error(err=err, msg=msg) 

599 raise 

600 

601 def update_entity( 

602 self, 

603 entity: Union[ContextEntity, ContextEntityKeyValues, dict], 

604 append_strict: bool = False, 

605 key_values: bool = False, 

606 ): 

607 """ 

608 The request payload is an object representing the attributes to 

609 append or update. 

610 

611 Note: 

612 Update means overwriting the existing entity. If you want to 

613 manipulate you should rather use patch_entity. 

614 

615 Args: 

616 entity (ContextEntity): 

617 append_strict: If `False` the entity attributes are updated (if they 

618 previously exist) or appended (if they don't previously exist) 

619 with the ones in the payload. 

620 If `True` all the attributes in the payload not 

621 previously existing in the entity are appended. In addition 

622 to that, in case some of the attributes in the payload 

623 already exist in the entity, an error is returned. 

624 More precisely this means a strict append procedure. 

625 key_values: By default False. If set to True, the payload uses 

626 the keyValues simplified entity representation, i.e. 

627 ContextEntityKeyValues. 

628 Returns: 

629 None 

630 """ 

631 if key_values: 

632 if isinstance(entity, dict): 

633 entity = copy.deepcopy(entity) 

634 _id = entity.pop("id") 

635 _type = entity.pop("type") 

636 attrs = entity 

637 entity = ContextEntityKeyValues(id=_id, type=_type) 

638 else: 

639 attrs = entity.model_dump(exclude={"id", "type"}) 

640 else: 

641 attrs = entity.get_attributes() 

642 self.update_or_append_entity_attributes( 

643 entity_id=entity.id, 

644 entity_type=entity.type, 

645 attrs=attrs, 

646 append_strict=append_strict, 

647 key_values=key_values, 

648 ) 

649 

650 def update_entity_properties( 

651 self, entity: ContextEntity, append_strict: bool = False 

652 ): 

653 """ 

654 The request payload is an object representing the attributes, of any type 

655 but Relationship, to append or update. 

656 

657 Note: 

658 Update means overwriting the existing entity. If you want to 

659 manipulate you should rather use patch_entity. 

660 

661 Args: 

662 entity (ContextEntity): 

663 append_strict: If `False` the entity attributes are updated (if they 

664 previously exist) or appended (if they don't previously exist) 

665 with the ones in the payload. 

666 If `True` all the attributes in the payload not 

667 previously existing in the entity are appended. In addition 

668 to that, in case some of the attributes in the payload 

669 already exist in the entity, an error is returned. 

670 More precisely this means a strict append procedure. 

671 

672 Returns: 

673 None 

674 """ 

675 self.update_or_append_entity_attributes( 

676 entity_id=entity.id, 

677 entity_type=entity.type, 

678 attrs=entity.get_properties(), 

679 append_strict=append_strict, 

680 ) 

681 

682 def update_entity_relationships( 

683 self, entity: ContextEntity, append_strict: bool = False 

684 ): 

685 """ 

686 The request payload is an object representing only the attributes, of type 

687 Relationship, to append or update. 

688 

689 Note: 

690 Update means overwriting the existing entity. If you want to 

691 manipulate you should rather use patch_entity. 

692 

693 Args: 

694 entity (ContextEntity): 

695 append_strict: If `False` the entity attributes are updated (if they 

696 previously exist) or appended (if they don't previously exist) 

697 with the ones in the payload. 

698 If `True` all the attributes in the payload not 

699 previously existing in the entity are appended. In addition 

700 to that, in case some of the attributes in the payload 

701 already exist in the entity, an error is returned. 

702 More precisely this means a strict append procedure. 

703 

704 Returns: 

705 None 

706 """ 

707 self.update_or_append_entity_attributes( 

708 entity_id=entity.id, 

709 entity_type=entity.type, 

710 attrs=entity.get_relationships(), 

711 append_strict=append_strict, 

712 ) 

713 

714 def delete_entity( 

715 self, 

716 entity_id: str, 

717 entity_type: str = None, 

718 delete_devices: bool = False, 

719 iota_client: IoTAClient = None, 

720 iota_url: AnyHttpUrl = settings.IOTA_URL, 

721 ) -> None: 

722 """ 

723 Remove a entity from the context broker. No payload is required 

724 or received. 

725 

726 Args: 

727 entity_id: 

728 Id of the entity to be deleted 

729 entity_type: 

730 Entity type, to avoid ambiguity in case there are several 

731 entities with the same entity id. 

732 delete_devices: 

733 If True, also delete all devices that reference this 

734 entity (entity_id as entity_name) 

735 iota_client: 

736 Corresponding IoTA-Client used to access IoTA-Agent 

737 iota_url: 

738 URL of the corresponding IoT-Agent. This will autogenerate 

739 an IoTA-Client, mirroring the information of the 

740 ContextBrokerClient, e.g. FiwareHeader, and other headers 

741 

742 Returns: 

743 None 

744 """ 

745 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}") 

746 headers = self.headers.copy() 

747 if entity_type: 

748 params = {"type": entity_type} 

749 else: 

750 params = None 

751 try: 

752 res = self.delete(url=url, params=params, headers=headers) 

753 if res.ok: 

754 self.logger.info("Entity '%s' successfully deleted!", entity_id) 

755 else: 

756 res.raise_for_status() 

757 except requests.RequestException as err: 

758 msg = f"Could not delete entity {entity_id} !" 

759 self.log_error(err=err, msg=msg) 

760 raise 

761 

762 if delete_devices: 

763 from filip.clients.ngsi_v2 import IoTAClient 

764 

765 if iota_client: 

766 iota_client_local = deepcopy(iota_client) 

767 else: 

768 warnings.warn( 

769 "No IoTA-Client object provided! " 

770 "Will try to generate one. " 

771 "This usage is not recommended." 

772 ) 

773 

774 iota_client_local = IoTAClient( 

775 url=iota_url, 

776 fiware_header=self.fiware_headers, 

777 headers=self.headers, 

778 ) 

779 

780 for device in iota_client_local.get_device_list(entity_names=[entity_id]): 

781 if entity_type: 

782 if device.entity_type == entity_type: 

783 iota_client_local.delete_device(device_id=device.device_id) 

784 else: 

785 iota_client_local.delete_device(device_id=device.device_id) 

786 iota_client_local.close() 

787 

788 def delete_entities(self, entities: List[ContextEntity]) -> None: 

789 """ 

790 Remove a list of entities from the context broker. This methode is 

791 more efficient than to call delete_entity() for each entity 

792 

793 Args: 

794 entities: List[ContextEntity]: List of entities to be deleted 

795 

796 Raises: 

797 Exception, if one of the entities is not in the ContextBroker 

798 

799 Returns: 

800 None 

801 """ 

802 

803 # update() delete, deletes all entities without attributes completely, 

804 # and removes the attributes for the other 

805 # The entities are sorted based on the fact if they have 

806 # attributes. 

807 entities_with_attributes: List[ContextEntity] = [] 

808 for entity in entities: 

809 attribute_names = [ 

810 key 

811 for key in entity.model_dump() 

812 if key not in ContextEntity.model_fields 

813 ] 

814 if len(attribute_names) > 0: 

815 entities_with_attributes.append( 

816 ContextEntity(id=entity.id, type=entity.type) 

817 ) 

818 

819 # Post update_delete for those without attribute only once, 

820 # for the other post update_delete again but for the changed entity 

821 # in the ContextBroker (only id and type left) 

822 if len(entities) > 0: 

823 self.update(entities=entities, action_type="delete") 

824 if len(entities_with_attributes) > 0: 

825 self.update(entities=entities_with_attributes, action_type="delete") 

826 

827 def update_or_append_entity_attributes( 

828 self, 

829 entity_id: str, 

830 attrs: Union[ 

831 List[NamedContextAttribute], Dict[str, ContextAttribute], Dict[str, Any] 

832 ], 

833 entity_type: str = None, 

834 append_strict: bool = False, 

835 forcedUpdate: bool = False, 

836 key_values: bool = False, 

837 ): 

838 """ 

839 The request payload is an object representing the attributes to 

840 append or update. This corresponds to a 'POST' request if append is 

841 set to 'False' 

842 

843 Note: 

844 Be careful not to update attributes that are 

845 provided via context registration, e.g. commands. Commands are 

846 removed before sending the request. To avoid breaking things. 

847 

848 Args: 

849 entity_id: Entity id to be updated 

850 entity_type: Entity type, to avoid ambiguity in case there are 

851 several entities with the same entity id. 

852 attrs: List of attributes to update or to append 

853 append_strict: If `False` the entity attributes are updated (if they 

854 previously exist) or appended (if they don't previously exist) 

855 with the ones in the payload. 

856 If `True` all the attributes in the payload not 

857 previously existing in the entity are appended. In addition 

858 to that, in case some of the attributes in the payload 

859 already exist in the entity, an error is returned. 

860 More precisely this means a strict append procedure. 

861 forcedUpdate: Update operation have to trigger any matching 

862 subscription, no matter if there is an actual attribute 

863 update or no instead of the default behavior, which is to 

864 updated only if attribute is effectively updated. 

865 key_values: By default False. If set to True, the payload uses 

866 the keyValues simplified entity representation, i.e. 

867 ContextEntityKeyValues. 

868 Returns: 

869 None 

870 

871 """ 

872 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs") 

873 headers = self.headers.copy() 

874 params = {} 

875 if entity_type: 

876 params.update({"type": entity_type}) 

877 else: 

878 entity_type = "dummy" 

879 

880 options = [] 

881 if append_strict: 

882 options.append("append") 

883 if forcedUpdate: 

884 options.append("forcedUpdate") 

885 if key_values: 

886 assert isinstance(attrs, dict), "for keyValues attrs has to be a dict" 

887 options.append("keyValues") 

888 if options: 

889 params.update({"options": ",".join(options)}) 

890 

891 if key_values: 

892 entity = ContextEntityKeyValues(id=entity_id, type=entity_type, **attrs) 

893 else: 

894 entity = ContextEntity(id=entity_id, type=entity_type) 

895 entity.add_attributes(attrs) 

896 # exclude commands from the send data, 

897 # as they live in the IoTA-agent 

898 excluded_keys = {"id", "type"} 

899 # excluded_keys.update( 

900 # entity.get_commands(response_format=PropertyFormat.DICT).keys() 

901 # ) 

902 try: 

903 res = self.post( 

904 url=url, 

905 headers=headers, 

906 json=entity.model_dump(exclude=excluded_keys, exclude_none=True), 

907 params=params, 

908 ) 

909 if res.ok: 

910 self.logger.info("Entity '%s' successfully " "updated!", entity.id) 

911 else: 

912 res.raise_for_status() 

913 except requests.RequestException as err: 

914 msg = f"Could not update or append attributes of entity" f" {entity.id} !" 

915 self.log_error(err=err, msg=msg) 

916 raise 

917 

918 def _patch_entity_key_values( 

919 self, 

920 entity: Union[ContextEntityKeyValues, dict], 

921 ): 

922 """ 

923 The entity are updated with a ContextEntityKeyValues object or a 

924 dictionary contain the simplified entity data. This corresponds to a 

925 'PATCH' request. 

926 Only existing attribute can be updated! 

927 

928 Args: 

929 entity: A ContextEntityKeyValues object or a dictionary contain 

930 the simplified entity data 

931 

932 """ 

933 if isinstance(entity, dict): 

934 entity = ContextEntityKeyValues(**entity) 

935 url = urljoin(self.base_url, f"v2/entities/{entity.id}/attrs") 

936 headers = self.headers.copy() 

937 params = {"type": entity.type, "options": AttrsFormat.KEY_VALUES.value} 

938 try: 

939 res = self.patch( 

940 url=url, 

941 headers=headers, 

942 json=entity.model_dump(exclude={"id", "type"}, exclude_unset=True), 

943 params=params, 

944 ) 

945 if res.ok: 

946 self.logger.info("Entity '%s' successfully " "updated!", entity.id) 

947 else: 

948 res.raise_for_status() 

949 except requests.RequestException as err: 

950 msg = f"Could not update attributes of entity" f" {entity.id} !" 

951 self.log_error(err=err, msg=msg) 

952 raise 

953 

954 def update_existing_entity_attributes( 

955 self, 

956 entity_id: str, 

957 attrs: Union[ 

958 List[NamedContextAttribute], Dict[str, ContextAttribute], Dict[str, Any] 

959 ], 

960 entity_type: str = None, 

961 forcedUpdate: bool = False, 

962 override_metadata: bool = False, 

963 key_values: bool = False, 

964 ): 

965 """ 

966 The entity attributes are updated with the ones in the payload. 

967 In addition to that, if one or more attributes in the payload doesn't 

968 exist in the entity, an error is returned. This corresponds to a 

969 'PATCH' request. 

970 

971 Args: 

972 entity_id: Entity id to be updated 

973 entity_type: Entity type, to avoid ambiguity in case there are 

974 several entities with the same entity id. 

975 attrs: List of attributes to update or to append 

976 forcedUpdate: Update operation have to trigger any matching 

977 subscription, no matter if there is an actual attribute 

978 update or no instead of the default behavior, which is to 

979 updated only if attribute is effectively updated. 

980 override_metadata: 

981 Bool,replace the existing metadata with the one provided in 

982 the request 

983 key_values: By default False. If set to True, the payload uses 

984 the keyValues simplified entity representation, i.e. 

985 ContextEntityKeyValues. 

986 Returns: 

987 None 

988 

989 """ 

990 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs") 

991 headers = self.headers.copy() 

992 if entity_type: 

993 params = {"type": entity_type} 

994 else: 

995 params = None 

996 entity_type = "dummy" 

997 

998 options = [] 

999 if override_metadata: 

1000 options.append("overrideMetadata") 

1001 if forcedUpdate: 

1002 options.append("forcedUpdate") 

1003 if key_values: 

1004 assert isinstance(attrs, dict), "for keyValues the attrs must be dict" 

1005 payload = attrs 

1006 options.append("keyValues") 

1007 else: 

1008 entity = ContextEntity(id=entity_id, type=entity_type) 

1009 entity.add_attributes(attrs) 

1010 payload = entity.model_dump(exclude={"id", "type"}, exclude_none=True) 

1011 if options: 

1012 params.update({"options": ",".join(options)}) 

1013 

1014 try: 

1015 res = self.patch( 

1016 url=url, 

1017 headers=headers, 

1018 json=payload, 

1019 params=params, 

1020 ) 

1021 if res.ok: 

1022 self.logger.info("Entity '%s' successfully " "updated!", entity_id) 

1023 else: 

1024 res.raise_for_status() 

1025 except requests.RequestException as err: 

1026 msg = f"Could not update attributes of entity" f" {entity_id} !" 

1027 self.log_error(err=err, msg=msg) 

1028 raise 

1029 

1030 def override_entity( 

1031 self, entity: Union[ContextEntity, ContextEntityKeyValues], **kwargs 

1032 ): 

1033 """ 

1034 The request payload is an object representing the attributes to 

1035 override the existing entity. 

1036 

1037 Note: 

1038 If you want to manipulate you should rather use patch_entity. 

1039 

1040 Args: 

1041 entity (ContextEntity or ContextEntityKeyValues): 

1042 Returns: 

1043 None 

1044 """ 

1045 return self.replace_entity_attributes( 

1046 entity_id=entity.id, 

1047 entity_type=entity.type, 

1048 attrs=entity.get_attributes(), 

1049 **kwargs, 

1050 ) 

1051 

1052 def replace_entity_attributes( 

1053 self, 

1054 entity_id: str, 

1055 attrs: Union[ 

1056 List[Union[NamedContextAttribute, Dict[str, ContextAttribute]]], Dict 

1057 ], 

1058 entity_type: str = None, 

1059 forcedUpdate: bool = False, 

1060 key_values: bool = False, 

1061 ): 

1062 """ 

1063 The attributes previously existing in the entity are removed and 

1064 replaced by the ones in the request. This corresponds to a 'PUT' 

1065 request. 

1066 

1067 Args: 

1068 entity_id: Entity id to be updated 

1069 entity_type: Entity type, to avoid ambiguity in case there are 

1070 several entities with the same entity id. 

1071 attrs: List of attributes to add to the entity or dict of 

1072 attributes in case of key_values=True. 

1073 forcedUpdate: Update operation have to trigger any matching 

1074 subscription, no matter if there is an actual attribute 

1075 update or no instead of the default behavior, which is to 

1076 updated only if attribute is effectively updated. 

1077 key_values(bool): 

1078 By default False. If set to True, "options=keyValues" will 

1079 be included in params of the request. The payload uses 

1080 the keyValues simplified entity representation, i.e. 

1081 ContextEntityKeyValues. 

1082 Returns: 

1083 None 

1084 """ 

1085 url = urljoin(self.base_url, f"{self._url_version}/entities/{entity_id}/attrs") 

1086 headers = self.headers.copy() 

1087 params = {} 

1088 options = [] 

1089 if entity_type: 

1090 params.update({"type": entity_type}) 

1091 else: 

1092 entity_type = "dummy" 

1093 

1094 if forcedUpdate: 

1095 options.append("forcedUpdate") 

1096 

1097 if key_values: 

1098 options.append("keyValues") 

1099 assert isinstance(attrs, dict) 

1100 else: 

1101 entity = ContextEntity(id=entity_id, type=entity_type) 

1102 entity.add_attributes(attrs) 

1103 attrs = entity.model_dump(exclude={"id", "type"}, exclude_none=True) 

1104 if options: 

1105 params.update({"options": ",".join(options)}) 

1106 

1107 try: 

1108 res = self.put( 

1109 url=url, 

1110 headers=headers, 

1111 json=attrs, 

1112 params=params, 

1113 ) 

1114 if res.ok: 

1115 self.logger.info("Entity '%s' successfully " "updated!", entity_id) 

1116 else: 

1117 res.raise_for_status() 

1118 except requests.RequestException as err: 

1119 msg = f"Could not replace attribute of entity {entity_id} !" 

1120 self.log_error(err=err, msg=msg) 

1121 raise 

1122 

1123 # Attribute operations 

1124 def get_attribute( 

1125 self, 

1126 entity_id: str, 

1127 attr_name: str, 

1128 entity_type: str = None, 

1129 metadata: str = None, 

1130 response_format="", 

1131 ) -> ContextAttribute: 

1132 """ 

1133 Retrieves a specified attribute from an entity. 

1134 

1135 Args: 

1136 entity_id: Id of the entity. Example: Bcn_Welt 

1137 attr_name: Name of the attribute to be retrieved. 

1138 entity_type (Optional): Type of the entity to retrieve 

1139 metadata (Optional): A list of metadata names to include in the 

1140 response. See "Filtering out attributes and metadata" section 

1141 for more detail. 

1142 

1143 Returns: 

1144 The content of the retrieved attribute as ContextAttribute 

1145 

1146 Raises: 

1147 Error 

1148 

1149 """ 

1150 url = urljoin( 

1151 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}" 

1152 ) 

1153 headers = self.headers.copy() 

1154 params = {} 

1155 if entity_type: 

1156 params.update({"type": entity_type}) 

1157 if metadata: 

1158 params.update({"metadata": ",".join(metadata)}) 

1159 try: 

1160 res = self.get(url=url, params=params, headers=headers) 

1161 if res.ok: 

1162 self.logger.debug("Received: %s", res.json()) 

1163 return ContextAttribute(**res.json()) 

1164 res.raise_for_status() 

1165 except requests.RequestException as err: 

1166 msg = ( 

1167 f"Could not load attribute '{attr_name}' from entity" f"'{entity_id}' " 

1168 ) 

1169 self.log_error(err=err, msg=msg) 

1170 raise 

1171 

1172 def update_entity_attribute( 

1173 self, 

1174 entity_id: str, 

1175 attr: Union[ContextAttribute, NamedContextAttribute], 

1176 *, 

1177 entity_type: str = None, 

1178 attr_name: str = None, 

1179 override_metadata: bool = True, 

1180 forcedUpdate: bool = False, 

1181 ): 

1182 """ 

1183 Updates a specified attribute from an entity. 

1184 

1185 Args: 

1186 attr: 

1187 context attribute to update 

1188 entity_id: 

1189 Id of the entity. Example: Bcn_Welt 

1190 entity_type: 

1191 Entity type, to avoid ambiguity in case there are 

1192 several entities with the same entity id. 

1193 forcedUpdate: Update operation have to trigger any matching 

1194 subscription, no matter if there is an actual attribute 

1195 update or no instead of the default behavior, which is to 

1196 updated only if attribute is effectively updated. 

1197 attr_name: 

1198 Name of the attribute to be updated. 

1199 override_metadata: 

1200 Bool, if set to `True` (default) the metadata will be 

1201 overwritten. This is for backwards compatibility reasons. 

1202 If `False` the metadata values will be either updated if 

1203 already existing or append if not. 

1204 See also: 

1205 https://fiware-orion.readthedocs.io/en/master/user/metadata.html 

1206 """ 

1207 headers = self.headers.copy() 

1208 if not isinstance(attr, NamedContextAttribute): 

1209 assert attr_name is not None, ( 

1210 "Missing name for attribute. " 

1211 "attr_name must be present if" 

1212 "attr is of type ContextAttribute" 

1213 ) 

1214 else: 

1215 assert attr_name is None, ( 

1216 "Invalid argument attr_name. Do not set " 

1217 "attr_name if attr is of type " 

1218 "NamedContextAttribute" 

1219 ) 

1220 attr_name = attr.name 

1221 

1222 url = urljoin( 

1223 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}" 

1224 ) 

1225 params = {} 

1226 if entity_type: 

1227 params.update({"type": entity_type}) 

1228 # set overrideMetadata option (we assure backwards compatibility here) 

1229 options = [] 

1230 if override_metadata: 

1231 options.append("overrideMetadata") 

1232 if forcedUpdate: 

1233 options.append("forcedUpdate") 

1234 if options: 

1235 params.update({"options": ",".join(options)}) 

1236 try: 

1237 res = self.put( 

1238 url=url, 

1239 headers=headers, 

1240 params=params, 

1241 json=attr.model_dump(exclude={"name"}, exclude_none=True), 

1242 ) 

1243 if res.ok: 

1244 self.logger.info( 

1245 "Attribute '%s' of '%s' " "successfully updated!", 

1246 attr_name, 

1247 entity_id, 

1248 ) 

1249 else: 

1250 res.raise_for_status() 

1251 except requests.RequestException as err: 

1252 msg = ( 

1253 f"Could not update attribute '{attr_name}' of entity" f"'{entity_id}' " 

1254 ) 

1255 self.log_error(err=err, msg=msg) 

1256 raise 

1257 

1258 def delete_entity_attribute( 

1259 self, entity_id: str, attr_name: str, entity_type: str = None 

1260 ) -> None: 

1261 """ 

1262 Removes a specified attribute from an entity. 

1263 

1264 Args: 

1265 entity_id: Id of the entity. 

1266 attr_name: Name of the attribute to be retrieved. 

1267 entity_type: Entity type, to avoid ambiguity in case there are 

1268 several entities with the same entity id. 

1269 Raises: 

1270 Error 

1271 

1272 """ 

1273 url = urljoin( 

1274 self.base_url, f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}" 

1275 ) 

1276 headers = self.headers.copy() 

1277 params = {} 

1278 if entity_type: 

1279 params.update({"type": entity_type}) 

1280 try: 

1281 res = self.delete(url=url, headers=headers) 

1282 if res.ok: 

1283 self.logger.info( 

1284 "Attribute '%s' of '%s' " "successfully deleted!", 

1285 attr_name, 

1286 entity_id, 

1287 ) 

1288 else: 

1289 res.raise_for_status() 

1290 except requests.RequestException as err: 

1291 msg = f"Could not delete attribute '{attr_name}' of entity '{entity_id}'" 

1292 self.log_error(err=err, msg=msg) 

1293 raise 

1294 

1295 # Attribute value operations 

1296 def get_attribute_value( 

1297 self, entity_id: str, attr_name: str, entity_type: str = None 

1298 ) -> Any: 

1299 """ 

1300 This operation returns the value property with the value of the 

1301 attribute. 

1302 

1303 Args: 

1304 entity_id: Id of the entity. Example: Bcn_Welt 

1305 attr_name: Name of the attribute to be retrieved. 

1306 Example: temperature. 

1307 entity_type: Entity type, to avoid ambiguity in case there are 

1308 several entities with the same entity id. 

1309 

1310 Returns: 

1311 

1312 """ 

1313 url = urljoin( 

1314 self.base_url, 

1315 f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}/value", 

1316 ) 

1317 headers = self.headers.copy() 

1318 params = {} 

1319 if entity_type: 

1320 params.update({"type": entity_type}) 

1321 try: 

1322 res = self.get(url=url, params=params, headers=headers) 

1323 if res.ok: 

1324 self.logger.debug("Received: %s", res.json()) 

1325 return res.json() 

1326 res.raise_for_status() 

1327 except requests.RequestException as err: 

1328 msg = ( 

1329 f"Could not load value of attribute '{attr_name}' from " 

1330 f"entity'{entity_id}' " 

1331 ) 

1332 self.log_error(err=err, msg=msg) 

1333 raise 

1334 

1335 def update_attribute_value( 

1336 self, 

1337 *, 

1338 entity_id: str, 

1339 attr_name: str, 

1340 value: Any, 

1341 entity_type: str = None, 

1342 forcedUpdate: bool = False, 

1343 ): 

1344 """ 

1345 Updates the value of a specified attribute of an entity 

1346 

1347 Args: 

1348 value: update value 

1349 entity_id: Id of the entity. Example: Bcn_Welt 

1350 attr_name: Name of the attribute to be retrieved. 

1351 Example: temperature. 

1352 entity_type: Entity type, to avoid ambiguity in case there are 

1353 several entities with the same entity id. 

1354 forcedUpdate: Update operation have to trigger any matching 

1355 subscription, no matter if there is an actual attribute 

1356 update or no instead of the default behavior, which is to 

1357 updated only if attribute is effectively updated. 

1358 Returns: 

1359 

1360 """ 

1361 url = urljoin( 

1362 self.base_url, 

1363 f"{self._url_version}/entities/{entity_id}/attrs/{attr_name}/value", 

1364 ) 

1365 headers = self.headers.copy() 

1366 params = {} 

1367 if entity_type: 

1368 params.update({"type": entity_type}) 

1369 options = [] 

1370 if forcedUpdate: 

1371 options.append("forcedUpdate") 

1372 if options: 

1373 params.update({"options": ",".join(options)}) 

1374 try: 

1375 if not isinstance(value, (dict, list)): 

1376 headers.update({"Content-Type": "text/plain"}) 

1377 if isinstance(value, str): 

1378 value = f"{value}" 

1379 res = self.put(url=url, headers=headers, json=value, params=params) 

1380 else: 

1381 res = self.put(url=url, headers=headers, json=value, params=params) 

1382 if res.ok: 

1383 self.logger.info( 

1384 "Attribute '%s' of '%s' " "successfully updated!", 

1385 attr_name, 

1386 entity_id, 

1387 ) 

1388 else: 

1389 res.raise_for_status() 

1390 except requests.RequestException as err: 

1391 msg = ( 

1392 f"Could not update value of attribute '{attr_name}' from " 

1393 f"entity '{entity_id}' " 

1394 ) 

1395 self.log_error(err=err, msg=msg) 

1396 raise 

1397 

1398 # Types Operations 

1399 def get_entity_types( 

1400 self, *, limit: int = None, offset: int = None, options: str = None 

1401 ) -> List[Dict[str, Any]]: 

1402 """ 

1403 

1404 Args: 

1405 limit: Limit the number of types to be retrieved. 

1406 offset: Skip a number of records. 

1407 options: Options dictionary. Allowed: count, values 

1408 

1409 Returns: 

1410 

1411 """ 

1412 url = urljoin(self.base_url, f"{self._url_version}/types") 

1413 headers = self.headers.copy() 

1414 params = {} 

1415 if limit: 

1416 params.update({"limit": limit}) 

1417 if offset: 

1418 params.update({"offset": offset}) 

1419 if options: 

1420 params.update({"options": options}) 

1421 try: 

1422 res = self.get(url=url, params=params, headers=headers) 

1423 if res.ok: 

1424 self.logger.debug("Received: %s", res.json()) 

1425 return res.json() 

1426 res.raise_for_status() 

1427 except requests.RequestException as err: 

1428 msg = "Could not load entity types!" 

1429 self.log_error(err=err, msg=msg) 

1430 raise 

1431 

1432 def get_entity_type(self, entity_type: str) -> Dict[str, Any]: 

1433 """ 

1434 

1435 Args: 

1436 entity_type: Entity Type. Example: Room 

1437 

1438 Returns: 

1439 

1440 """ 

1441 url = urljoin(self.base_url, f"{self._url_version}/types/{entity_type}") 

1442 headers = self.headers.copy() 

1443 params = {} 

1444 try: 

1445 res = self.get(url=url, params=params, headers=headers) 

1446 if res.ok: 

1447 self.logger.debug("Received: %s", res.json()) 

1448 return res.json() 

1449 res.raise_for_status() 

1450 except requests.RequestException as err: 

1451 msg = f"Could not load entities of type" f"'{entity_type}' " 

1452 self.log_error(err=err, msg=msg) 

1453 raise 

1454 

1455 # SUBSCRIPTION API ENDPOINTS 

1456 def get_subscription_list(self, limit: PositiveInt = inf) -> List[Subscription]: 

1457 """ 

1458 Returns a list of all the subscriptions present in the system. 

1459 Args: 

1460 limit: Limit the number of subscriptions to be retrieved 

1461 Returns: 

1462 list of subscriptions 

1463 """ 

1464 url = urljoin(self.base_url, f"{self._url_version}/subscriptions/") 

1465 headers = self.headers.copy() 

1466 params = {} 

1467 

1468 # We always use the 'count' option to check weather pagination is 

1469 # required 

1470 params.update({"options": "count"}) 

1471 try: 

1472 items = self.__pagination( 

1473 limit=limit, url=url, params=params, headers=headers 

1474 ) 

1475 adapter = TypeAdapter(List[Subscription]) 

1476 return adapter.validate_python(items) 

1477 except requests.RequestException as err: 

1478 msg = "Could not load subscriptions!" 

1479 self.log_error(err=err, msg=msg) 

1480 raise 

1481 

1482 def post_subscription( 

1483 self, 

1484 subscription: Subscription, 

1485 update: bool = False, 

1486 skip_initial_notification: bool = False, 

1487 ) -> str: 

1488 """ 

1489 Creates a new subscription. The subscription is represented by a 

1490 Subscription object defined in filip.cb.models. 

1491 

1492 If the subscription already exists, the adding is prevented and the id 

1493 of the existing subscription is returned. 

1494 

1495 A subscription is deemed as already existing if there exists a 

1496 subscription with the exact same subject and notification fields. All 

1497 optional fields are not considered. 

1498 

1499 Args: 

1500 subscription: Subscription 

1501 update: True - If the subscription already exists, update it 

1502 False- If the subscription already exists, throw warning 

1503 skip_initial_notification: True - Initial Notifications will be 

1504 sent to recipient containing the whole data. This is 

1505 deprecated and removed from version 3.0 of the context broker. 

1506 False - skip the initial notification 

1507 Returns: 

1508 str: Id of the (created) subscription 

1509 

1510 """ 

1511 existing_subscriptions = self.get_subscription_list() 

1512 

1513 sub_dict = subscription.model_dump(include={"subject", "notification"}) 

1514 for ex_sub in existing_subscriptions: 

1515 if self._subscription_dicts_are_equal( 

1516 sub_dict, ex_sub.model_dump(include={"subject", "notification"}) 

1517 ): 

1518 self.logger.info("Subscription already exists") 

1519 if update: 

1520 self.logger.info("Updated subscription") 

1521 subscription.id = ex_sub.id 

1522 self.update_subscription(subscription) 

1523 else: 

1524 warnings.warn( 

1525 f"Subscription existed already with the id" f" {ex_sub.id}" 

1526 ) 

1527 return ex_sub.id 

1528 

1529 params = {} 

1530 if skip_initial_notification: 

1531 version = self.get_version()["orion"]["version"] 

1532 if parse_version(version) <= parse_version("3.1"): 

1533 params.update({"options": "skipInitialNotification"}) 

1534 else: 

1535 pass 

1536 warnings.warn( 

1537 f"Skip initial notifications is a deprecated " 

1538 f"feature of older versions <=3.1 of the context " 

1539 f"broker. The Context Broker that you requesting has " 

1540 f"version: {version}. For newer versions we " 

1541 f"automatically skip this option. Consider " 

1542 f"refactoring and updating your services", 

1543 DeprecationWarning, 

1544 ) 

1545 

1546 url = urljoin(self.base_url, "v2/subscriptions") 

1547 headers = self.headers.copy() 

1548 headers.update({"Content-Type": "application/json"}) 

1549 try: 

1550 res = self.post( 

1551 url=url, 

1552 headers=headers, 

1553 data=subscription.model_dump_json(exclude={"id"}, exclude_none=True), 

1554 params=params, 

1555 ) 

1556 if res.ok: 

1557 self.logger.info("Subscription successfully created!") 

1558 return res.headers["Location"].split("/")[-1] 

1559 res.raise_for_status() 

1560 except requests.RequestException as err: 

1561 msg = "Could not send subscription!" 

1562 self.log_error(err=err, msg=msg) 

1563 raise 

1564 

1565 def get_subscription(self, subscription_id: str) -> Subscription: 

1566 """ 

1567 Retrieves a subscription from 

1568 Args: 

1569 subscription_id: id of the subscription 

1570 

1571 Returns: 

1572 

1573 """ 

1574 url = urljoin( 

1575 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}" 

1576 ) 

1577 headers = self.headers.copy() 

1578 try: 

1579 res = self.get(url=url, headers=headers) 

1580 if res.ok: 

1581 self.logger.debug("Received: %s", res.json()) 

1582 return Subscription(**res.json()) 

1583 res.raise_for_status() 

1584 except requests.RequestException as err: 

1585 msg = f"Could not load subscription {subscription_id}" 

1586 self.log_error(err=err, msg=msg) 

1587 raise 

1588 

1589 def update_subscription( 

1590 self, subscription: Subscription, skip_initial_notification: bool = False 

1591 ): 

1592 """ 

1593 Only the fields included in the request are updated in the subscription. 

1594 

1595 Args: 

1596 subscription: Subscription to update 

1597 skip_initial_notification: True - Initial Notifications will be 

1598 sent to recipient containing the whole data. This is 

1599 deprecated and removed from version 3.0 of the context broker. 

1600 False - skip the initial notification 

1601 

1602 Returns: 

1603 None 

1604 """ 

1605 params = {} 

1606 if skip_initial_notification: 

1607 version = self.get_version()["orion"]["version"] 

1608 if parse_version(version) <= parse_version("3.1"): 

1609 params.update({"options": "skipInitialNotification"}) 

1610 else: 

1611 pass 

1612 warnings.warn( 

1613 f"Skip initial notifications is a deprecated " 

1614 f"feature of older versions <3.1 of the context " 

1615 f"broker. The Context Broker that you requesting has " 

1616 f"version: {version}. For newer versions we " 

1617 f"automatically skip this option. Consider " 

1618 f"refactoring and updating your services", 

1619 DeprecationWarning, 

1620 ) 

1621 

1622 url = urljoin( 

1623 self.base_url, f"{self._url_version}/subscriptions/{subscription.id}" 

1624 ) 

1625 headers = self.headers.copy() 

1626 headers.update({"Content-Type": "application/json"}) 

1627 try: 

1628 res = self.patch( 

1629 url=url, 

1630 headers=headers, 

1631 data=subscription.model_dump_json(exclude={"id"}, exclude_none=True), 

1632 ) 

1633 if res.ok: 

1634 self.logger.info("Subscription successfully updated!") 

1635 else: 

1636 res.raise_for_status() 

1637 except requests.RequestException as err: 

1638 msg = f"Could not update subscription {subscription.id}" 

1639 self.log_error(err=err, msg=msg) 

1640 raise 

1641 

1642 def delete_subscription(self, subscription_id: str) -> None: 

1643 """ 

1644 Deletes a subscription from a Context Broker 

1645 Args: 

1646 subscription_id: id of the subscription 

1647 """ 

1648 url = urljoin( 

1649 self.base_url, f"{self._url_version}/subscriptions/{subscription_id}" 

1650 ) 

1651 headers = self.headers.copy() 

1652 try: 

1653 res = self.delete(url=url, headers=headers) 

1654 if res.ok: 

1655 self.logger.info( 

1656 f"Subscription '{subscription_id}' " f"successfully deleted!" 

1657 ) 

1658 else: 

1659 res.raise_for_status() 

1660 except requests.RequestException as err: 

1661 msg = f"Could not delete subscription {subscription_id}" 

1662 self.log_error(err=err, msg=msg) 

1663 raise 

1664 

1665 # Registration API 

1666 def get_registration_list(self, *, limit: PositiveInt = None) -> List[Registration]: 

1667 """ 

1668 Lists all the context provider registrations present in the system. 

1669 

1670 Args: 

1671 limit: Limit the number of registrations to be retrieved 

1672 Returns: 

1673 

1674 """ 

1675 url = urljoin(self.base_url, f"{self._url_version}/registrations/") 

1676 headers = self.headers.copy() 

1677 params = {} 

1678 

1679 # We always use the 'count' option to check weather pagination is 

1680 # required 

1681 params.update({"options": "count"}) 

1682 try: 

1683 items = self.__pagination( 

1684 limit=limit, url=url, params=params, headers=headers 

1685 ) 

1686 adapter = TypeAdapter(List[Registration]) 

1687 return adapter.validate_python(items) 

1688 except requests.RequestException as err: 

1689 msg = "Could not load registrations!" 

1690 self.log_error(err=err, msg=msg) 

1691 raise 

1692 

1693 def post_registration(self, registration: Registration): 

1694 """ 

1695 Creates a new context provider registration. This is typically used 

1696 for binding context sources as providers of certain data. The 

1697 registration is represented by cb.models.Registration 

1698 

1699 Args: 

1700 registration (Registration): 

1701 

1702 Returns: 

1703 

1704 """ 

1705 url = urljoin(self.base_url, f"{self._url_version}/registrations") 

1706 headers = self.headers.copy() 

1707 headers.update({"Content-Type": "application/json"}) 

1708 try: 

1709 res = self.post( 

1710 url=url, 

1711 headers=headers, 

1712 data=registration.model_dump_json(exclude={"id"}, exclude_none=True), 

1713 ) 

1714 if res.ok: 

1715 self.logger.info("Registration successfully created!") 

1716 return res.headers["Location"].split("/")[-1] 

1717 res.raise_for_status() 

1718 except requests.RequestException as err: 

1719 msg = f"Could not send registration {registration.id}!" 

1720 self.log_error(err=err, msg=msg) 

1721 raise 

1722 

1723 def get_registration(self, registration_id: str) -> Registration: 

1724 """ 

1725 Retrieves a registration from context broker by id 

1726 

1727 Args: 

1728 registration_id: id of the registration 

1729 

1730 Returns: 

1731 Registration 

1732 """ 

1733 url = urljoin( 

1734 self.base_url, f"{self._url_version}/registrations/{registration_id}" 

1735 ) 

1736 headers = self.headers.copy() 

1737 try: 

1738 res = self.get(url=url, headers=headers) 

1739 if res.ok: 

1740 self.logger.debug("Received: %s", res.json()) 

1741 return Registration(**res.json()) 

1742 res.raise_for_status() 

1743 except requests.RequestException as err: 

1744 msg = f"Could not load registration {registration_id} !" 

1745 self.log_error(err=err, msg=msg) 

1746 raise 

1747 

1748 def add_valid_relationships( 

1749 self, entities: List[ContextEntity] 

1750 ) -> List[ContextEntity]: 

1751 """ 

1752 Validate all attributes in the given entities. If the attribute value points to 

1753 an existing entity, it is assumed that this attribute is a relationship, and it 

1754 will be assigned with the attribute type "relationship" 

1755 

1756 Args: 

1757 entities: list of entities that need to be validated. 

1758 

1759 Returns: 

1760 updated entities 

1761 """ 

1762 updated_entities = [] 

1763 for entity in entities[:]: 

1764 for attr_name, attr_value in entity.model_dump( 

1765 exclude={"id", "type"} 

1766 ).items(): 

1767 if isinstance(attr_value, dict): 

1768 if self.validate_relationship(attr_value): 

1769 entity.update_attribute( 

1770 { 

1771 attr_name: ContextAttribute( 

1772 **{ 

1773 "type": DataType.RELATIONSHIP, 

1774 "value": attr_value.get("value"), 

1775 } 

1776 ) 

1777 } 

1778 ) 

1779 updated_entities.append(entity) 

1780 return updated_entities 

1781 

1782 def remove_invalid_relationships( 

1783 self, entities: List[ContextEntity], hard_remove: bool = True 

1784 ) -> List[ContextEntity]: 

1785 """ 

1786 Removes invalid relationships from the entities. An invalid relationship 

1787 is a relationship that has no destination entity. 

1788 

1789 Args: 

1790 entities: list of entities that need to be validated. 

1791 hard_remove: If True, invalid relationships will be deleted. 

1792 If False, invalid relationships will be changed to Text 

1793 attributes. 

1794 

1795 Returns: 

1796 updated entities 

1797 """ 

1798 updated_entities = [] 

1799 for entity in entities[:]: 

1800 for relationship in entity.get_relationships(): 

1801 if not self.validate_relationship(relationship): 

1802 if hard_remove: 

1803 entity.delete_attributes(attrs=[relationship]) 

1804 else: 

1805 # change the attribute type to "Text" 

1806 entity.update_attribute( 

1807 attrs=[ 

1808 NamedContextAttribute( 

1809 name=relationship.name, 

1810 type=DataType.TEXT, 

1811 value=relationship.value, 

1812 ) 

1813 ] 

1814 ) 

1815 updated_entities.append(entity) 

1816 return updated_entities 

1817 

1818 def validate_relationship( 

1819 self, relationship: Union[NamedContextAttribute, ContextAttribute, Dict] 

1820 ) -> bool: 

1821 """ 

1822 Validates a relationship. A relationship is valid if it points to an existing 

1823 entity. Otherwise, it is considered invalid 

1824 

1825 Args: 

1826 relationship: relationship to validate 

1827 Returns 

1828 True if the relationship is valid, False otherwise 

1829 """ 

1830 if isinstance(relationship, NamedContextAttribute) or isinstance( 

1831 relationship, ContextAttribute 

1832 ): 

1833 destination_id = relationship.value 

1834 elif isinstance(relationship, dict): 

1835 _sentinel = object() 

1836 destination_id = relationship.get("value", _sentinel) 

1837 if destination_id is _sentinel: 

1838 raise ValueError( 

1839 "Invalid relationship dictionary format\n" 

1840 "Expected format: {" 

1841 f'"type": "{DataType.RELATIONSHIP.value}", ' 

1842 '"value" "entity_id"}' 

1843 ) 

1844 else: 

1845 raise ValueError("Invalid relationship type.") 

1846 try: 

1847 destination_entity = self.get_entity(entity_id=destination_id) 

1848 return destination_entity.id == destination_id 

1849 except requests.RequestException as err: 

1850 if err.response.status_code == 404: 

1851 return False 

1852 

1853 def update_registration(self, registration: Registration): 

1854 """ 

1855 Only the fields included in the request are updated in the registration. 

1856 

1857 Args: 

1858 registration: Registration to update 

1859 Returns: 

1860 

1861 """ 

1862 url = urljoin( 

1863 self.base_url, f"{self._url_version}/registrations/{registration.id}" 

1864 ) 

1865 headers = self.headers.copy() 

1866 headers.update({"Content-Type": "application/json"}) 

1867 try: 

1868 res = self.patch( 

1869 url=url, 

1870 headers=headers, 

1871 data=registration.model_dump_json(exclude={"id"}, exclude_none=True), 

1872 ) 

1873 if res.ok: 

1874 self.logger.info("Registration successfully updated!") 

1875 else: 

1876 res.raise_for_status() 

1877 except requests.RequestException as err: 

1878 msg = f"Could not update registration {registration.id} !" 

1879 self.log_error(err=err, msg=msg) 

1880 raise 

1881 

1882 def delete_registration(self, registration_id: str) -> None: 

1883 """ 

1884 Deletes a subscription from a Context Broker 

1885 Args: 

1886 registration_id: id of the subscription 

1887 """ 

1888 url = urljoin( 

1889 self.base_url, f"{self._url_version}/registrations/{registration_id}" 

1890 ) 

1891 headers = self.headers.copy() 

1892 try: 

1893 res = self.delete(url=url, headers=headers) 

1894 if res.ok: 

1895 self.logger.info( 

1896 "Registration '%s' " "successfully deleted!", registration_id 

1897 ) 

1898 res.raise_for_status() 

1899 except requests.RequestException as err: 

1900 msg = f"Could not delete registration {registration_id} !" 

1901 self.log_error(err=err, msg=msg) 

1902 raise 

1903 

1904 # Batch operation API 

1905 def update( 

1906 self, 

1907 *, 

1908 entities: List[Union[ContextEntity, ContextEntityKeyValues]], 

1909 action_type: Union[ActionType, str], 

1910 update_format: str = None, 

1911 forcedUpdate: bool = False, 

1912 override_metadata: bool = False, 

1913 ) -> None: 

1914 """ 

1915 This operation allows to create, update and/or delete several entities 

1916 in a single batch operation. 

1917 

1918 This operation is split in as many individual operations as entities 

1919 in the entities vector, so the actionType is executed for each one of 

1920 them. Depending on the actionType, a mapping with regular non-batch 

1921 operations can be done: 

1922 

1923 append: maps to POST /v2/entities (if the entity does not already exist) 

1924 or POST /v2/entities/<id>/attrs (if the entity already exists). 

1925 

1926 appendStrict: maps to POST /v2/entities (if the entity does not 

1927 already exist) or POST /v2/entities/<id>/attrs?options=append (if the 

1928 entity already exists). 

1929 

1930 update: maps to PATCH /v2/entities/<id>/attrs. 

1931 

1932 delete: maps to DELETE /v2/entities/<id>/attrs/<attrName> on every 

1933 attribute included in the entity or to DELETE /v2/entities/<id> if 

1934 no attribute were included in the entity. 

1935 

1936 replace: maps to PUT /v2/entities/<id>/attrs. 

1937 

1938 Args: 

1939 entities: "an array of entities, each entity specified using the " 

1940 "JSON entity representation format " 

1941 action_type (Update): "actionType, to specify the kind of update 

1942 action to do: either append, appendStrict, update, delete, 

1943 or replace. " 

1944 update_format (str): Optional 'keyValues' 

1945 forcedUpdate: Update operation have to trigger any matching 

1946 subscription, no matter if there is an actual attribute 

1947 update or no instead of the default behavior, which is to 

1948 updated only if attribute is effectively updated. 

1949 override_metadata: 

1950 Bool, replace the existing metadata with the one provided in 

1951 the request 

1952 Returns: 

1953 

1954 """ 

1955 

1956 url = urljoin(self.base_url, f"{self._url_version}/op/update") 

1957 headers = self.headers.copy() 

1958 headers.update({"Content-Type": "application/json"}) 

1959 params = {} 

1960 options = [] 

1961 if override_metadata: 

1962 options.append("overrideMetadata") 

1963 if forcedUpdate: 

1964 options.append("forcedUpdate") 

1965 if update_format: 

1966 assert ( 

1967 update_format == AttrsFormat.KEY_VALUES.value 

1968 ), "Only 'keyValues' is allowed as update format" 

1969 options.append("keyValues") 

1970 if options: 

1971 params.update({"options": ",".join(options)}) 

1972 update = Update(actionType=action_type, entities=entities) 

1973 try: 

1974 res = self.post( 

1975 url=url, 

1976 headers=headers, 

1977 params=params, 

1978 json=update.model_dump(by_alias=True), 

1979 ) 

1980 if res.ok: 

1981 self.logger.info("Update operation '%s' succeeded!", action_type) 

1982 else: 

1983 res.raise_for_status() 

1984 except requests.RequestException as err: 

1985 msg = f"Update operation '{action_type}' failed!" 

1986 self.log_error(err=err, msg=msg) 

1987 raise 

1988 

1989 def query( 

1990 self, 

1991 *, 

1992 query: Query, 

1993 limit: PositiveInt = None, 

1994 order_by: str = None, 

1995 response_format: Union[AttrsFormat, str] = AttrsFormat.NORMALIZED, 

1996 ) -> List[Any]: 

1997 """ 

1998 Generate api query 

1999 Args: 

2000 query (Query): 

2001 limit (PositiveInt): 

2002 order_by (str): 

2003 response_format (AttrsFormat, str): 

2004 Returns: 

2005 The response payload is an Array containing one object per matching 

2006 entity, or an empty array [] if no entities are found. The entities 

2007 follow the JSON entity representation format (described in the 

2008 section "JSON Entity Representation"). 

2009 """ 

2010 url = urljoin(self.base_url, f"{self._url_version}/op/query") 

2011 headers = self.headers.copy() 

2012 headers.update({"Content-Type": "application/json"}) 

2013 params = {"options": "count"} 

2014 

2015 if response_format: 

2016 if response_format not in list(AttrsFormat): 

2017 raise ValueError(f"Value must be in {list(AttrsFormat)}") 

2018 params["options"] = ",".join([response_format, "count"]) 

2019 try: 

2020 items = self.__pagination( 

2021 method=PaginationMethod.POST, 

2022 url=url, 

2023 headers=headers, 

2024 params=params, 

2025 data=query.model_dump_json(exclude_none=True), 

2026 limit=limit, 

2027 ) 

2028 if response_format == AttrsFormat.NORMALIZED: 

2029 adapter = TypeAdapter(List[ContextEntity]) 

2030 return adapter.validate_python(items) 

2031 if response_format == AttrsFormat.KEY_VALUES: 

2032 adapter = TypeAdapter(List[ContextEntityKeyValues]) 

2033 return adapter.validate_python(items) 

2034 return items 

2035 except requests.RequestException as err: 

2036 msg = "Query operation failed!" 

2037 self.log_error(err=err, msg=msg) 

2038 raise 

2039 

2040 def notify(self, message: Message) -> None: 

2041 """ 

2042 This operation is intended to consume a notification payload so that 

2043 all the entity data included by such notification is persisted, 

2044 overwriting if necessary. This operation is useful when one NGSIv2 

2045 endpoint is subscribed to another NGSIv2 endpoint (federation 

2046 scenarios). The request payload must be an NGSIv2 notification 

2047 payload. The behaviour must be exactly the same as 'update' 

2048 with 'action_type' equal to append. 

2049 

2050 Args: 

2051 message: Notification message 

2052 

2053 Returns: 

2054 None 

2055 """ 

2056 url = urljoin(self.base_url, "v2/op/notify") 

2057 headers = self.headers.copy() 

2058 headers.update({"Content-Type": "application/json"}) 

2059 params = {} 

2060 try: 

2061 res = self.post( 

2062 url=url, 

2063 headers=headers, 

2064 params=params, 

2065 data=message.model_dump_json(by_alias=True), 

2066 ) 

2067 if res.ok: 

2068 self.logger.info("Notification message sent!") 

2069 else: 

2070 res.raise_for_status() 

2071 except requests.RequestException as err: 

2072 msg = ( 

2073 f"Sending notifcation message failed! \n " 

2074 f"{message.model_dump_json(indent=2)}" 

2075 ) 

2076 self.log_error(err=err, msg=msg) 

2077 raise 

2078 

2079 def post_command( 

2080 self, 

2081 *, 

2082 entity_id: str, 

2083 command: Union[Command, NamedCommand, Dict], 

2084 entity_type: str = None, 

2085 command_name: str = None, 

2086 ) -> None: 

2087 """ 

2088 Post a command to a context entity this corresponds to 'PATCH' of the 

2089 specified command attribute. 

2090 

2091 Args: 

2092 entity_id: Entity identifier 

2093 command: Command 

2094 entity_type: Entity type 

2095 command_name: Name of the command in the entity 

2096 

2097 Returns: 

2098 None 

2099 """ 

2100 if command_name: 

2101 assert isinstance(command, (Command, dict)) 

2102 if isinstance(command, dict): 

2103 command = Command(**command) 

2104 command = {command_name: command.model_dump()} 

2105 else: 

2106 assert isinstance(command, (NamedCommand, dict)) 

2107 if isinstance(command, dict): 

2108 command = NamedCommand(**command) 

2109 

2110 self.update_existing_entity_attributes( 

2111 entity_id=entity_id, entity_type=entity_type, attrs=[command] 

2112 ) 

2113 

2114 def does_entity_exist(self, entity_id: str, entity_type: str) -> bool: 

2115 """ 

2116 Test if an entity with given id and type is present in the CB 

2117 

2118 Args: 

2119 entity_id: Entity id 

2120 entity_type: Entity type 

2121 

2122 Returns: 

2123 bool; True if entity exists 

2124 

2125 Raises: 

2126 RequestException, if any error occurs (e.g: No Connection), 

2127 except that the entity is not found 

2128 """ 

2129 url = urljoin(self.base_url, f"v2/entities/{entity_id}") 

2130 headers = self.headers.copy() 

2131 params = {"type": entity_type} 

2132 

2133 try: 

2134 res = self.get(url=url, params=params, headers=headers) 

2135 if res.ok: 

2136 return True 

2137 res.raise_for_status() 

2138 except requests.RequestException as err: 

2139 if err.response is None or not err.response.status_code == 404: 

2140 self.log_error(err=err, msg="Checking entity existence failed!") 

2141 raise 

2142 return False 

2143 

2144 def patch_entity( 

2145 self, 

2146 entity: ContextEntity, 

2147 old_entity: Optional[ContextEntity] = None, 

2148 override_attr_metadata: bool = True, 

2149 ) -> None: 

2150 """ 

2151 Takes a given entity and updates the state in the CB to match it. 

2152 It is an extended equivalent to the HTTP method PATCH, which applies 

2153 partial modifications to a resource. 

2154 

2155 Args: 

2156 entity: Entity to update 

2157 old_entity: OPTIONAL, if given only the differences between the 

2158 old_entity and entity are updated in the CB. 

2159 Other changes made to the entity in CB, can be kept. 

2160 If type or id was changed, the old_entity will be 

2161 deleted. 

2162 override_attr_metadata: 

2163 Whether to override or append the attributes metadata. 

2164 `True` for overwrite or `False` for update/append 

2165 

2166 Returns: 

2167 None 

2168 """ 

2169 

2170 new_entity = entity 

2171 

2172 if old_entity is None: 

2173 # If no old entity_was provided we use the current state to compare 

2174 # the entity to 

2175 if self.does_entity_exist( 

2176 entity_id=new_entity.id, entity_type=new_entity.type 

2177 ): 

2178 old_entity = self.get_entity( 

2179 entity_id=new_entity.id, entity_type=new_entity.type 

2180 ) 

2181 else: 

2182 # the entity is new, post and finish 

2183 self.post_entity(new_entity, update=False) 

2184 return 

2185 

2186 else: 

2187 # An old_entity was provided 

2188 # check if the old_entity (still) exists else recall methode 

2189 # and discard old_entity 

2190 if not self.does_entity_exist( 

2191 entity_id=old_entity.id, entity_type=old_entity.type 

2192 ): 

2193 self.patch_entity( 

2194 new_entity, override_attr_metadata=override_attr_metadata 

2195 ) 

2196 return 

2197 

2198 # if type or id was changed, the old_entity needs to be deleted 

2199 # and the new_entity created 

2200 # In this case we will lose the current state of the entity 

2201 if old_entity.id != new_entity.id or old_entity.type != new_entity.type: 

2202 self.delete_entity(entity_id=old_entity.id, entity_type=old_entity.type) 

2203 

2204 if not self.does_entity_exist( 

2205 entity_id=new_entity.id, entity_type=new_entity.type 

2206 ): 

2207 self.post_entity(entity=new_entity, update=False) 

2208 return 

2209 

2210 # At this point we know that we need to patch only the attributes of 

2211 # the entity 

2212 # Check the differences between the attributes of old and new entity 

2213 # Delete the removed attributes, create the new ones, 

2214 # and update the existing if necessary 

2215 old_attributes = old_entity.get_attributes() 

2216 new_attributes = new_entity.get_attributes() 

2217 

2218 # Manage attributes that existed before 

2219 for old_attr in old_attributes: 

2220 # commands do not exist in the ContextEntity and are only 

2221 # registrations to the corresponding device. Operations as 

2222 # delete will fail as it does not technically exist 

2223 corresponding_new_attr = None 

2224 for new_attr in new_attributes: 

2225 if new_attr.name == old_attr.name: 

2226 corresponding_new_attr = new_attr 

2227 

2228 if corresponding_new_attr is None: 

2229 # Attribute no longer exists, delete it 

2230 try: 

2231 self.delete_entity_attribute( 

2232 entity_id=new_entity.id, 

2233 entity_type=new_entity.type, 

2234 attr_name=old_attr.name, 

2235 ) 

2236 except requests.RequestException as err: 

2237 msg = ( 

2238 f"Failed to delete attribute {old_attr.name} of " 

2239 f"entity {new_entity.id}." 

2240 ) 

2241 if err.response is not None and err.response.status_code == 404: 

2242 # if the attribute is provided by a registration the 

2243 # deletion will fail 

2244 msg += ( 

2245 f" The attribute is probably provided " 

2246 f"by a registration." 

2247 ) 

2248 self.log_error(err=err, msg=msg) 

2249 else: 

2250 self.log_error(err=err, msg=msg) 

2251 raise 

2252 else: 

2253 # Check if attributed changed in any way, if yes update 

2254 # else do nothing and keep current state 

2255 if old_attr != corresponding_new_attr: 

2256 try: 

2257 self.update_entity_attribute( 

2258 entity_id=new_entity.id, 

2259 entity_type=new_entity.type, 

2260 attr=corresponding_new_attr, 

2261 override_metadata=override_attr_metadata, 

2262 ) 

2263 except requests.RequestException as err: 

2264 msg = ( 

2265 f"Failed to update attribute {old_attr.name} of " 

2266 f"entity {new_entity.id}." 

2267 ) 

2268 if err.response is not None and err.response.status_code == 404: 

2269 # if the attribute is provided by a registration the 

2270 # update will fail 

2271 msg += ( 

2272 f" The attribute is probably provided " 

2273 f"by a registration." 

2274 ) 

2275 self.log_error(err=err, msg=msg) 

2276 raise 

2277 

2278 # Create new attributes 

2279 update_entity = ContextEntity(id=entity.id, type=entity.type) 

2280 update_needed = False 

2281 for new_attr in new_attributes: 

2282 # commands do not exist in the ContextEntity and are only 

2283 # registrations to the corresponding device. Operations as 

2284 # delete will fail as it does not technically exists 

2285 attr_existed = False 

2286 for old_attr in old_attributes: 

2287 if new_attr.name == old_attr.name: 

2288 attr_existed = True 

2289 

2290 if not attr_existed: 

2291 update_needed = True 

2292 update_entity.add_attributes([new_attr]) 

2293 

2294 if update_needed: 

2295 self.update_entity(update_entity) 

2296 

2297 def _subscription_dicts_are_equal(self, first: dict, second: dict): 

2298 """ 

2299 Check if two dictionaries and all sub-dictionaries are equal. 

2300 Logs a warning if the keys are not equal, but ignores the 

2301 comparison of such keys. 

2302 

2303 Args: 

2304 first dict: Dictionary of first subscription 

2305 second dict: Dictionary of second subscription 

2306 

2307 Returns: 

2308 True if equal, else False 

2309 """ 

2310 

2311 def _value_is_not_none(value): 

2312 """ 

2313 Recursive function to check if a value equals none. 

2314 If the value is a dict and any value of the dict is not none, 

2315 the value is not none. 

2316 If the value is a list and any item is not none, the value is not none. 

2317 If it's neither dict nore list, bool is used. 

2318 """ 

2319 if isinstance(value, dict): 

2320 return any([_value_is_not_none(value=_v) for _v in value.values()]) 

2321 if isinstance(value, list): 

2322 return any([_value_is_not_none(value=_v) for _v in value]) 

2323 else: 

2324 return bool(value) 

2325 

2326 if first.keys() != second.keys(): 

2327 warnings.warn( 

2328 "Subscriptions contain a different set of fields. " 

2329 "Only comparing to new fields of the new one." 

2330 ) 

2331 for k, v in first.items(): 

2332 ex_value = second.get(k, None) 

2333 if isinstance(v, dict) and isinstance(ex_value, dict): 

2334 equal = self._subscription_dicts_are_equal(v, ex_value) 

2335 if equal: 

2336 continue 

2337 else: 

2338 return False 

2339 if v != ex_value: 

2340 self.logger.debug(f"Not equal fields for key {k}: ({v}, {ex_value})") 

2341 if ( 

2342 not _value_is_not_none(v) 

2343 and not _value_is_not_none(ex_value) 

2344 or k == "timesSent" 

2345 ): 

2346 continue 

2347 return False 

2348 return True 

2349 

2350 

2351# 

2352# 

2353# def check_duplicate_subscription(self, subscription_body, limit: int = 20): 

2354# """ 

2355# Function compares the subject of the subscription body, on whether a subscription 

2356# already exists for a device / entity. 

2357# :param subscription_body: the body of the new subscripton 

2358# :param limit: pagination parameter, to set the number of 

2359# subscriptions bodies the get request should grab 

2360# :return: exists, boolean -> True, if such a subscription allready 

2361# exists 

2362# """ 

2363# exists = False 

2364# subscription_subject = json.loads(subscription_body)["subject"] 

2365# # Exact keys depend on subscription body 

2366# try: 

2367# subscription_url = json.loads(subscription_body)[ 

2368# "notification"]["httpCustom"]["url"] 

2369# except KeyError: 

2370# subscription_url = json.loads(subscription_body)[ 

2371# "notification"]["http"]["url"] 

2372# 

2373# # If the number of subscriptions is larger then the limit, 

2374# paginations methods have to be used 

2375# url = self.url + '/v2/subscriptions?limit=' + str(limit) + 

2376# '&options=count' 

2377# response = self.session.get(url, headers=self.get_header()) 

2378# 

2379# sub_count = float(response.headers["Fiware-Total-Count"]) 

2380# response = json.loads(response.text) 

2381# if sub_count >= limit: 

2382# response = self.get_pagination(url=url, headers=self.get_header(), 

2383# limit=limit, count=sub_count) 

2384# response = json.loads(response) 

2385# 

2386# for existing_subscription in response: 

2387# # check whether the exact same subscriptions already exists 

2388# if existing_subscription["subject"] == subscription_subject: 

2389# exists = True 

2390# break 

2391# try: 

2392# existing_url = existing_subscription["notification"][ 

2393# "http"]["url"] 

2394# except KeyError: 

2395# existing_url = existing_subscription["notification"][ 

2396# "httpCustom"]["url"] 

2397# # check whether both subscriptions notify to the same path 

2398# if existing_url != subscription_url: 

2399# continue 

2400# else: 

2401# # iterate over all entities included in the subscription object 

2402# for entity in subscription_subject["entities"]: 

2403# if 'type' in entity.keys(): 

2404# subscription_type = entity['type'] 

2405# else: 

2406# subscription_type = entity['typePattern'] 

2407# if 'id' in entity.keys(): 

2408# subscription_id = entity['id'] 

2409# else: 

2410# subscription_id = entity["idPattern"] 

2411# # iterate over all entities included in the exisiting 

2412# subscriptions 

2413# for existing_entity in existing_subscription["subject"][ 

2414# "entities"]: 

2415# if "type" in entity.keys(): 

2416# type_existing = entity["type"] 

2417# else: 

2418# type_existing = entity["typePattern"] 

2419# if "id" in entity.keys(): 

2420# id_existing = entity["id"] 

2421# else: 

2422# id_existing = entity["idPattern"] 

2423# # as the ID field is non optional, it has to match 

2424# # check whether the type match 

2425# # if the type field is empty, they match all types 

2426# if (type_existing == subscription_type) or\ 

2427# ('*' in subscription_type) or \ 

2428# ('*' in type_existing)\ 

2429# or (type_existing == "") or ( 

2430# subscription_type == ""): 

2431# # check if on of the subscriptions is a pattern, 

2432# or if they both refer to the same id 

2433# # Get the attrs first, to avoid code duplication 

2434# # last thing to compare is the attributes 

2435# # Assumption -> position is the same as the 

2436# entities _list 

2437# # i == j 

2438# i = subscription_subject["entities"].index(entity) 

2439# j = existing_subscription["subject"][ 

2440# "entities"].index(existing_entity) 

2441# try: 

2442# subscription_attrs = subscription_subject[ 

2443# "condition"]["attrs"][i] 

2444# except (KeyError, IndexError): 

2445# subscription_attrs = [] 

2446# try: 

2447# existing_attrs = existing_subscription[ 

2448# "subject"]["condition"]["attrs"][j] 

2449# except (KeyError, IndexError): 

2450# existing_attrs = [] 

2451# 

2452# if (".*" in subscription_id) or ('.*' in 

2453# id_existing) or (subscription_id == id_existing): 

2454# # Attributes have to match, or the have to 

2455# be an empty array 

2456# if (subscription_attrs == existing_attrs) or 

2457# (subscription_attrs == []) or (existing_attrs == []): 

2458# exists = True 

2459# # if they do not match completely or subscribe 

2460# to all ids they have to match up to a certain position 

2461# elif ("*" in subscription_id) or ('*' in 

2462# id_existing): 

2463# regex_existing = id_existing.find('*') 

2464# regex_subscription = 

2465# subscription_id.find('*') 

2466# # slice the strings to compare 

2467# if (id_existing[:regex_existing] in 

2468# subscription_id) or (subscription_id[:regex_subscription] in id_existing) or \ 

2469# (id_existing[regex_existing:] in 

2470# subscription_id) or (subscription_id[regex_subscription:] in id_existing): 

2471# if (subscription_attrs == 

2472# existing_attrs) or (subscription_attrs == []) or (existing_attrs == []): 

2473# exists = True 

2474# else: 

2475# continue 

2476# else: 

2477# continue 

2478# else: 

2479# continue 

2480# else: 

2481# continue 

2482# else: 

2483# continue 

2484# return exists 

2485#