Coverage for filip/clients/ngsi_v2/quantumleap.py: 78%

231 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-03-10 13:43 +0000

1""" 

2TimeSeries Module for QuantumLeap API Client 

3""" 

4 

5import logging 

6import time 

7from math import inf 

8from collections import deque 

9from itertools import count, chain 

10from typing import Dict, List, Union, Deque, Optional 

11from urllib.parse import urljoin 

12import requests 

13from pydantic import AnyHttpUrl 

14from pydantic.type_adapter import TypeAdapter 

15from filip import settings 

16from filip.clients.base_http_client import BaseHttpClient 

17from filip.models.base import FiwareHeader 

18from filip.models.ngsi_v2.subscriptions import Message 

19from filip.models.ngsi_v2.timeseries import ( 

20 AggrPeriod, 

21 AggrMethod, 

22 AggrScope, 

23 AttributeValues, 

24 TimeSeries, 

25 TimeSeriesHeader, 

26) 

27 

28 

29logger = logging.getLogger(__name__) 

30 

31 

32class QuantumLeapClient(BaseHttpClient): 

33 """ 

34 Implements functions to use the FIWARE's QuantumLeap, which subscribes to an 

35 Orion Context Broker and stores the subscription data in a time series 

36 database (CrateDB). Further Information: 

37 https://smartsdk.github.io/ngsi-timeseries-api/#quantumleap 

38 https://app.swaggerhub.com/apis/heikkilv/quantumleap-api/ 

39 

40 Args: 

41 url: url of the quantumleap service 

42 session (Optional): 

43 fiware_header: 

44 **kwargs: 

45 """ 

46 

def __init__(
    self,
    url: str = None,
    *,
    session: requests.Session = None,
    fiware_header: FiwareHeader = None,
    **kwargs,
):
    """
    Set up the client for a QuantumLeap service.

    Args:
        url: Url of the QuantumLeap service. When empty or omitted the
            value of ``settings.QL_URL`` is used instead.
        session: Optional session object handed to the base client.
        fiware_header: Optional FIWARE header handed to the base client.
        **kwargs: Forwarded unchanged to the base client constructor.
    """
    # fall back to the configured service url if none was given
    service_url = url if url else settings.QL_URL
    super().__init__(
        url=service_url,
        session=session,
        fiware_header=fiware_header,
        **kwargs,
    )

60 

61 # META API ENDPOINTS 

def get_version(self) -> Dict:
    """
    Query the ``version`` endpoint of the QuantumLeap service.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.RequestException: If the request fails or the
            service answers with an error status. The error is logged
            before it is re-raised.
    """
    endpoint = urljoin(self.base_url, "version")
    try:
        response = self.get(url=endpoint, headers=self.headers)
        if not response.ok:
            response.raise_for_status()
        else:
            return response.json()
    except requests.exceptions.RequestException as exc:
        self.logger.error(exc)
        raise

78 

def get_health(self) -> Dict:
    """
    Query the ``health`` endpoint of the QuantumLeap service.

    The endpoint is meant for administrators, orchestrators and load
    balancers that need to diagnose the service or its dependencies. As
    there is no standardized response format, the implementation follows
    the draft at https://inadarei.github.io/rfc-healthcheck/

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.RequestException: If the request fails or the
            service answers with an error status. The error is logged
            before it is re-raised.
    """
    endpoint = urljoin(self.base_url, "health")
    try:
        response = self.get(url=endpoint, headers=self.headers)
        if not response.ok:
            response.raise_for_status()
        else:
            return response.json()
    except requests.exceptions.RequestException as exc:
        self.logger.error(exc)
        raise

101 

def post_config(self):
    """
    Placeholder for the endpoint that customizes the persistence
    configuration. It is not implemented yet.

    Raises:
        NotImplementedError: Always.
    """
    reason = "Endpoint to be implemented.."
    raise NotImplementedError(reason)

108 

109 # INPUT API ENDPOINTS 

def post_notification(self, notification: Message):
    """
    Forward an NGSI notification to QuantumLeap's ``v2/notify`` endpoint.

    Every entity in ``notification.data`` is dumped without ``None``
    values and sent together with the subscription id.

    Args:
        notification: Notification message object to forward.

    Raises:
        requests.exceptions.RequestException: If the request fails or the
            service answers with an error status. The error is logged
            before it is re-raised.
    """
    endpoint = urljoin(self.base_url, "v2/notify")
    request_headers = self.headers.copy()
    payload = {
        "data": [
            entity.model_dump(exclude_none=True) for entity in notification.data
        ],
        "subscriptionId": notification.subscriptionId,
    }

    try:
        response = self.post(url=endpoint, headers=request_headers, json=payload)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.debug(response.text)
    except requests.exceptions.RequestException as exc:
        failure = (
            f"Could not post notification for subscription id "
            f"{notification.subscriptionId}"
        )
        self.log_error(err=exc, msg=failure)
        raise

137 

def post_subscription(
    self,
    cb_url: Union[AnyHttpUrl, str],
    ql_url: Union[AnyHttpUrl, str],
    entity_type: str = None,
    entity_id: str = None,
    id_pattern: str = None,
    attributes: str = None,
    observed_attributes: str = None,
    notified_attributes: str = None,
    throttling: int = None,
    time_index_attribute: str = None,
):
    """
    Deprecated shortcut for subscribing QuantumLeap to Orion notifications.

    The endpoint used to create the Orion subscription whose
    notifications QuantumLeap consumes to store historical records. It
    is no longer available; create the subscription through the Orion
    subscription endpoint instead. None of the arguments is used.

    Args:
        cb_url: Url of the context broker.
        ql_url: Url under which Orion can reach QuantumLeap, without
            specific paths.
        entity_type (String): Type of the entities to persist.
        entity_id (String): Id of the entity to track. Takes precedence
            over ``id_pattern``.
        id_pattern (String): Pattern covering the entity ids to subscribe
            to. If omitted, all entities of the given type are tracked.
        attributes (String): Comma-separated list of attribute names to
            track.
        observed_attributes (String): Comma-separated list of attribute
            names to track.
        notified_attributes (String): Comma-separated list of attribute
            names restricting the data QuantumLeap keeps a history of.
        throttling (int): Minimal period in seconds between two
            consecutive notifications.
        time_index_attribute (String): Name of a custom attribute to be
            used as time index.

    Raises:
        DeprecationWarning: Always.
    """
    notice = (
        "Subscription endpoint of Quantumleap API is "
        "deprecated, use the ORION subscription endpoint "
        "instead"
    )
    raise DeprecationWarning(notice)

193 

def delete_entity(
    self,
    entity_id: str,
    entity_type: Optional[str] = None,
    *,
    max_attempts: int = 10,
    wait_step: float = 5,
) -> str:
    """
    Given an entity (with type and id), delete all its historical records.

    A delete request does not always remove the records even when the
    service answers with a success. The method therefore deletes, then
    looks the entity up again, and repeats with growing pauses until the
    lookup fails or the attempts are used up.

    Args:
        entity_id (String): Entity id is required.
        entity_type (Optional[String]): Entity type if entity_id alone
            can not uniquely define the entity.
        max_attempts (int): Number of delete/verify rounds before giving
            up. Defaults to 10.
        wait_step (float): Seconds added to the pause after every failed
            round (round ``k`` waits ``k * wait_step``). Defaults to 5.

    Raises:
        Exception: If the entity can still be retrieved after
            ``max_attempts`` rounds.

    Returns:
        The entity_id of entity that is deleted.
    """
    url = urljoin(self.base_url, f"v2/entities/{entity_id}")
    headers = self.headers.copy()
    params = {} if entity_type is None else {"type": entity_type}

    for attempt in range(max_attempts):
        self.delete(url=url, headers=headers, params=params)
        try:
            self.get_entity_by_id(entity_id=entity_id, entity_type=entity_type)
        except requests.exceptions.RequestException:
            # NOTE: any request error of the lookup is interpreted as
            # "entity no longer exists", not only a 404 answer.
            self.logger.info("Entity id '%s' successfully deleted!", entity_id)
            return entity_id
        # The entity is still there: back off before the next round, but
        # do not waste time sleeping when no further round follows.
        if attempt < max_attempts - 1:
            time.sleep(attempt * wait_step)

    msg = f"Could not delete QL entity of id {entity_id}"
    self.logger.error(msg)
    raise Exception(msg)

237 

def delete_entity_type(self, entity_type: str) -> str:
    """
    Delete the historical records of all entities of the given type.

    Args:
        entity_type (String): Type of entities data to be deleted.

    Returns:
        Entity type of the entities deleted.

    Raises:
        requests.exceptions.RequestException: If the request fails or the
            service answers with an error status. The error is logged
            before it is re-raised.
    """
    endpoint = urljoin(self.base_url, f"v2/types/{entity_type}")
    request_headers = self.headers.copy()
    try:
        response = self.delete(url=endpoint, headers=request_headers)
        if not response.ok:
            response.raise_for_status()
        else:
            self.logger.info(
                "Entities of type '%s' successfully deleted!", entity_type
            )
            return entity_type
    except requests.exceptions.RequestException as exc:
        failure = f"Could not delete entities of type {entity_type}"
        self.log_error(err=exc, msg=failure)
        raise

261 

262 # QUERY API ENDPOINTS 

def __query_builder(
    self,
    url,
    *,
    entity_id: str = None,
    id_pattern: str = None,
    options: str = None,
    entity_type: str = None,
    aggr_method: Union[str, AggrMethod] = None,
    aggr_period: Union[str, AggrPeriod] = None,
    from_date: str = None,
    to_date: str = None,
    last_n: int = None,
    limit: int = 10000,
    offset: int = 0,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    attrs: str = None,
    aggr_scope: Union[str, AggrScope] = None,
) -> Deque[Dict]:
    """
    Private function to call the respective API endpoint. Large requests
    are chopped into multiple single requests of at most 10000 records;
    the decoded responses are collected in a queue that the calling
    method merges into its response model.

    Args:
        url: Full url of the endpoint to query.
        entity_id: Sent as ``id`` query parameter. Mutually exclusive
            with ``id_pattern``.
        id_pattern (str): The pattern covering the entity ids to query,
            sent as ``idPattern``. The pattern follows regular
            expressions (POSIX Extended) e.g. ".*", "Room.*". Details:
            https://en.wikibooks.org/wiki/Regular_Expressions/POSIX-Extended_Regular_Expressions
        options: Sent as ``options`` query parameter.
        entity_type: Sent as ``type`` query parameter.
        aggr_method: Aggregation method, sent as ``aggrMethod``.
        aggr_period: Aggregation period, sent as ``aggrPeriod``.
        from_date: Sent as ``fromDate`` query parameter.
        to_date: Sent as ``toDate`` query parameter.
        last_n: Request only the last N values. When set, chunks are
            prepended to the queue instead of appended.
        limit: Maximum number of results to retrieve in total. ``None``
            means "no limit".
        offset: Offset to apply to the response results. For example, if
            the query was to return 10 results and you use an offset of
            1, the response will return the last 9 values. Make sure you
            don't give more offset than the number of results. ``None``
            is treated as 0.
        georel: Sent as ``georel`` query parameter.
        geometry: Sent as ``geometry`` query parameter.
        coords: Sent as ``coords`` query parameter.
        attrs: Sent as ``attrs`` query parameter.
        aggr_scope: Aggregation scope, sent as ``aggrScope``.

    Returns:
        Deque with one decoded JSON response per successful request.

    Raises:
        AssertionError: If both ``entity_id`` and ``id_pattern`` are
            given.
        requests.exceptions.RequestException: If a request fails. A 404
            "Not Found" answer after at least one successful chunk is not
            an error; it only marks the end of the data.
    """
    # Raised explicitly (instead of via ``assert``) so that the check is
    # not stripped when Python runs with optimizations enabled.
    if id_pattern is not None and entity_id is not None:
        raise AssertionError("Cannot have both id and idPattern as parameter.")

    headers = self.headers.copy()
    max_records_per_request = 10000
    # create a double ending queue
    res_q: Deque[Dict] = deque([])

    # Optional query parameters in the order they are sent; falsy values
    # are left out of the request.
    candidates = {
        "options": options,
        "type": entity_type,
        "aggrMethod": AggrMethod(aggr_method).value if aggr_method else None,
        "aggrPeriod": AggrPeriod(aggr_period).value if aggr_period else None,
        "fromDate": from_date,
        "toDate": to_date,
        "georel": georel,
        "coords": coords,
        "geometry": geometry,
        "attrs": attrs,
        # camelCase like every other key of the QuantumLeap API
        "aggrScope": AggrScope(aggr_scope).value if aggr_scope else None,
        "id": entity_id,
        "idPattern": id_pattern,
    }
    params = {key: value for key, value in candidates.items() if value}

    # These values are required for the integrated pagination mechanism
    if limit is None:
        limit = inf
    if offset is None:
        offset = 0

    # This loop will chop large requests into smaller chunks.
    # The individual functions will then merge the final response models
    for i in count(0, max_records_per_request):
        params["offset"] = offset + i
        params["limit"] = min(limit - i, max_records_per_request)
        if params["limit"] <= 0:
            break

        if last_n:
            params["lastN"] = min(last_n - i, max_records_per_request)
            if params["lastN"] <= 0:
                break

        try:
            res = self.get(url=url, params=params, headers=headers)

            if res.ok:
                payload = res.json()  # decode the body only once
                self.logger.debug("Received: %s", payload)

                # revert append direction when using last_n
                if last_n:
                    res_q.appendleft(payload)
                else:
                    res_q.append(payload)
            res.raise_for_status()

        except requests.exceptions.RequestException as err:
            # Errors without a response (e.g. connection problems) or
            # with a non-JSON body must surface as they are.
            response = getattr(err, "response", None)
            exhausted = False
            if res_q and response is not None and response.status_code == 404:
                try:
                    body = response.json()
                except ValueError:
                    body = None
                exhausted = (
                    isinstance(body, dict) and body.get("error") == "Not Found"
                )
            if exhausted:
                # the previous chunk was the last one
                break
            msg = "Could not load entity data"
            self.log_error(err=err, msg=msg)
            raise

    self.logger.info("Successfully retrieved entity data")
    return res_q

405 

406 # v2/entities 

def get_entities(
    self,
    *,
    entity_type: str = None,
    id_pattern: str = None,
    from_date: str = None,
    to_date: str = None,
    limit: int = 10000,
    offset: int = None,
) -> List[TimeSeriesHeader]:
    """
    Get list of all available entities and their context information
    about EntityType and last update date.

    Args:
        entity_type (str): Comma-separated list of entity types whose data
            are to be included in the response. Use only one (no comma)
            when required. If used to resolve ambiguity for the given
            entityId, make sure the given entityId exists for this
            entityType.
        id_pattern (str): The pattern covering the entity ids to query.
            The pattern follows regular expressions (POSIX Extended)
            e.g. ".*", "Room.*". Details:
            https://en.wikibooks.org/wiki/Regular_Expressions/POSIX-Extended_Regular_Expressions
        from_date (str): The starting date and time (inclusive) from which
            the context information is queried. Must be in ISO8601 format
            (e.g., 2018-01-05T15:44:34)
        to_date (str): The final date and time (inclusive) from which the
            context information is queried. Must be in ISO8601 format
            (e.g., 2018-01-05T15:44:34).
        limit (int): Maximum number of results to be retrieved.
            Default value : 10000
        offset (int): Offset for the results.

    Returns:
        List of TimeSeriesHeader built from all retrieved response chunks.
    """
    url = urljoin(self.base_url, "v2/entities")
    res_q = self.__query_builder(
        url=url,
        id_pattern=id_pattern,
        entity_type=entity_type,
        from_date=from_date,
        to_date=to_date,
        limit=limit,
        offset=offset,
    )

    # A limit above the per-request maximum yields several chunks. Each
    # chunk is a list of entity headers, so flatten all of them instead
    # of keeping only the first one.
    entities = list(chain.from_iterable(res_q))

    ta = TypeAdapter(List[TimeSeriesHeader])
    return ta.validate_python(entities)

457 

458 # /entities/{entityId} 

def get_entity_by_id(
    self,
    entity_id: str,
    *,
    attrs: str = None,
    entity_type: str = None,
    aggr_method: Union[str, AggrMethod] = None,
    aggr_period: Union[str, AggrPeriod] = None,
    from_date: str = None,
    to_date: str = None,
    last_n: int = None,
    limit: int = 10000,
    offset: int = None,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    options: str = None,
) -> TimeSeries:
    """
    History of N attributes of a given entity instance.

    For example, query the max water level of the central tank throughout
    the last year. Filters and query attributes allow more sophisticated
    queries.

    Args:
        entity_id (String): Entity id is required.
        attrs (String): Comma-separated list of attribute names whose
            data are to be included in the response, in the given order.
            If omitted, all attributes are included in arbitrary order.
        entity_type (String): Comma-separated list of entity types whose
            data are to be included in the response.
        aggr_method (String): Function applied to the raw data filtered
            by the query parameters. If omitted, the raw inserted data
            are returned. Allowed values: count, sum, avg, min, max
        aggr_period (String): If omitted, the aggregation covers all
            values of the search result. Otherwise the aggregation
            function is applied once per period, e.g. a daily average
            instead of a single average. Must be accompanied by
            ``aggr_method``, which is applied to all numeric attributes
            in ``attrs``; non-numerical ones are ignored.
            Allowed values: year, month, day, hour, minute, second
        from_date (String): The starting date and time (inclusive) from
            which the context information is queried. Must be in ISO8601
            format (e.g., 2018-01-05T15:44:34)
        to_date (String): The final date and time (inclusive) from which
            the context information is queried. Must be in ISO8601 format
            (e.g., 2018-01-05T15:44:34)
        last_n (int): Request only the last N values that satisfy the
            request conditions.
        limit (int): Maximum number of results to be retrieved.
            Default value : 10000
        offset (int): Offset to apply to the response results.
        georel (String): Spatial relationship between matching entities
            and a reference shape (geometry), with the semantics of the
            FIWARE-NGSI v2 Specification, section Geographical Queries:
            https://fiware.github.io/specifications/ngsiv2/stable/.
        geometry (String): Required if georel is specified. point, line,
            polygon, box
        coords (String): Required if georel is specified. Reference
            shape in WGS 84 coordinates, decimal degrees only, e.g.
            40.714,-74.006 is okay, but not 40 42' 51'',74 0' 21''.
        options (String): Key value pair options.

    Returns:
        TimeSeries
    """
    endpoint = urljoin(self.base_url, f"v2/entities/{entity_id}")
    query = dict(
        attrs=attrs,
        options=options,
        entity_type=entity_type,
        aggr_method=aggr_method,
        aggr_period=aggr_period,
        from_date=from_date,
        to_date=to_date,
        last_n=last_n,
        limit=limit,
        offset=offset,
        georel=georel,
        geometry=geometry,
        coords=coords,
    )
    chunks = self.__query_builder(url=endpoint, **query)

    # the first chunk is the base, all following chunks are appended to it
    history = TimeSeries.model_validate(chunks.popleft())
    while chunks:
        history.extend(TimeSeries.model_validate(chunks.popleft()))

    return history

578 

579 # /entities/{entityId}/value 

def get_entity_values_by_id(
    self,
    entity_id: str,
    *,
    attrs: str = None,
    entity_type: str = None,
    aggr_method: Union[str, AggrMethod] = None,
    aggr_period: Union[str, AggrPeriod] = None,
    from_date: str = None,
    to_date: str = None,
    last_n: int = None,
    limit: int = 10000,
    offset: int = None,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    options: str = None,
) -> TimeSeries:
    """
    History of N attributes (values only) of a given entity instance.

    For example, query the average pressure, temperature and humidity
    (values only, no metadata) of this month in the weather station WS1.

    Args:
        entity_id (String): Entity id is required.
        attrs (String): Comma-separated list of attribute names
        entity_type (String): Comma-separated list of entity types whose
            data are to be included in the response.
        aggr_method (String): The function to apply to the raw data
            filtered. count, sum, avg, min, max
        aggr_period (String): year, month, day, hour, minute, second
        from_date (String): Starting date and time inclusive.
        to_date (String): Final date and time inclusive.
        last_n (int): Request only the last N values.
        limit (int): Maximum number of results to be retrieved.
            Default value : 10000
        offset (int): Offset for the results.
        georel (String): Geographical pattern
        geometry (String): Required if georel is specified. point, line,
            polygon, box
        coords (String): Required if georel is specified.
            e.g. 40.714,-74.006
        options (String): Key value pair options.

    Returns:
        Response Model
    """
    endpoint = urljoin(self.base_url, f"v2/entities/{entity_id}/value")
    query = dict(
        attrs=attrs,
        options=options,
        entity_type=entity_type,
        aggr_method=aggr_method,
        aggr_period=aggr_period,
        from_date=from_date,
        to_date=to_date,
        last_n=last_n,
        limit=limit,
        offset=offset,
        georel=georel,
        geometry=geometry,
        coords=coords,
    )
    chunks = self.__query_builder(url=endpoint, **query)

    # the first chunk is the base, all following chunks are appended to it
    history = TimeSeries(entityId=entity_id, **chunks.popleft())
    while chunks:
        history.extend(TimeSeries(entityId=entity_id, **chunks.popleft()))

    return history

652 

653 # /entities/{entityId}/attrs/{attrName} 

def get_entity_attr_by_id(
    self,
    entity_id: str,
    attr_name: str,
    *,
    entity_type: str = None,
    aggr_method: Union[str, AggrMethod] = None,
    aggr_period: Union[str, AggrPeriod] = None,
    from_date: str = None,
    to_date: str = None,
    last_n: int = None,
    limit: int = 10000,
    offset: int = None,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    options: str = None,
) -> TimeSeries:
    """
    History of an attribute of a given entity instance.

    For example, query the max water level of the central tank throughout
    the last year. Filters and query attributes allow more sophisticated
    queries.

    Args:
        entity_id (String): Entity id is required.
        attr_name (String): The attribute name is required.
        entity_type (String): Comma-separated list of entity types whose
            data are to be included in the response.
        aggr_method (String): The function to apply to the raw data
            filtered. count, sum, avg, min, max
        aggr_period (String): year, month, day, hour, minute, second
        from_date (String): Starting date and time inclusive.
        to_date (String): Final date and time inclusive.
        last_n (int): Request only the last N values.
        limit (int): Maximum number of results to be retrieved.
            Default value : 10000
        offset (int): Offset for the results.
        georel (String): Geographical pattern
        geometry (String): Required if georel is specified. point, line,
            polygon, box
        coords (String): Required if georel is specified.
            e.g. 40.714,-74.006
        options (String): Key value pair options.

    Returns:
        Response Model
    """

    def as_series(chunk):
        # one response chunk -> TimeSeries holding the single attribute
        return TimeSeries(
            entityId=entity_id,
            index=chunk.get("index"),
            attributes=[AttributeValues(**chunk)],
        )

    endpoint = urljoin(
        self.base_url, f"v2/entities/{entity_id}/attrs/{attr_name}"
    )
    chunks = self.__query_builder(
        url=endpoint,
        entity_id=entity_id,
        options=options,
        entity_type=entity_type,
        aggr_method=aggr_method,
        aggr_period=aggr_period,
        from_date=from_date,
        to_date=to_date,
        last_n=last_n,
        limit=limit,
        offset=offset,
        georel=georel,
        geometry=geometry,
        coords=coords,
    )

    # the first chunk is the base, all following chunks are appended to it
    history = as_series(chunks.popleft())
    while chunks:
        history.extend(as_series(chunks.popleft()))

    return history

737 

738 # /entities/{entityId}/attrs/{attrName}/value 

def get_entity_attr_values_by_id(
    self,
    entity_id: str,
    attr_name: str,
    *,
    entity_type: str = None,
    aggr_method: Union[str, AggrMethod] = None,
    aggr_period: Union[str, AggrPeriod] = None,
    from_date: str = None,
    to_date: str = None,
    last_n: int = None,
    limit: int = 10000,
    offset: int = None,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    options: str = None,
) -> TimeSeries:
    """
    History of an attribute (values only) of a given entity instance.

    Like the attribute history, but focusing on the values regardless of
    the metadata.

    Args:
        entity_id (String): Entity id is required.
        attr_name (String): The attribute name is required.
        entity_type (String): Comma-separated list of entity types whose
            data are to be included in the response.
        aggr_method (String): The function to apply to the raw data
            filtered. count, sum, avg, min, max
        aggr_period (String): year, month, day, hour, minute, second
        from_date (String): Starting date and time inclusive.
        to_date (String): Final date and time inclusive.
        last_n (int): Request only the last N values.
        limit (int): Maximum number of results to be retrieved.
            Default value : 10000
        offset (int): Offset for the results.
        georel (String): Geographical pattern
        geometry (String): Required if georel is specified. point, line,
            polygon, box
        coords (String): Required if georel is specified.
            e.g. 40.714,-74.006
        options (String): Key value pair options.

    Returns:
        Response Model
    """

    def as_series(chunk):
        # one response chunk -> TimeSeries holding the single attribute
        return TimeSeries(
            entityId=entity_id,
            index=chunk.get("index"),
            attributes=[
                AttributeValues(attrName=attr_name, values=chunk.get("values"))
            ],
        )

    endpoint = urljoin(
        self.base_url, f"v2/entities/{entity_id}/attrs/{attr_name}/value"
    )
    chunks = self.__query_builder(
        url=endpoint,
        options=options,
        entity_type=entity_type,
        aggr_method=aggr_method,
        aggr_period=aggr_period,
        from_date=from_date,
        to_date=to_date,
        last_n=last_n,
        limit=limit,
        offset=offset,
        georel=georel,
        geometry=geometry,
        coords=coords,
    )

    # the first chunk is the base, all following chunks are appended to it
    history = as_series(chunks.popleft())
    while chunks:
        history.extend(as_series(chunks.popleft()))

    return history

825 

826 # /types/{entityType} 

def get_entity_by_type(
    self,
    entity_type: str,
    *,
    attrs: str = None,
    entity_id: str = None,
    id_pattern: str = None,
    aggr_method: Union[str, AggrMethod] = None,
    aggr_period: Union[str, AggrPeriod] = None,
    from_date: str = None,
    to_date: str = None,
    last_n: int = None,
    limit: int = 10000,
    offset: int = None,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    options: str = None,
    aggr_scope: Union[str, AggrScope] = None,
) -> List[TimeSeries]:
    """
    History of N attributes of N entities of the same type.
    For example, query the average pressure, temperature and humidity of
    this month in all the weather stations.

    Args:
        entity_type (String): Entity type is required; it is part of the
            request url.
        attrs (String): Comma-separated list of attribute names.
        entity_id (String): Entity id filter. Mutually exclusive with
            ``id_pattern``.
        id_pattern (String): Pattern covering the entity ids to query.
        aggr_method (String): The function to apply to the raw data
            filtered. count, sum, avg, min, max
        aggr_period (String): year, month, day, hour, minute, second
        from_date (String): Starting date and time inclusive.
        to_date (String): Final date and time inclusive.
        last_n (int): Request only the last N values.
        limit (int): Maximum number of results to be retrieved.
            Default value : 10000
        offset (int): Offset for the results.
        georel (String): Geographical pattern
        geometry (String): Required if georel is specified. point, line,
            polygon, box
        coords (String): Required if georel is specified.
            e.g. 40.714,-74.006
        options (String): Key value pair options.
        aggr_scope (String): Aggregation scope.

    Returns:
        One TimeSeries per entity, in order of first appearance in the
        response chunks.
    """
    url = urljoin(self.base_url, f"v2/types/{entity_type}")
    res_q = self.__query_builder(
        url=url,
        entity_id=entity_id,
        id_pattern=id_pattern,
        attrs=attrs,
        options=options,
        aggr_method=aggr_method,
        aggr_period=aggr_period,
        from_date=from_date,
        to_date=to_date,
        last_n=last_n,
        limit=limit,
        offset=offset,
        georel=georel,
        geometry=geometry,
        coords=coords,
        aggr_scope=aggr_scope,
    )

    # Merge the chunks per entity id. A chunk does not have to contain
    # the same entities in the same order as the previous one, so pairing
    # the series by position could mix up or drop entities.
    res: List[TimeSeries] = []
    known = {}
    for chunk in res_q:
        for item in chunk.get("entities"):
            series = TimeSeries(entityType=entity_type, **item)
            target = None
            if series.entityId is not None:
                target = known.get(series.entityId)
            if target is not None:
                target.extend(series)
                continue
            res.append(series)
            if series.entityId is not None:
                known[series.entityId] = series

    return res

887 

888 # /types/{entityType}/value 

def get_entity_values_by_type(
    self,
    entity_type: str,
    *,
    attrs: str = None,
    entity_id: str = None,
    id_pattern: str = None,
    aggr_method: Union[str, AggrMethod] = None,
    aggr_period: Union[str, AggrPeriod] = None,
    from_date: str = None,
    to_date: str = None,
    last_n: int = None,
    limit: int = 10000,
    offset: int = None,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    options: str = None,
    aggr_scope: Union[str, AggrScope] = None,
) -> List[TimeSeries]:
    """
    History of N attributes (values only) of N entities of the same type.
    For example, query the average pressure, temperature and humidity (
    values only, no metadata) of this month in
    all the weather stations.

    Args:
        entity_type (String): Entity type is required; it is part of the
            request url and also sent as query parameter.
        attrs (String): Comma-separated list of attribute names.
        entity_id (String): Entity id filter. Mutually exclusive with
            ``id_pattern``.
        id_pattern (String): Pattern covering the entity ids to query.
        aggr_method (String): The function to apply to the raw data
            filtered. count, sum, avg, min, max
        aggr_period (String): year, month, day, hour, minute, second
        from_date (String): Starting date and time inclusive.
        to_date (String): Final date and time inclusive.
        last_n (int): Request only the last N values.
        limit (int): Maximum number of results to be retrieved.
            Default value : 10000
        offset (int): Offset for the results.
        georel (String): Geographical pattern
        geometry (String): Required if georel is specified. point, line,
            polygon, box
        coords (String): Required if georel is specified.
            e.g. 40.714,-74.006
        options (String): Key value pair options.
        aggr_scope (String): Aggregation scope.

    Returns:
        One TimeSeries per entity, in order of first appearance in the
        response chunks.
    """
    url = urljoin(self.base_url, f"v2/types/{entity_type}/value")
    res_q = self.__query_builder(
        url=url,
        entity_id=entity_id,
        id_pattern=id_pattern,
        attrs=attrs,
        options=options,
        entity_type=entity_type,
        aggr_method=aggr_method,
        aggr_period=aggr_period,
        from_date=from_date,
        to_date=to_date,
        last_n=last_n,
        limit=limit,
        offset=offset,
        georel=georel,
        geometry=geometry,
        coords=coords,
        aggr_scope=aggr_scope,
    )

    # Merge the chunks per entity id. A chunk does not have to contain
    # the same entities in the same order as the previous one, so pairing
    # the series by position could mix up or drop entities.
    res: List[TimeSeries] = []
    known = {}
    for chunk in res_q:
        for item in chunk.get("values"):
            series = TimeSeries(entityType=entity_type, **item)
            target = None
            if series.entityId is not None:
                target = known.get(series.entityId)
            if target is not None:
                target.extend(series)
                continue
            res.append(series)
            if series.entityId is not None:
                known[series.entityId] = series

    return res

950 

951 # /types/{entityType}/attrs/{attrName} 

def get_entity_attr_by_type(
    self,
    entity_type: str,
    attr_name: str,
    *,
    entity_id: str = None,
    id_pattern: str = None,
    aggr_method: Union[str, AggrMethod] = None,
    aggr_period: Union[str, AggrPeriod] = None,
    from_date: str = None,
    to_date: str = None,
    last_n: int = None,
    limit: int = 10000,
    offset: int = None,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    options: str = None,
    aggr_scope: Union[str, AggrScope] = None,
) -> List[TimeSeries]:
    """
    Query the history of one attribute for N entities of the same type.

    Typical use: fetch this month's pressure measurements of all weather
    stations. In the response the index and values arrays are parallel.
    When aggr_method is used, aggregation happens per entity instance and
    the index array only holds the fromDate and toDate given in the
    request (if any).

    Args:
        entity_type (String): Entity type is required.
        attr_name (String): The attribute name is required.
        entity_id (String): Comma-separated list of entity ids whose data
            are to be included in the response.
        id_pattern (String): Forwarded unchanged to the query builder.
            Presumably a pattern selecting entity ids -- confirm against
            the QuantumLeap API.
        aggr_method (String): The function to apply to the raw data
            filtered. count, sum, avg, min, max
        aggr_period (String): year, month, day, hour, minute, second
        aggr_scope (str): Optional. (This parameter is not yet supported).
            When the query results cover historical data for multiple
            entities instances, you can define the aggregation method to
            be applied for each entity instance [entity] or across
            them [global]
        from_date (String): Starting date and time inclusive.
        to_date (String): Final date and time inclusive.
        last_n (int): Request only the last N values.
        limit (int): Maximum number of results to be retrieved.
            Default value : 10000
        offset (int): Offset for the results.
        georel (String): Geographical pattern
        geometry (String): Required if georel is specified. point, line,
            polygon, box
        coords (String): Required if georel is specified.
            e.g. 40.714,-74.006
        options (String): Key value pair options.

    Returns:
        List of TimeSeries built from the entities of the first response
        chunk; every further chunk is appended position-wise to those
        objects via ``TimeSeries.extend``.
    """

    def build_series(page):
        # Every entity listed in a response page becomes one TimeSeries
        # that carries only the requested attribute.
        return [
            TimeSeries(
                index=entity.get("index"),
                entityType=entity_type,
                entityId=entity.get("entityId"),
                attributes=[
                    AttributeValues(
                        attrName=page.get("attrName"), values=entity.get("values")
                    )
                ],
            )
            for entity in page.get("entities")
        ]

    endpoint = urljoin(self.base_url, f"v2/types/{entity_type}/attrs/{attr_name}")
    pages = self.__query_builder(
        url=endpoint,
        entity_id=entity_id,
        id_pattern=id_pattern,
        options=options,
        entity_type=entity_type,
        aggr_method=aggr_method,
        aggr_period=aggr_period,
        from_date=from_date,
        to_date=to_date,
        last_n=last_n,
        limit=limit,
        offset=offset,
        georel=georel,
        geometry=geometry,
        coords=coords,
        aggr_scope=aggr_scope,
    )

    # The first page defines the result objects; the remaining pages are
    # merged into them pairwise, in response order.
    merged = build_series(pages.popleft())
    for page in pages:
        for addition, target in zip(build_series(page), merged):
            target.extend(addition)

    return merged

1065 

1066 # /types/{entityType}/attrs/{attrName}/value 

def get_entity_attr_values_by_type(
    self,
    entity_type: str,
    attr_name: str,
    *,
    entity_id: str = None,
    id_pattern: str = None,
    aggr_method: Union[str, AggrMethod] = None,
    aggr_period: Union[str, AggrPeriod] = None,
    from_date: str = None,
    to_date: str = None,
    last_n: int = None,
    limit: int = 10000,
    offset: int = None,
    georel: str = None,
    geometry: str = None,
    coords: str = None,
    options: str = None,
    aggr_scope: Union[str, AggrScope] = None,
) -> List[TimeSeries]:
    """
    Query the history of one attribute (values only) for N entities of the
    same type, e.g. the average pressure (values only, no metadata) of
    this month in all the weather stations.

    Args:
        entity_type (String): Entity type is required.
        attr_name (String): The attribute name is required.
        entity_id (String): Comma-separated list of entity ids whose data
            are to be included in the response.
        id_pattern (String): Forwarded unchanged to the query builder.
            Presumably a pattern selecting entity ids -- confirm against
            the QuantumLeap API.
        aggr_method (String): The function to apply to the raw data
            filtered. count, sum, avg, min, max
        aggr_period (String): year, month, day, hour, minute, second
        aggr_scope (String): Forwarded unchanged to the query builder.
        from_date (String): Starting date and time inclusive.
        to_date (String): Final date and time inclusive.
        last_n (int): Request only the last N values.
        limit (int): Maximum number of results to be retrieved.
            Default value : 10000
        offset (int): Offset for the results.
        georel (String): Geographical pattern
        geometry (String): Required if georel is specified. point, line,
            polygon, box
        coords (String): Required if georel is specified.
            e.g. 40.714,-74.006
        options (String): Key value pair options.

    Returns:
        List of TimeSeries built from the "values" items of the first
        response chunk; every further chunk is appended position-wise to
        those objects via ``TimeSeries.extend``.
    """

    def build_series(page):
        # Each item below "values" describes one entity; the attribute
        # name is taken from the request, not from the response.
        return [
            TimeSeries(
                index=entry.get("index"),
                entityType=entity_type,
                entityId=entry.get("entityId"),
                attributes=[
                    AttributeValues(attrName=attr_name, values=entry.get("values"))
                ],
            )
            for entry in page.get("values")
        ]

    endpoint = urljoin(
        self.base_url, f"v2/types/{entity_type}/attrs/{attr_name}/value"
    )
    pages = self.__query_builder(
        url=endpoint,
        entity_id=entity_id,
        id_pattern=id_pattern,
        options=options,
        entity_type=entity_type,
        aggr_method=aggr_method,
        aggr_period=aggr_period,
        from_date=from_date,
        to_date=to_date,
        last_n=last_n,
        limit=limit,
        offset=offset,
        georel=georel,
        geometry=geometry,
        coords=coords,
        aggr_scope=aggr_scope,
    )

    # The first page defines the result objects; the remaining pages are
    # merged into them pairwise, in response order.
    merged = build_series(pages.popleft())
    for page in pages:
        for addition, target in zip(build_series(page), merged):
            target.extend(addition)
    return merged

1169 

1170 # v2/attrs 

def get_entity_by_attrs(
    self,
    *,
    entity_type: str = None,
    from_date: str = None,
    to_date: str = None,
    limit: int = 10000,
    offset: int = None,
) -> List[TimeSeries]:
    """
    Get list of timeseries data grouped by each existing attribute name.
    The timeseries data include all entities corresponding to each
    attribute name as well as the index and values of this attribute in
    this entity.

    Args:
        entity_type (str): Comma-separated list of entity types whose data
            are to be included in the response. Use only one (no comma)
            when required. If used to resolve ambiguity for the given
            entityId, make sure the given entityId exists for this
            entityType.
        from_date (str): The starting date and time (inclusive) from which
            the context information is queried. Must be in ISO8601 format
            (e.g., 2018-01-05T15:44:34)
        to_date (str): The final date and time (inclusive) from which the
            context information is queried. Must be in ISO8601 format
            (e.g., 2018-01-05T15:44:34).
        limit (int): Maximum number of results to be retrieved.
            Default value : 10000
        offset (int): Offset for the results.

    Returns:
        List of TimeSeries built from the "attrs" items of the first
        response chunk; every further chunk is appended position-wise to
        those objects via ``TimeSeries.extend``.
    """
    url = urljoin(self.base_url, "v2/attrs")
    res_q = self.__query_builder(
        url=url,
        entity_type=entity_type,
        from_date=from_date,
        to_date=to_date,
        limit=limit,
        offset=offset,
    )
    first = res_q.popleft()

    # Materialise the first chunk as a list. A bare chain iterator is
    # single-pass: the zip() below would consume it, the extended objects
    # would be thrown away and the final result would miss them.
    res = list(
        chain.from_iterable(
            self.transform_attr_response_model(attr) for attr in first.get("attrs")
        )
    )

    for chunk in res_q:
        additions = chain.from_iterable(
            self.transform_attr_response_model(attr) for attr in chunk.get("attrs")
        )
        # Pair the new data with the already collected series by position.
        for new, old in zip(additions, res):
            old.extend(new)

    return res

1228 

1229 # v2/attrs/{attr_name} 

def get_entity_by_attr_name(
    self,
    *,
    attr_name: str,
    entity_type: str = None,
    from_date: str = None,
    to_date: str = None,
    limit: int = 10000,
    offset: int = None,
) -> List[TimeSeries]:
    """
    Get list of all entities containing this attribute name, as well as
    getting the index and values of this attribute in every corresponding
    entity.

    Args:
        attr_name (str): The attribute name in interest.
        entity_type (str): Comma-separated list of entity types whose data
            are to be included in the response. Use only one (no comma)
            when required. If used to resolve ambiguity for the given
            entityId, make sure the given entityId exists for this
            entityType.
        from_date (str): The starting date and time (inclusive) from which
            the context information is queried. Must be in ISO8601 format
            (e.g., 2018-01-05T15:44:34)
        to_date (str): The final date and time (inclusive) from which the
            context information is queried. Must be in ISO8601 format
            (e.g., 2018-01-05T15:44:34).
        limit (int): Maximum number of results to be retrieved.
            Default value : 10000
        offset (int): Offset for the results.

    Returns:
        List of TimeSeries built from the first response chunk; every
        further chunk is appended position-wise to those objects via
        ``TimeSeries.extend``.
    """
    # Relative path (no leading slash), like every other endpoint of this
    # client, so that a path prefix in base_url is kept by urljoin.
    url = urljoin(self.base_url, f"v2/attrs/{attr_name}")
    res_q = self.__query_builder(
        url=url,
        entity_type=entity_type,
        from_date=from_date,
        to_date=to_date,
        limit=limit,
        offset=offset,
    )

    first = res_q.popleft()
    # Materialise the first chunk as a list. The transformer hands back a
    # single-pass iterator; feeding it to zip() below would exhaust it and
    # the merged series would be missing from the returned result.
    res = list(self.transform_attr_response_model(first))

    for chunk in res_q:
        additions = self.transform_attr_response_model(chunk)
        # Pair the new data with the already collected series by position.
        for new, old in zip(additions, res):
            old.extend(new)
    return res

1283 

def transform_attr_response_model(self, attr_response):
    """
    Convert one attribute-grouped response item into TimeSeries objects.

    Args:
        attr_response (dict): Mapping read through the keys "attrName" and
            "types". Every element of "types" is read through
            "entityType" and "entities", and every entity through
            "entityId", "index" and "values".

    Returns:
        ``itertools.chain`` iterator over one TimeSeries per entity, in
        the order of the type groups and of the entities within each
        group. Each TimeSeries holds a single AttributeValues entry named
        after "attrName".
    """
    attr_name = attr_response.get("attrName")
    series_per_type = []
    for entity_group in attr_response.get("types"):
        # Resolve the type for this group right away and build the group's
        # series eagerly. A lazily evaluated lambda would look up
        # `entity_group` only after the loop has finished and would stamp
        # every entity with the entityType of the last group.
        entity_type = entity_group.get("entityType")
        series_per_type.append(
            [
                TimeSeries(
                    entityId=entity.get("entityId"),
                    entityType=entity_type,
                    index=entity.get("index"),
                    attributes=[
                        AttributeValues(
                            attrName=attr_name, values=entity.get("values")
                        )
                    ],
                )
                for entity in entity_group.get("entities")
            ]
        )
    return chain.from_iterable(series_per_type)