Coverage for filip/clients/ngsi_v2/quantumleap.py: 78%

232 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-05 11:07 +0000

1""" 

2TimeSeries Module for QuantumLeap API Client 

3""" 

4 

5import logging 

6import time 

7from math import inf 

8from collections import deque 

9from itertools import count, chain 

10from typing import Dict, List, Union, Deque, Optional 

11from urllib.parse import urljoin 

12import requests 

13from pydantic import AnyHttpUrl 

14from pydantic.type_adapter import TypeAdapter 

15from filip import settings 

16from filip.clients.base_http_client import BaseHttpClient 

17from filip.models.base import FiwareHeader 

18from filip.models.ngsi_v2.subscriptions import Message 

19from filip.models.ngsi_ld.context import MessageLD 

20from filip.models.ngsi_v2.timeseries import ( 

21 AggrPeriod, 

22 AggrMethod, 

23 AggrScope, 

24 AttributeValues, 

25 TimeSeries, 

26 TimeSeriesHeader, 

27) 

28from filip.clients.exceptions import BaseHttpClientException 

29 

30 

31logger = logging.getLogger(__name__) 

32 

33 

34class QuantumLeapClient(BaseHttpClient): 

35 """ 

36 Implements functions to use the FIWARE's QuantumLeap, which subscribes to an 

37 Orion Context Broker and stores the subscription data in a time series 

38 database (CrateDB). Further Information: 

39 https://smartsdk.github.io/ngsi-timeseries-api/#quantumleap 

40 https://app.swaggerhub.com/apis/heikkilv/quantumleap-api/ 

41 

42 Args: 

43 url: url of the quantumleap service 

44 session (Optional): 

45 fiware_header: 

46 **kwargs: 

47 """ 

48 

def __init__(
    self,
    url: str = None,
    *,
    session: requests.Session = None,
    fiware_header: FiwareHeader = None,
    **kwargs,
):
    """
    Set up the client for a QuantumLeap service.

    Args:
        url: Base url of the QuantumLeap service. A falsy value (``None``
            or an empty string) selects ``settings.QL_URL`` instead.
        session: Passed through unchanged to the base client constructor.
        fiware_header: Passed through unchanged to the base client
            constructor.
        **kwargs: Further keyword arguments passed through to the base
            client constructor.
    """
    # use the configured default when the caller gives no service url
    service_url = url if url else settings.QL_URL
    super().__init__(
        url=service_url,
        session=session,
        fiware_header=fiware_header,
        **kwargs,
    )

62 

63 # META API ENDPOINTS 

def get_version(self) -> Dict:
    """
    Gets version of QuantumLeap-Service.

    Returns:
        Dictionary with the decoded JSON body of the ``version`` endpoint.

    Raises:
        BaseHttpClientException: If the request fails or the service
            answers with an error status. The message is the response
            body when a response exists, otherwise the text of the
            underlying request error.
    """
    url = urljoin(self.base_url, "version")
    try:
        res = self.get(url=url, headers=self.headers)
        if res.ok:
            return res.json()
        res.raise_for_status()
    except requests.exceptions.RequestException as err:
        self.logger.error(err)
        # A request error may carry no response at all (e.g. when no HTTP
        # answer was received), so never dereference it unconditionally.
        response = err.response
        message = response.text if response is not None else str(err)
        raise BaseHttpClientException(message=message, response=response) from err

82 

def get_health(self) -> Dict:
    """
    Query the health endpoint of QuantumLeap.

    This endpoint is intended for administrators of QuantumLeap. Using the
    information returned by this endpoint they can diagnose problems in the
    service or its dependencies. This information is also useful for cloud
    tools such as orchestrators and load balancers with rules based on
    health-checks. Due to the lack of a standardized response format, the
    implementation is based on the draft of
    https://inadarei.github.io/rfc-healthcheck/

    Returns:
        Dictionary with the decoded JSON body of the ``health`` endpoint.

    Raises:
        BaseHttpClientException: If the request fails or the service
            answers with an error status. The message is the response
            body when a response exists, otherwise the text of the
            underlying request error.
    """
    url = urljoin(self.base_url, "health")
    try:
        res = self.get(url=url, headers=self.headers)
        if res.ok:
            return res.json()
        res.raise_for_status()
    except requests.exceptions.RequestException as err:
        self.logger.error(err)
        # A request error may carry no response at all (e.g. when no HTTP
        # answer was received); reading ``.text`` on None would replace the
        # real failure with an AttributeError.
        response = err.response
        message = response.text if response is not None else str(err)
        raise BaseHttpClientException(message=message, response=response) from err

107 

def post_config(self):
    """
    (To Be Implemented) Customize your persistence configuration to
    better suit your needs.

    Raises:
        NotImplementedError: Always; the endpoint is not implemented.
    """
    not_available = "Endpoint to be implemented.."
    raise NotImplementedError(not_available)

114 

115 # INPUT API ENDPOINTS 

def post_notification(self, notification: Union[Message, MessageLD]):
    """
    Forward an NGSI notification to QuantumLeap's ``v2/notify`` endpoint.

    Each entity in ``notification.data`` is dumped with
    ``model_dump(exclude_none=True)`` and posted together with the
    subscription id of the notification.

    Args:
        notification: Notification Message Object

    Raises:
        BaseHttpClientException: If the request fails or the service
            answers with an error status.
    """
    url = urljoin(self.base_url, "v2/notify")
    headers = self.headers.copy()
    payload = {
        "data": [
            entity.model_dump(exclude_none=True) for entity in notification.data
        ],
        "subscriptionId": notification.subscriptionId,
    }

    try:
        res = self.post(url=url, headers=headers, json=payload)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.debug(res.text)
    except requests.exceptions.RequestException as err:
        msg = (
            "Could not post notification for subscription id "
            f"{notification.subscriptionId}"
        )
        self.log_error(err=err, msg=msg)
        raise BaseHttpClientException(message=msg, response=err.response) from err

143 

def post_subscription(
    self,
    cb_url: Union[AnyHttpUrl, str],
    ql_url: Union[AnyHttpUrl, str],
    entity_type: str = None,
    entity_id: str = None,
    id_pattern: str = None,
    attributes: str = None,
    observed_attributes: str = None,
    notified_attributes: str = None,
    throttling: int = None,
    time_index_attribute: str = None,
):
    """
    Deprecated: subscribe QL to process Orion notifications of a type.

    The endpoint was meant to simplify creating the Orion subscription
    whose notifications QuantumLeap consumes to store historical records.
    This method no longer performs any request; it always raises and none
    of its arguments are used.

    Args:
        cb_url: url of the context broker
        ql_url: The url where Orion can reach QuantumLeap. Do not include
            specific paths.
        entity_type (String): The type of entities for which to create a
            subscription, so as to persist historical data of entities of
            this type.
        entity_id (String): Id of the entity to track. If specified, it
            takes precedence over the idPattern parameter.
        id_pattern (String): The pattern covering the entity ids for which
            to subscribe. If not specified, QL will track all entities of
            the specified type.
        attributes (String): Comma-separated list of attribute names to
            track.
        observed_attributes (String): Comma-separated list of attribute
            names to track.
        notified_attributes (String): Comma-separated list of attribute
            names to be used to restrict the data of which QL will keep a
            history.
        throttling (int): Minimal period of time in seconds which must
            elapse between two consecutive notifications.
        time_index_attribute (String): The name of a custom attribute to be
            used as a time index.

    Raises:
        DeprecationWarning: Always.
    """
    deprecation_notice = (
        "Subscription endpoint of Quantumleap API is "
        "deprecated, use the ORION subscription endpoint "
        "instead"
    )
    raise DeprecationWarning(deprecation_notice)

199 

def delete_entity(self, entity_id: str, entity_type: Optional[str] = None) -> str:
    """
    Given an entity (with type and id), delete all its historical records.

    The delete request is repeated up to ten times, because a deletion is
    not always effective even if the service reports success. After each
    delete request the entity is looked up again; the deletion counts as
    successful as soon as that lookup raises a request exception.

    Args:
        entity_id (String): Entity id is required.
        entity_type (Optional[String]): Entity type if entity_id alone
            can not uniquely define the entity.

    Raises:
        Exception: If the entity can still be retrieved after the tenth
            deletion attempt.

    Returns:
        The entity_id of entity that is deleted.
    """
    url = urljoin(self.base_url, f"v2/entities/{entity_id}")
    headers = self.headers.copy()
    params = {"type": entity_type} if entity_type is not None else {}

    max_attempts = 10
    for attempt in range(max_attempts):
        self.delete(url=url, headers=headers, params=params)
        try:
            self.get_entity_by_id(entity_id=entity_id, entity_type=entity_type)
        except requests.exceptions.RequestException:
            # NOTE(review): every request error of the lookup is treated as
            # "entity is gone", not only a not-found answer - confirm that
            # this is intended.
            self.logger.info("Entity id '%s' successfully deleted!", entity_id)
            return entity_id
        # Wait with an increasing delay before the next attempt. There is
        # nothing to wait for once the last attempt has failed.
        if attempt < max_attempts - 1:
            time.sleep(attempt * 5)

    msg = f"Could not delete QL entity of id {entity_id}"
    self.logger.error(msg)
    raise Exception(msg)

243 

def delete_entity_type(self, entity_type: str) -> str:
    """
    Delete the historical records of all entities of one entity type.

    Args:
        entity_type (String): Type of entities data to be deleted.

    Returns:
        Entity type of the entities deleted.

    Raises:
        BaseHttpClientException: If the request fails or the service
            answers with an error status.
    """
    url = urljoin(self.base_url, f"v2/types/{entity_type}")
    headers = self.headers.copy()
    try:
        res = self.delete(url=url, headers=headers)
        if not res.ok:
            res.raise_for_status()
        else:
            self.logger.info(
                "Entities of type '%s' successfully deleted!", entity_type
            )
            return entity_type
    except requests.exceptions.RequestException as err:
        msg = f"Could not delete entities of type {entity_type}"
        self.log_error(err=err, msg=msg)
        raise BaseHttpClientException(message=msg, response=err.response) from err

267 

268 # QUERY API ENDPOINTS 

def __query_builder(
    self,
    url,
    *,
    entity_id: str = None,
    id_pattern: str = None,
    options: str = None,
    entity_type: str = None,
    aggr_method: Union[str, AggrMethod] = None,
    aggr_period: Union[str, AggrPeriod] = None,
    from_date: str = None,
    to_date: str = None,
    last_n: int = None,
    limit: int = 10000,
    offset: int = 0,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    attrs: str = None,
    aggr_scope: Union[str, AggrScope] = None,
) -> Deque[Dict]:
    """
    Private function to call the respective API endpoint. Large requests
    are chopped into multiple single requests of at most 10000 records;
    the decoded responses are collected and returned for merging by the
    caller.

    Args:
        url: Full url of the endpoint to query.
        entity_id: Sent as query parameter ``id``. Mutually exclusive with
            ``id_pattern``.
        id_pattern (str): Sent as query parameter ``idPattern``. The
            pattern follows regular expressions (POSIX Extended), e.g.
            ".*", "Room.*". Detail information:
            https://en.wikibooks.org/wiki/Regular_Expressions/POSIX-Extended_Regular_Expressions
        options: Sent as query parameter ``options``.
        entity_type: Sent as query parameter ``type``.
        aggr_method: Converted to ``AggrMethod`` and sent as ``aggrMethod``.
        aggr_period: Converted to ``AggrPeriod`` and sent as ``aggrPeriod``.
        from_date: Sent as query parameter ``fromDate``.
        to_date: Sent as query parameter ``toDate``.
        last_n: Upper bound for the ``lastN`` query parameter. When set,
            chunks are collected in reverse order.
        limit: Maximum number of results to retrieve in total. ``None``
            means no upper bound.
        offset: Offset to apply to the response results. For example, if
            the query was to return 10 results and you use an offset of 1,
            the response will return the last 9 values. Make sure you
            don't give more offset than the number of results. ``None`` is
            treated as 0.
        georel: Sent as query parameter ``georel``.
        geometry: Sent as query parameter ``geometry``.
        coords: Sent as query parameter ``coords``.
        attrs: Sent as query parameter ``attrs``.
        aggr_scope: Converted to ``AggrScope`` and sent as ``aggrScope``.

    Returns:
        Deque with one decoded JSON response body per successful request.

    Raises:
        AssertionError: If both ``entity_id`` and ``id_pattern`` are given.
        BaseHttpClientException: If a request fails, unless it is a
            "Not Found" answer after at least one chunk was received.
    """
    assert (
        id_pattern is None or entity_id is None
    ), "Cannot have both id and idPattern as parameter."
    params = {}
    headers = self.headers.copy()
    max_records_per_request = 10000
    # double ended queue, so chunks can also be prepended (see last_n)
    res_q: Deque[Dict] = deque([])

    if options:
        params["options"] = options
    if entity_type:
        params["type"] = entity_type
    if aggr_method:
        aggr_method = AggrMethod(aggr_method)
        params["aggrMethod"] = aggr_method.value
    if aggr_period:
        aggr_period = AggrPeriod(aggr_period)
        params["aggrPeriod"] = aggr_period.value
    if from_date:
        params["fromDate"] = from_date
    if to_date:
        params["toDate"] = to_date
    # These values are required for the integrated pagination mechanism
    if limit is None:
        limit = inf
    if offset is None:
        offset = 0
    if georel:
        params["georel"] = georel
    if coords:
        params["coords"] = coords
    if geometry:
        params["geometry"] = geometry
    if attrs:
        params["attrs"] = attrs
    if aggr_scope:
        aggr_scope = AggrScope(aggr_scope)
        # camelCase like every other aggregation parameter of the API
        params["aggrScope"] = aggr_scope.value
    if entity_id:
        params["id"] = entity_id
    if id_pattern:
        params["idPattern"] = id_pattern

    # This loop chops large requests into smaller chunks.
    # The individual functions will then merge the final response models.
    for i in count(0, max_records_per_request):
        params["offset"] = offset + i

        params["limit"] = min(limit - i, max_records_per_request)
        if params["limit"] <= 0:
            break

        if last_n:
            params["lastN"] = min(last_n - i, max_records_per_request)
            if params["lastN"] <= 0:
                break

        try:
            res = self.get(url=url, params=params, headers=headers)

            if res.ok:
                # decode the body only once
                payload = res.json()
                self.logger.debug("Received: %s", payload)

                # revert append direction when using last_n
                if last_n:
                    res_q.appendleft(payload)
                else:
                    res_q.append(payload)
            res.raise_for_status()

        except requests.exceptions.RequestException as err:
            # A "Not Found" answer after at least one received chunk only
            # means that there is no further data to page through.
            response = err.response
            exhausted = False
            if response is not None and response.status_code == 404 and res_q:
                try:
                    exhausted = response.json().get("error") == "Not Found"
                except (ValueError, AttributeError):
                    # body is not JSON or not a JSON object
                    exhausted = False
            if exhausted:
                break

            msg = "Could not load entity data"
            self.log_error(err=err, msg=msg)
            raise BaseHttpClientException(message=msg, response=response) from err

    self.logger.info("Successfully retrieved entity data")
    return res_q

413 

414 # v2/entities 

def get_entities(
    self,
    *,
    entity_type: str = None,
    id_pattern: str = None,
    from_date: str = None,
    to_date: str = None,
    limit: int = 10000,
    offset: int = None,
) -> List[TimeSeriesHeader]:
    """
    Get list of all available entities and their context information
    about EntityType and last update date.

    Args:
        entity_type (str): Comma-separated list of entity types whose data
            are to be included in the response. Use only one (no comma)
            when required. If used to resolve ambiguity for the given
            entityId, make sure the given entityId exists for this
            entityType.
        id_pattern (str): The pattern covering the entity ids to include.
            The pattern follows regular expressions (POSIX Extended),
            e.g. ".*", "Room.*". Detail information:
            https://en.wikibooks.org/wiki/Regular_Expressions/POSIX-Extended_Regular_Expressions
        from_date (str): The starting date and time (inclusive) from which
            the context information is queried. Must be in ISO8601 format
            (e.g., 2018-01-05T15:44:34)
        to_date (str): The final date and time (inclusive) from which the
            context information is queried. Must be in ISO8601 format
            (e.g., 2018-01-05T15:44:34).
        limit (int): Maximum number of results to be retrieved.
            Default value : 10000
        offset (int): Offset for the results.

    Returns:
        List of TimeSeriesHeader built from all retrieved response chunks;
        empty if no chunk was retrieved.
    """
    url = urljoin(self.base_url, "v2/entities")
    res_q = self.__query_builder(
        url=url,
        id_pattern=id_pattern,
        entity_type=entity_type,
        from_date=from_date,
        to_date=to_date,
        limit=limit,
        offset=offset,
    )

    # Every chunk is a list of entity headers. Concatenate all chunks so
    # that results beyond the first request are not dropped when more than
    # one request was needed.
    headers = list(chain.from_iterable(res_q))
    ta = TypeAdapter(List[TimeSeriesHeader])
    return ta.validate_python(headers)

465 

466 # /entities/{entityId} 

def get_entity_by_id(
    self,
    entity_id: str,
    *,
    attrs: str = None,
    entity_type: str = None,
    aggr_method: Union[str, AggrMethod] = None,
    aggr_period: Union[str, AggrPeriod] = None,
    from_date: str = None,
    to_date: str = None,
    last_n: int = None,
    limit: int = 10000,
    offset: int = None,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    options: str = None,
) -> TimeSeries:
    """
    History of N attributes of a given entity instance.

    For example, query max water level of the central tank throughout the
    last year. Queries can get more sophisticated with the use of filters
    and query attributes.

    Args:
        entity_id (String): Entity id is required.
        attrs (String): Comma-separated list of attribute names whose data
            are to be included in the response. The attributes are
            retrieved in the order specified by this parameter. If not
            specified, all attributes are included in the response in
            arbitrary order.
        entity_type (String): Comma-separated list of entity types whose
            data are to be included in the response.
        aggr_method (String): The function to apply to the raw data
            filtered by the query parameters. If not given, the returned
            data are the same raw inserted data.
            Allowed values: count, sum, avg, min, max
        aggr_period (String): If not defined, the aggregation will apply
            to all the values contained in the search result. If defined,
            the aggregation function will instead be applied N times, once
            for each period, and all those results will be considered for
            the response. For example, a query asking for the average
            temperature of an attribute will typically return 1 value.
            However, with an aggregationPeriod of day, you get the daily
            average of the temperature instead (more than one value
            assuming you had measurements across many days within the
            scope of your search result). aggrPeriod must be accompanied
            by an aggrMethod, and the aggrMethod will be applied to all
            the numeric attributes specified in attrs; the rest of the
            non-numerical attrs will be ignored. By default, the response
            is grouped by entity_id.
            Allowed values: year, month, day, hour, minute, second
        from_date (String): The starting date and time (inclusive) from
            which the context information is queried. Must be in ISO8601
            format (e.g., 2018-01-05T15:44:34)
        to_date (String): The final date and time (inclusive) from which
            the context information is queried. Must be in ISO8601 format
            (e.g., 2018-01-05T15:44:34)
        last_n (int): Used to request only the last N values that satisfy
            the request conditions.
        limit (int): Maximum number of results to be retrieved.
            Default value : 10000
        offset (int): Offset to apply to the response results.
        georel (String): It specifies a spatial relationship between
            matching entities and a reference shape (geometry). This
            parameter is used to perform geographical queries with the
            same semantics as in the FIWARE-NGSI v2 Specification. Full
            details can be found in the Geographical Queries section of
            the specification:
            https://fiware.github.io/specifications/ngsiv2/stable/.
        geometry (String): Required if georel is specified. point, line,
            polygon, box
        coords (String): Optional but required if georel is specified.
            This parameter defines the reference shape (geometry) in terms
            of WGS 84 coordinates and has the same semantics as in the
            FIWARE-NGSI v2 Specification, except we only accept
            coordinates in decimal degrees---e.g. 40.714,-74.006 is okay,
            but not 40 42' 51'',74 0' 21''. Full details can be found in
            the Geographical Queries section of the specification:
            https://fiware.github.io/specifications/ngsiv2/stable/.
        options (String): Key value pair options.

    Returns:
        TimeSeries
    """
    query = dict(
        attrs=attrs,
        options=options,
        entity_type=entity_type,
        aggr_method=aggr_method,
        aggr_period=aggr_period,
        from_date=from_date,
        to_date=to_date,
        last_n=last_n,
        limit=limit,
        offset=offset,
        georel=georel,
        geometry=geometry,
        coords=coords,
    )
    chunks = self.__query_builder(
        url=urljoin(self.base_url, f"v2/entities/{entity_id}"), **query
    )

    # the first chunk is the base, all following chunks are appended to it
    merged = TimeSeries.model_validate(chunks.popleft())
    while chunks:
        merged.extend(TimeSeries.model_validate(chunks.popleft()))
    return merged

586 

587 # /entities/{entityId}/value 

def get_entity_values_by_id(
    self,
    entity_id: str,
    *,
    attrs: str = None,
    entity_type: str = None,
    aggr_method: Union[str, AggrMethod] = None,
    aggr_period: Union[str, AggrPeriod] = None,
    from_date: str = None,
    to_date: str = None,
    last_n: int = None,
    limit: int = 10000,
    offset: int = None,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    options: str = None,
) -> TimeSeries:
    """
    History of N attributes (values only) of a given entity instance.

    For example, query the average pressure, temperature and humidity
    (values only, no metadata) of this month in the weather station WS1.

    Args:
        entity_id (String): Entity id is required.
        attrs (String): Comma-separated list of attribute names
        entity_type (String): Comma-separated list of entity types whose
            data are to be included in the response.
        aggr_method (String): The function to apply to the raw data
            filtered. count, sum, avg, min, max
        aggr_period (String): year, month, day, hour, minute, second
        from_date (String): Starting date and time inclusive.
        to_date (String): Final date and time inclusive.
        last_n (int): Request only the last N values.
        limit (int): Maximum number of results to be retrieved.
            Default value : 10000
        offset (int): Offset for the results.
        georel (String): Geographical pattern
        geometry (String): Required if georel is specified. point, line,
            polygon, box
        coords (String): Required if georel is specified.
            e.g. 40.714,-74.006
        options (String): Key value pair options.

    Returns:
        TimeSeries merged from all response chunks; ``entityId`` is set
        from the ``entity_id`` argument.
    """
    query = dict(
        attrs=attrs,
        options=options,
        entity_type=entity_type,
        aggr_method=aggr_method,
        aggr_period=aggr_period,
        from_date=from_date,
        to_date=to_date,
        last_n=last_n,
        limit=limit,
        offset=offset,
        georel=georel,
        geometry=geometry,
        coords=coords,
    )
    chunks = self.__query_builder(
        url=urljoin(self.base_url, f"v2/entities/{entity_id}/value"), **query
    )

    # the first chunk is the base, all following chunks are appended to it
    merged = TimeSeries(entityId=entity_id, **chunks.popleft())
    while chunks:
        merged.extend(TimeSeries(entityId=entity_id, **chunks.popleft()))
    return merged

660 

661 # /entities/{entityId}/attrs/{attrName} 

def get_entity_attr_by_id(
    self,
    entity_id: str,
    attr_name: str,
    *,
    entity_type: str = None,
    aggr_method: Union[str, AggrMethod] = None,
    aggr_period: Union[str, AggrPeriod] = None,
    from_date: str = None,
    to_date: str = None,
    last_n: int = None,
    limit: int = 10000,
    offset: int = None,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    options: str = None,
) -> TimeSeries:
    """
    History of an attribute of a given entity instance.

    For example, query max water level of the central tank throughout the
    last year. Queries can get more sophisticated with the use of filters
    and query attributes.

    Args:
        entity_id (String): Entity id is required.
        attr_name (String): The attribute name is required.
        entity_type (String): Comma-separated list of entity types whose
            data are to be included in the response.
        aggr_method (String): The function to apply to the raw data
            filtered. count, sum, avg, min, max
        aggr_period (String): year, month, day, hour, minute, second
        from_date (String): Starting date and time inclusive.
        to_date (String): Final date and time inclusive.
        last_n (int): Request only the last N values.
        limit (int): Maximum number of results to be retrieved.
            Default value : 10000
        offset (int): Offset for the results.
        georel (String): Geographical pattern
        geometry (String): Required if georel is specified. point, line,
            polygon, box
        coords (String): Required if georel is specified.
            e.g. 40.714,-74.006
        options (String): Key value pair options.

    Returns:
        TimeSeries with a single attribute, merged from all response
        chunks.
    """

    def _as_series(chunk: Dict) -> TimeSeries:
        # wrap one raw response chunk into a single-attribute TimeSeries
        return TimeSeries(
            entityId=entity_id,
            index=chunk.get("index"),
            attributes=[AttributeValues(**chunk)],
        )

    endpoint = urljoin(
        self.base_url, f"v2/entities/{entity_id}/attrs/{attr_name}"
    )
    chunks = self.__query_builder(
        url=endpoint,
        entity_id=entity_id,
        options=options,
        entity_type=entity_type,
        aggr_method=aggr_method,
        aggr_period=aggr_period,
        from_date=from_date,
        to_date=to_date,
        last_n=last_n,
        limit=limit,
        offset=offset,
        georel=georel,
        geometry=geometry,
        coords=coords,
    )

    # the first chunk is the base, all following chunks are appended to it
    merged = _as_series(chunks.popleft())
    while chunks:
        merged.extend(_as_series(chunks.popleft()))
    return merged

745 

746 # /entities/{entityId}/attrs/{attrName}/value 

def get_entity_attr_values_by_id(
    self,
    entity_id: str,
    attr_name: str,
    *,
    entity_type: str = None,
    aggr_method: Union[str, AggrMethod] = None,
    aggr_period: Union[str, AggrPeriod] = None,
    from_date: str = None,
    to_date: str = None,
    last_n: int = None,
    limit: int = 10000,
    offset: int = None,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    options: str = None,
) -> TimeSeries:
    """
    History of an attribute (values only) of a given entity instance.

    Like the attribute history query, but focusing on the values
    regardless of the metadata.

    Args:
        entity_id (String): Entity id is required.
        attr_name (String): The attribute name is required.
        entity_type (String): Comma-separated list of entity types whose
            data are to be included in the response.
        aggr_method (String): The function to apply to the raw data
            filtered. count, sum, avg, min, max
        aggr_period (String): year, month, day, hour, minute, second
        from_date (String): Starting date and time inclusive.
        to_date (String): Final date and time inclusive.
        last_n (int): Request only the last N values.
        limit (int): Maximum number of results to be retrieved.
            Default value : 10000
        offset (int): Offset for the results.
        georel (String): Geographical pattern
        geometry (String): Required if georel is specified. point, line,
            polygon, box
        coords (String): Required if georel is specified.
            e.g. 40.714,-74.006
        options (String): Key value pair options.

    Returns:
        TimeSeries with a single attribute named ``attr_name``, merged
        from all response chunks.
    """

    def _as_series(chunk: Dict) -> TimeSeries:
        # wrap one raw response chunk into a single-attribute TimeSeries
        return TimeSeries(
            entityId=entity_id,
            index=chunk.get("index"),
            attributes=[
                AttributeValues(attrName=attr_name, values=chunk.get("values"))
            ],
        )

    endpoint = urljoin(
        self.base_url, f"v2/entities/{entity_id}/attrs/{attr_name}/value"
    )
    chunks = self.__query_builder(
        url=endpoint,
        options=options,
        entity_type=entity_type,
        aggr_method=aggr_method,
        aggr_period=aggr_period,
        from_date=from_date,
        to_date=to_date,
        last_n=last_n,
        limit=limit,
        offset=offset,
        georel=georel,
        geometry=geometry,
        coords=coords,
    )

    # the first chunk is the base, all following chunks are appended to it
    merged = _as_series(chunks.popleft())
    while chunks:
        merged.extend(_as_series(chunks.popleft()))
    return merged

833 

834 # /types/{entityType} 

def get_entity_by_type(
    self,
    entity_type: str,
    *,
    attrs: str = None,
    entity_id: str = None,
    id_pattern: str = None,
    aggr_method: Union[str, AggrMethod] = None,
    aggr_period: Union[str, AggrPeriod] = None,
    from_date: str = None,
    to_date: str = None,
    last_n: int = None,
    limit: int = 10000,
    offset: int = None,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    options: str = None,
    aggr_scope: Union[str, AggrScope] = None,
) -> List[TimeSeries]:
    """
    History of N attributes of N entities of the same type.

    For example, query the average pressure, temperature and humidity of
    this month in all the weather stations.

    The keyword arguments are forwarded to the query builder. One
    TimeSeries is built per item under the ``entities`` key of the first
    response chunk; items of later chunks are appended to them by position.

    Returns:
        List of TimeSeries, one per entity of the first response chunk.
    """

    def _parse(chunk: Dict) -> List[TimeSeries]:
        # one TimeSeries per entity entry of a raw response chunk
        return [
            TimeSeries(entityType=entity_type, **entry)
            for entry in chunk.get("entities")
        ]

    chunks = self.__query_builder(
        url=urljoin(self.base_url, f"v2/types/{entity_type}"),
        entity_id=entity_id,
        id_pattern=id_pattern,
        attrs=attrs,
        options=options,
        aggr_method=aggr_method,
        aggr_period=aggr_period,
        from_date=from_date,
        to_date=to_date,
        last_n=last_n,
        limit=limit,
        offset=offset,
        georel=georel,
        geometry=geometry,
        coords=coords,
        aggr_scope=aggr_scope,
    )

    # the first chunk defines the result list, later chunks extend it
    merged = _parse(chunks.popleft())
    for chunk in chunks:
        for addition, target in zip(_parse(chunk), merged):
            target.extend(addition)
    return merged

895 

896 # /types/{entityType}/value 

def get_entity_values_by_type(
    self,
    entity_type: str,
    *,
    attrs: str = None,
    entity_id: str = None,
    id_pattern: str = None,
    aggr_method: Union[str, AggrMethod] = None,
    aggr_period: Union[str, AggrPeriod] = None,
    from_date: str = None,
    to_date: str = None,
    last_n: int = None,
    limit: int = 10000,
    offset: int = None,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    options: str = None,
    aggr_scope: Union[str, AggrScope] = None,
) -> List[TimeSeries]:
    """
    History of N attributes (values only) of N entities of the same type.

    For example, query the average pressure, temperature and humidity
    (values only, no metadata) of this month in all the weather stations.

    The keyword arguments are forwarded to the query builder. One
    TimeSeries is built per item under the ``values`` key of the first
    response chunk; items of later chunks are appended to them by position.

    Returns:
        List of TimeSeries, one per item of the first response chunk.
    """

    def _parse(chunk: Dict) -> List[TimeSeries]:
        # one TimeSeries per entry under "values" of a raw response chunk
        return [
            TimeSeries(entityType=entity_type, **entry)
            for entry in chunk.get("values")
        ]

    chunks = self.__query_builder(
        url=urljoin(self.base_url, f"v2/types/{entity_type}/value"),
        entity_id=entity_id,
        id_pattern=id_pattern,
        attrs=attrs,
        options=options,
        entity_type=entity_type,
        aggr_method=aggr_method,
        aggr_period=aggr_period,
        from_date=from_date,
        to_date=to_date,
        last_n=last_n,
        limit=limit,
        offset=offset,
        georel=georel,
        geometry=geometry,
        coords=coords,
        aggr_scope=aggr_scope,
    )

    # the first chunk defines the result list, later chunks extend it
    merged = _parse(chunks.popleft())
    for chunk in chunks:
        for addition, target in zip(_parse(chunk), merged):
            target.extend(addition)
    return merged

958 

959 # /types/{entityType}/attrs/{attrName} 

def get_entity_attr_by_type(
    self,
    entity_type: str,
    attr_name: str,
    *,
    entity_id: str = None,
    id_pattern: str = None,
    aggr_method: Union[str, AggrMethod] = None,
    aggr_period: Union[str, AggrPeriod] = None,
    from_date: str = None,
    to_date: str = None,
    last_n: int = None,
    limit: int = 10000,
    offset: int = None,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    options: str = None,
    aggr_scope: Union[str, AggrScope] = None,
) -> List[TimeSeries]:
    """
    History of an attribute of N entities of the same type.
    For example, query the pressure measurements of this month in all the
    weather stations. Note in the response, the index and values arrays
    are parallel. Also, when using aggrMethod, the aggregation is done
    by-entity instance. In this case, the index array is just the fromDate
    and toDate values user specified in the request (if any).

    Args:
        entity_type (String): Entity type is required.
        attr_name (String): The attribute name is required.
        entity_id (String): Comma-separated list of entity ids whose data
            are to be included in the response.
        id_pattern (String): Entity id pattern, forwarded to the query.
        aggr_method (String): The function to apply to the raw data
            filtered. count, sum, avg, min, max
        aggr_period (String): year, month, day, hour, minute, second
        aggr_scope (str): Optional. (This parameter is not yet supported).
            When the query results cover historical data for multiple
            entities instances, you can define the aggregation method to be
            applied for each entity instance [entity] or across
            them [global]
        from_date (String): Starting date and time inclusive.
        to_date (String): Final date and time inclusive.
        last_n (int): Request only the last N values.
        limit (int): Maximum number of results to be retrieved.
            Default value : 10000
        offset (int): Offset for the results.
        georel (String): Geographical pattern
        geometry (String): Required if georel is specified. point, line,
            polygon, box
        coords (String): Required if georel is specified.
            e.g. 40.714,-74.006
        options (String): Key value pair options.

    Returns:
        List of TimeSeries, one per entry of the response's "entities"
        list, each holding the single queried attribute
    """
    endpoint = urljoin(
        self.base_url, f"v2/types/{entity_type}/attrs/{attr_name}"
    )
    responses = self.__query_builder(
        url=endpoint,
        entity_id=entity_id,
        id_pattern=id_pattern,
        options=options,
        entity_type=entity_type,
        aggr_method=aggr_method,
        aggr_period=aggr_period,
        from_date=from_date,
        to_date=to_date,
        last_n=last_n,
        limit=limit,
        offset=offset,
        georel=georel,
        geometry=geometry,
        coords=coords,
        aggr_scope=aggr_scope,
    )

    def to_series(payload):
        # the attribute name is taken from the chunk itself, not from the
        # method argument
        name = payload.get("attrName")
        series = []
        for entry in payload.get("entities"):
            values = AttributeValues(attrName=name, values=entry.get("values"))
            series.append(
                TimeSeries(
                    index=entry.get("index"),
                    entityType=entity_type,
                    entityId=entry.get("entityId"),
                    attributes=[values],
                )
            )
        return series

    # the first chunk seeds the result, later chunks are appended in place
    merged = to_series(responses.popleft())
    for payload in responses:
        for addition, target in zip(to_series(payload), merged):
            target.extend(addition)

    return merged

1073 

1074 # /types/{entityType}/attrs/{attrName}/value 

def get_entity_attr_values_by_type(
    self,
    entity_type: str,
    attr_name: str,
    *,
    entity_id: str = None,
    id_pattern: str = None,
    aggr_method: Union[str, AggrMethod] = None,
    aggr_period: Union[str, AggrPeriod] = None,
    from_date: str = None,
    to_date: str = None,
    last_n: int = None,
    limit: int = 10000,
    offset: int = None,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    options: str = None,
    aggr_scope: Union[str, AggrScope] = None,
) -> List[TimeSeries]:
    """
    History of an attribute (values only) of N entities of the same type.
    For example, query the average pressure (values only, no metadata) of
    this month in all the weather stations.

    Args:
        entity_type (String): Entity type is required.
        attr_name (String): The attribute name is required.
        entity_id (String): Comma-separated list of entity ids whose data
            are to be included in the response.
        id_pattern (String): Entity id pattern, forwarded to the query.
        aggr_method (String): The function to apply to the raw data
            filtered. count, sum, avg, min, max
        aggr_period (String): year, month, day, hour, minute, second
        aggr_scope (String): Aggregation scope, forwarded to the query.
        from_date (String): Starting date and time inclusive.
        to_date (String): Final date and time inclusive.
        last_n (int): Request only the last N values.
        limit (int): Maximum number of results to be retrieved.
            Default value : 10000
        offset (int): Offset for the results.
        georel (String): Geographical pattern
        geometry (String): Required if georel is specified. point, line,
            polygon, box
        coords (String): Required if georel is specified.
            e.g. 40.714,-74.006
        options (String): Key value pair options.

    Returns:
        List of TimeSeries, one per entry of the response's "values" list,
        each holding the single attribute ``attr_name``
    """
    endpoint = urljoin(
        self.base_url, f"v2/types/{entity_type}/attrs/{attr_name}/value"
    )
    responses = self.__query_builder(
        url=endpoint,
        entity_id=entity_id,
        id_pattern=id_pattern,
        options=options,
        entity_type=entity_type,
        aggr_method=aggr_method,
        aggr_period=aggr_period,
        from_date=from_date,
        to_date=to_date,
        last_n=last_n,
        limit=limit,
        offset=offset,
        georel=georel,
        geometry=geometry,
        coords=coords,
        aggr_scope=aggr_scope,
    )

    def to_series(payload):
        # here the attribute name comes from the method argument
        series = []
        for entry in payload.get("values"):
            values = AttributeValues(
                attrName=attr_name, values=entry.get("values")
            )
            series.append(
                TimeSeries(
                    index=entry.get("index"),
                    entityType=entity_type,
                    entityId=entry.get("entityId"),
                    attributes=[values],
                )
            )
        return series

    # the first chunk seeds the result, later chunks are appended in place
    merged = to_series(responses.popleft())
    for payload in responses:
        for addition, target in zip(to_series(payload), merged):
            target.extend(addition)
    return merged

1177 

1178 # v2/attrs 

def get_entity_by_attrs(
    self,
    *,
    entity_type: str = None,
    from_date: str = None,
    to_date: str = None,
    limit: int = 10000,
    offset: int = None,
) -> List[TimeSeries]:
    """
    Get list of timeseries data grouped by each existing attribute name.
    The timeseries data include all entities corresponding to each
    attribute name as well as the index and values of this attribute in
    this entity.

    The response may arrive in several chunks. The series of the first
    chunk seed the result and the series of every later chunk are appended
    to them position by position.

    Args:
        entity_type (str): Comma-separated list of entity types whose data
            are to be included in the response. Use only one (no comma)
            when required. If used to resolve ambiguity for the given
            entityId, make sure the given entityId exists for this
            entityType.
        from_date (str): The starting date and time (inclusive) from which
            the context information is queried. Must be in ISO8601 format
            (e.g., 2018-01-05T15:44:34)
        to_date (str): The final date and time (inclusive) from which the
            context information is queried. Must be in ISO8601 format
            (e.g., 2018-01-05T15:44:34).
        limit (int): Maximum number of results to be retrieved.
            Default value : 10000
        offset (int): Offset for the results.

    Returns:
        List of TimeSeries
    """
    url = urljoin(self.base_url, "v2/attrs")
    res_q = self.__query_builder(
        url=url,
        entity_type=entity_type,
        from_date=from_date,
        to_date=to_date,
        limit=limit,
        offset=offset,
    )

    def to_series(payload):
        # Materialise as a list: a lazy chain would be exhausted by the
        # zip() in the merge loop and the merged series would be lost.
        return list(
            chain.from_iterable(
                self.transform_attr_response_model(attr)
                for attr in payload.get("attrs")
            )
        )

    res = to_series(res_q.popleft())
    for chunk in res_q:
        for new, old in zip(to_series(chunk), res):
            old.extend(new)

    return res

1236 

1237 # v2/attrs/{attr_name} 

def get_entity_by_attr_name(
    self,
    *,
    attr_name: str,
    entity_type: str = None,
    from_date: str = None,
    to_date: str = None,
    limit: int = 10000,
    offset: int = None,
) -> List[TimeSeries]:
    """
    Get list of all entities containing this attribute name, as well as
    getting the index and values of this attribute in every corresponding
    entity.

    The response may arrive in several chunks. The series of the first
    chunk seed the result and the series of every later chunk are appended
    to them position by position.

    Args:
        attr_name (str): The attribute name in interest.
        entity_type (str): Comma-separated list of entity types whose data
            are to be included in the response. Use only one (no comma)
            when required. If used to resolve ambiguity for the given
            entityId, make sure the given entityId exists for this
            entityType.
        from_date (str): The starting date and time (inclusive) from which
            the context information is queried. Must be in ISO8601 format
            (e.g., 2018-01-05T15:44:34)
        to_date (str): The final date and time (inclusive) from which the
            context information is queried. Must be in ISO8601 format
            (e.g., 2018-01-05T15:44:34).
        limit (int): Maximum number of results to be retrieved.
            Default value : 10000
        offset (int): Offset for the results.

    Returns:
        List of TimeSeries
    """
    # Relative path (no leading slash), like every other endpoint of this
    # client: a leading slash makes urljoin drop any path prefix that is
    # part of base_url.
    url = urljoin(self.base_url, f"v2/attrs/{attr_name}")
    res_q = self.__query_builder(
        url=url,
        entity_type=entity_type,
        from_date=from_date,
        to_date=to_date,
        limit=limit,
        offset=offset,
    )

    # Materialise as a list: the transformed response is a one-shot
    # iterator that the zip() below would otherwise exhaust, losing the
    # merged series.
    res = list(self.transform_attr_response_model(res_q.popleft()))

    for chunk in res_q:
        for new, old in zip(self.transform_attr_response_model(chunk), res):
            old.extend(new)
    return res

1291 

def transform_attr_response_model(self, attr_response):
    """
    Convert one attribute-centred response into TimeSeries objects.

    The response is read as a dict with the keys "attrName" and "types".
    Each entry of "types" provides an "entityType" and a list of
    "entities"; every entity contributes its "entityId", "index" and
    "values".

    Args:
        attr_response (dict): Response item for a single attribute name.

    Returns:
        Iterator over TimeSeries, one per entity, in the order of the
        response. Each TimeSeries holds exactly one attribute (the
        response's "attrName") and the entityType of its own group.
    """
    attr_name = attr_response.get("attrName")
    groups = []
    for entity_group in attr_response.get("types"):
        # Build the group eagerly. A lazy map()/lambda would look up the
        # loop variable only after the loop has finished, labelling every
        # series with the entityType of the last group.
        entity_type = entity_group.get("entityType")
        groups.append(
            [
                TimeSeries(
                    entityId=entity.get("entityId"),
                    entityType=entity_type,
                    index=entity.get("index"),
                    attributes=[
                        AttributeValues(
                            attrName=attr_name, values=entity.get("values")
                        )
                    ],
                )
                for entity in entity_group.get("entities")
            ]
        )
    # still hand back an iterator, as before
    return chain.from_iterable(groups)