Coverage for filip/clients/ngsi_v2/quantumleap.py: 78%

231 statements  

coverage.py v7.4.4, created at 2024-11-20 16:54 +0000

1""" 

2TimeSeries Module for QuantumLeap API Client 

3""" 

4import logging 

5import time 

6from math import inf 

7from collections import deque 

8 from itertools import count, chain 

9from typing import Dict, List, Union, Deque, Optional 

10from urllib.parse import urljoin 

11import requests 

12from pydantic import AnyHttpUrl 

13from pydantic.type_adapter import TypeAdapter 

14from filip import settings 

15from filip.clients.base_http_client import BaseHttpClient 

16from filip.models.base import FiwareHeader 

17from filip.models.ngsi_v2.subscriptions import Message 

18from filip.models.ngsi_v2.timeseries import \ 

19 AggrPeriod, \ 

20 AggrMethod, \ 

21 AggrScope, \ 

22 AttributeValues, \ 

23 TimeSeries, \ 

24 TimeSeriesHeader 

25 

26 

27logger = logging.getLogger(__name__) 

28 

29 

30class QuantumLeapClient(BaseHttpClient): 

31 """ 

32 Implements functions to use FIWARE's QuantumLeap, which subscribes to an 

33 Orion Context Broker and stores the subscription data in a time series 

34 database (CrateDB). Further Information: 

35 https://smartsdk.github.io/ngsi-timeseries-api/#quantumleap 

36 https://app.swaggerhub.com/apis/heikkilv/quantumleap-api/ 

37 

38 Args: 

39 url: URL of the QuantumLeap service 

40 session (Optional): an existing requests.Session to reuse for the requests 

41 fiware_header: FIWARE service header to send with every request 

42 **kwargs: additional keyword arguments passed on to the BaseHttpClient 
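Example:
    A minimal construction sketch; the URL, service and service path
    below are illustrative values, not defaults of this client:

    >>> from filip.models.base import FiwareHeader
    >>> client = QuantumLeapClient(
    ...     url="http://localhost:8668",
    ...     fiware_header=FiwareHeader(service="filip",
    ...                                service_path="/testing"))
    >>> client.get_version()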

43 """ 

44 

45 def __init__(self, 

46 url: str = None, 

47 *, 

48 session: requests.Session = None, 

49 fiware_header: FiwareHeader = None, 

50 **kwargs): 

51 # set service url 

52 url = url or settings.QL_URL 

53 super().__init__(url=url, 

54 session=session, 

55 fiware_header=fiware_header, 

56 **kwargs) 

57 

58 # META API ENDPOINTS 

59 def get_version(self) -> Dict: 

60 """ 

61 Gets version of QuantumLeap-Service. 

62 

63 Returns: 

64 Dictionary with response 

65 """ 

66 url = urljoin(self.base_url, 'version') 

67 try: 

68 res = self.get(url=url, headers=self.headers) 

69 if res.ok: 

70 return res.json() 

71 res.raise_for_status() 

72 except requests.exceptions.RequestException as err: 

73 self.logger.error(err) 

74 raise 

75 

76 def get_health(self) -> Dict: 

77 """ 

78 This endpoint is intended for administrators of QuantumLeap. Using the 

79 information returned by this endpoint they can diagnose problems in the 

80 service or its dependencies. This information is also useful for cloud 

81 tools such as orchestrators and load balancers with rules based on 

82 health-checks. Due to the lack of a standardized response format, we 

83 base the implementation on the draft of 

84 https://inadarei.github.io/rfc-healthcheck/ 

85 

86 Returns: 

87 Dictionary with response 

88 """ 

89 url = urljoin(self.base_url, 'health') 

90 try: 

91 res = self.get(url=url, headers=self.headers) 

92 if res.ok: 

93 return res.json() 

94 res.raise_for_status() 

95 except requests.exceptions.RequestException as err: 

96 self.logger.error(err) 

97 raise 

98 

99 def post_config(self): 

100 """ 

101 (To Be Implemented) Customize your persistence configuration to 

102 better suit your needs. 

103 """ 

104 raise NotImplementedError("Endpoint to be implemented...") 

105 

106 # INPUT API ENDPOINTS 

107 def post_notification(self, notification: Message): 

108 """ 

109 Notify QuantumLeap of the arrival of a new NGSI notification. 

110 

111 Args: 

112 notification: Notification Message Object 
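Example:
    Illustrative sketch of forwarding a notification payload by hand,
    assuming an already constructed client instance ``client``; the
    entity id, type, attribute and subscription id are made-up values:

    >>> from filip.models.ngsi_v2.context import ContextEntity
    >>> from filip.models.ngsi_v2.subscriptions import Message
    >>> entity = ContextEntity(id="Room:001", type="Room",
    ...                        temperature={"type": "Number",
    ...                                     "value": 21.5})
    >>> client.post_notification(
    ...     Message(data=[entity], subscriptionId="my-subscription-id"))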

113 """ 

114 url = urljoin(self.base_url, 'v2/notify') 

115 headers = self.headers.copy() 

116 data = [] 

117 for entity in notification.data: 

118 data.append(entity.model_dump(exclude_none=True)) 

119 data_set = { 

120 "data": data, 

121 "subscriptionId": notification.subscriptionId 

122 } 

123 

124 try: 

125 res = self.post( 

126 url=url, 

127 headers=headers, 

128 json=data_set) 

129 if res.ok: 

130 self.logger.debug(res.text) 

131 else: 

132 res.raise_for_status() 

133 except requests.exceptions.RequestException as err: 

134 msg = f"Could not post notification for subscription id " \ 

135 f"{notification.subscriptionId}" 

136 self.log_error(err=err, msg=msg) 

137 raise 

138 

139 def post_subscription(self, 

140 cb_url: Union[AnyHttpUrl, str], 

141 ql_url: Union[AnyHttpUrl, str], 

142 entity_type: str = None, 

143 entity_id: str = None, 

144 id_pattern: str = None, 

145 attributes: str = None, 

146 observed_attributes: str = None, 

147 notified_attributes: str = None, 

148 throttling: int = None, 

149 time_index_attribute: str = None): 

150 """ 

151 Subscribe QL to process Orion notifications of a certain type. 

152 This endpoint simplifies the creation of the subscription in Orion 

153 that will generate the notifications to be consumed by QuantumLeap in 

154 order to save historical records. If you need a more advanced 

155 specification of the notifications, you can always create the 

156 subscription in Orion yourself. This endpoint just aims to simplify the common use case. 

157 

158 Args: 

159 cb_url: 

160 url of the context broker 

161 ql_url: 

162 The url where Orion can reach QuantumLeap. Do not include 

163 specific paths. 

164 entity_type (String): 

165 The type of entities for which to create a 

166 subscription, so as to persist historical data of entities of 

167 this type. 

168 entity_id (String): 

169 Id of the entity to track. If specified, it 

170 takes precedence over the idPattern parameter. 

171 id_pattern (String): The pattern covering the entity ids for which 

172 to subscribe. If not specified, QL will track all entities of 

173 the specified type. 

174 attributes (String): Comma-separated list of attribute names to 

175 track. 

176 observed_attributes (String): Comma-separated list of attribute 

177 names to track. 

178 notified_attributes (String): Comma-separated list of attribute 

179 names to be used to restrict the data of which QL will keep a 

180 history. 

181 throttling (int): Minimal period of time in seconds which must 

182 elapse between two consecutive notifications. 

183 time_index_attribute (String): The name of a custom attribute to be 

184 used as a 

185 time index. 

186 """ 

187 raise DeprecationWarning("Subscription endpoint of the QuantumLeap API is " 

188 "deprecated, use the Orion subscription endpoint " 

189 "instead") 

190 

191 def delete_entity(self, entity_id: str, 

192 entity_type: Optional[str] = None) -> str: 

193 """ 

194 Given an entity (with type and id), delete all its historical records. 

195 

196 Args: 

197 entity_id (String): Entity id is required. 

198 entity_type (Optional[String]): Entity type if entity_id alone 

199 can not uniquely define the entity. 

200 

201 Raises: 

202 RequestException, if entity was not found 

203 Exception, if deleting was not successful 

204 

205 Returns: 

206 The entity_id of the entity that was deleted. 
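Example:
    Illustrative call with a made-up entity id and type:

    >>> client.delete_entity(entity_id="Room:001", entity_type="Room")
    'Room:001'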

207 """ 

208 url = urljoin(self.base_url, f'v2/entities/{entity_id}') 

209 headers = self.headers.copy() 

210 if entity_type is not None: 

211 params = {'type': entity_type} 

212 else: 

213 params = {} 

214 

215 # The deletion does not always result in success even if an OK is 

216 # returned. 

217 # Try to delete multiple times with increasing waits in between. 

218 # If the entity is no longer found, the method returns successfully. 

219 # If the deletion attempt still fails after the 10th try, raise an 

220 # Exception: the entity could not be deleted. 

221 counter = 0 

222 while counter < 10: 

223 self.delete(url=url, headers=headers, params=params) 

224 try: 

225 self.get_entity_by_id(entity_id=entity_id, 

226 entity_type=entity_type) 

227 except requests.exceptions.RequestException as err: 

228 self.logger.info("Entity id '%s' successfully deleted!", 

229 entity_id) 

230 return entity_id 

231 time.sleep(counter * 5) 

232 counter += 1 

233 

234 msg = f"Could not delete QL entity of id {entity_id}" 

235 logger.error(msg=msg) 

236 raise Exception(msg) 

237 

238 def delete_entity_type(self, entity_type: str) -> str: 

239 """ 

240 Given an entity type, delete all the historical records of all 

241 entities of such type. 

242 Args: 

243 entity_type (String): Type of entities data to be deleted. 

244 Returns: 

245 Entity type of the entities deleted. 
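Example:
    Illustrative call with a made-up entity type:

    >>> client.delete_entity_type(entity_type="Room")
    'Room'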

246 """ 

247 url = urljoin(self.base_url, f'v2/types/{entity_type}') 

248 headers = self.headers.copy() 

249 try: 

250 res = self.delete(url=url, headers=headers) 

251 if res.ok: 

252 self.logger.info("Entities of type '%s' successfully deleted!", 

253 entity_type) 

254 return entity_type 

255 res.raise_for_status() 

256 except requests.exceptions.RequestException as err: 

257 msg = f"Could not delete entities of type {entity_type}" 

258 self.log_error(err=err, msg=msg) 

259 raise 

260 

261 # QUERY API ENDPOINTS 

262 def __query_builder(self, 

263 url, 

264 *, 

265 entity_id: str = None, 

266 id_pattern: str = None, 

267 options: str = None, 

268 entity_type: str = None, 

269 aggr_method: Union[str, AggrMethod] = None, 

270 aggr_period: Union[str, AggrPeriod] = None, 

271 from_date: str = None, 

272 to_date: str = None, 

273 last_n: int = None, 

274 limit: int = 10000, 

275 offset: int = 0, 

276 georel: str = None, 

277 geometry: str = None, 

278 coords: str = None, 

279 attrs: str = None, 

280 aggr_scope: Union[str, AggrScope] = None 

281 ) -> Deque[Dict]: 

282 """ 

283 Private function that calls the respective API endpoint, chops large 

284 requests into multiple smaller requests and merges the 

285 responses. 

286 

287 Args: 

288 url: 

289 entity_id: 

290 options: 

291 entity_type: 

292 aggr_method: 

293 aggr_period: 

294 from_date: 

295 to_date: 

296 last_n: 

297 limit: 

298 Maximum number of results to retrieve in a single response. 

299 offset: 

300 Offset to apply to the response results. For example, if the 

301 query was to return 10 results and you use an offset of 1, the 

302 response will return the last 9 values. Make sure the offset does 

303 not exceed the number of available results. 

304 georel: 

305 geometry: 

306 coords: 

307 attrs: 

308 aggr_scope: 

309 id_pattern (str): The pattern covering the entity ids for which 

310 to query. The pattern follows regular expressions (POSIX 

311 Extended), e.g. ".*", "Room.*". Detailed information: 

312 https://en.wikibooks.org/wiki/Regular_Expressions/POSIX-Extended_Regular_Expressions 

313 

314 Returns: 

315 Deque of raw response chunks (dicts) 
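Note:
    A worked example of the chunking arithmetic, assuming the default
    max_records_per_request of 10000: a call with limit=25000 and
    offset=0 issues three requests with (offset, limit) pairs
    (0, 10000), (10000, 10000) and (20000, 5000); the loop then stops
    because the remaining limit is no longer positive.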

316 """ 

317 assert (id_pattern is None or entity_id is None), "Cannot have both id and idPattern as parameter." 

318 params = {} 

319 headers = self.headers.copy() 

320 max_records_per_request = 10000 

321 # create a double ending queue 

322 res_q: Deque[Dict] = deque([]) 

323 

324 if options: 

325 params.update({'options': options}) 

326 if entity_type: 

327 params.update({'type': entity_type}) 

328 if aggr_method: 

329 aggr_method = AggrMethod(aggr_method) 

330 params.update({'aggrMethod': aggr_method.value}) 

331 if aggr_period: 

332 aggr_period = AggrPeriod(aggr_period) 

333 params.update({'aggrPeriod': aggr_period.value}) 

334 if from_date: 

335 params.update({'fromDate': from_date}) 

336 if to_date: 

337 params.update({'toDate': to_date}) 

338 # These values are required for the integrated pagination mechanism 

339 # maximum items per request 

340 if limit is None: 

341 limit = inf 

342 if offset is None: 

343 offset = 0 

344 if georel: 

345 params.update({'georel': georel}) 

346 if coords: 

347 params.update({'coords': coords}) 

348 if geometry: 

349 params.update({'geometry': geometry}) 

350 if attrs: 

351 params.update({'attrs': attrs}) 

352 if aggr_scope: 

353 aggr_scope = AggrScope(aggr_scope) 

354 params.update({'aggrScope': aggr_scope.value}) 

355 if entity_id: 

356 params.update({'id': entity_id}) 

357 if id_pattern: 

358 params.update({'idPattern': id_pattern}) 

359 

360 # This loop chops large requests into smaller chunks. 

361 # The individual functions will then merge the final response models 

362 for i in count(0, max_records_per_request): 

363 try: 

364 params['offset'] = offset + i 

365 

366 params['limit'] = min(limit - i, max_records_per_request) 

367 if params['limit'] <= 0: 

368 break 

369 

370 if last_n: 

371 params['lastN'] = min(last_n - i, max_records_per_request) 

372 if params['lastN'] <= 0: 

373 break 

374 

375 res = self.get(url=url, params=params, headers=headers) 

376 

377 if res.ok: 

378 self.logger.debug('Received: %s', res.json()) 

379 

380 # revert append direction when using last_n 

381 if last_n: 

382 res_q.appendleft(res.json()) 

383 else: 

384 res_q.append(res.json()) 

385 res.raise_for_status() 

386 

387 except requests.exceptions.RequestException as err: 

388 if err.response.status_code == 404 and \ 

389 err.response.json().get('error') == 'Not Found' and \ 

390 len(res_q) > 0: 

391 break 

392 else: 

393 msg = "Could not load entity data" 

394 self.log_error(err=err, msg=msg) 

395 raise 

396 

397 self.logger.info("Successfully retrieved entity data") 

398 return res_q 

399 

400 # v2/entities 

401 def get_entities(self, *, 

402 entity_type: str = None, 

403 id_pattern: str = None, 

404 from_date: str = None, 

405 to_date: str = None, 

406 limit: int = 10000, 

407 offset: int = None 

408 ) -> List[TimeSeriesHeader]: 

409 """ 

410 Get list of all available entities and their context information 

411 about EntityType and last update date. 

412 

413 Args: 

414 entity_type (str): Comma-separated list of entity types whose data 

415 are to be included in the response. Use only one (no comma) 

416 when required. If used to resolve ambiguity for the given 

417 entityId, make sure the given entityId exists for this 

418 entityType. 

419 id_pattern (str): The pattern covering the entity ids for which 

420 to query. The pattern follows regular expressions (POSIX 

421 Extended), e.g. ".*", "Room.*". Detailed information: 

422 https://en.wikibooks.org/wiki/Regular_Expressions/POSIX-Extended_Regular_Expressions 

423 from_date (str): The starting date and time (inclusive) from which 

424 the context information is queried. Must be in ISO8601 format 

425 (e.g., 2018-01-05T15:44:34) 

426 to_date (str): The final date and time (inclusive) from which the 

427 context information is queried. Must be in ISO8601 format 

428 (e.g., 2018-01-05T15:44:34). 

429 limit (int): Maximum number of results to be retrieved. 

430 Default value : 10000 

431 offset (int): Offset for the results. 

432 

433 Returns: 

434 List of TimeSeriesHeader 
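Example:
    Illustrative sketch listing the stored entities of one made-up
    type, assuming a constructed client instance ``client``:

    >>> headers = client.get_entities(entity_type="Room")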

435 """ 

436 url = urljoin(self.base_url, 'v2/entities') 

437 res = self.__query_builder(url=url, 

438 id_pattern=id_pattern, 

439 entity_type=entity_type, 

440 from_date=from_date, 

441 to_date=to_date, 

442 limit=limit, 

443 offset=offset) 

444 

445 ta = TypeAdapter(List[TimeSeriesHeader]) 

446 return ta.validate_python(res[0]) 

447 

448 # /entities/{entityId} 

449 def get_entity_by_id(self, 

450 entity_id: str, 

451 *, 

452 attrs: str = None, 

453 entity_type: str = None, 

454 aggr_method: Union[str, AggrMethod] = None, 

455 aggr_period: Union[str, AggrPeriod] = None, 

456 from_date: str = None, 

457 to_date: str = None, 

458 last_n: int = None, 

459 limit: int = 10000, 

460 offset: int = None, 

461 georel: str = None, 

462 geometry: str = None, 

463 coords: str = None, 

464 options: str = None 

465 ) -> TimeSeries: 

466 

467 """ 

468 History of N attributes of a given entity instance. 

469 For example, query max water level of the central tank throughout the 

470 last year. Queries can get more 

471 sophisticated with the use of filters and query attributes. 

472 

473 Args: 

474 entity_id (String): Entity id is required. 

475 attrs (String): 

476 Comma-separated list of attribute names whose data are to be 

477 included in the response. The attributes are retrieved in the 

478 order specified by this parameter. If not specified, all 

479 attributes are included in the response in arbitrary order. 

480 entity_type (String): Comma-separated list of entity types whose 

481 data are to be included in the response. 

482 aggr_method (String): 

483 The function to apply to the raw data filtered by the query 

484 parameters. If not given, the returned data are the same raw 

485 inserted data. 

486 

487 Allowed values: count, sum, avg, min, max 

488 aggr_period (String): 

489 If not defined, the aggregation will apply to all the values 

490 contained in the search result. If defined, the aggregation 

491 function will instead be applied N times, once for each 

492 period, and all those results will be considered for the 

493 response. For example, a query asking for the average 

494 temperature of an attribute will typically return 1 value. 

495 However, with an aggregationPeriod of day, you get the daily 

496 average of the temperature instead (more than one value 

497 assuming you had measurements across many days within the 

498 scope of your search result). aggrPeriod must be accompanied 

499 by an aggrMethod, and the aggrMethod will be applied to all 

500 the numeric attributes specified in attrs; the rest of the 

501 non-numerical attrs will be ignored. By default, the response 

502 is grouped by entity_id. See aggrScope to create aggregation 

503 across entities: 

504 

505 Allowed values: year, month, day, hour, minute, second 

506 

507 from_date (String): 

508 The starting date and time (inclusive) from which the context 

509 information is queried. Must be in ISO8601 format (e.g., 

510 2018-01-05T15:44:34) 

511 to_date (String): 

512 The final date and time (inclusive) from which the context 

513 information is queried. Must be in ISO8601 format (e.g., 

514 2018-01-05T15:44:34) 

515 last_n (int): 

516 Used to request only the last N values that satisfy the 

517 request conditions. 

518 limit (int): Maximum number of results to be retrieved. 

519 Default value : 10000 

520 offset (int): 

521 Offset to apply to the response results. 

522 georel (String): 

523 It specifies a spatial relationship between matching entities 

524 and a reference shape (geometry). This parameter is used to 

525 perform geographical queries with the same semantics as in the 

526 FIWARE-NGSI v2 Specification. Full details can be found in the 

527 Geographical Queries section of the specification: 

528 https://fiware.github.io/specifications/ngsiv2/stable/. 

529 geometry (String): 

530 Required if georel is specified. point, line, polygon, box 

531 coords (String): 

532 Optional but required if georel is specified. This parameter 

533 defines the reference shape (geometry) in terms of WGS 84 

534 coordinates and has the same semantics as in the 

535 FIWARE-NGSI v2 Specification, except we only accept coordinates 

536 in decimal degrees---e.g. 40.714,-74.006 is okay, but not 

537 40 42' 51'',74 0' 21''. Full details can be found in the 

538 Geographical Queries section of the specification: 

539 https://fiware.github.io/specifications/ngsiv2/stable/. 

540 options (String): Key value pair options. 

541 

542 Returns: 

543 TimeSeries 
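Example:
    Illustrative sketch querying the daily maximum of a single
    attribute over a fixed time span; the entity id, attribute name
    and dates are made-up values:

    >>> ts = client.get_entity_by_id(
    ...     entity_id="Tank:001",
    ...     attrs="waterLevel",
    ...     aggr_method="max",
    ...     aggr_period="day",
    ...     from_date="2018-01-01T00:00:00",
    ...     to_date="2018-12-31T23:59:59")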

544 """ 

545 url = urljoin(self.base_url, f'v2/entities/{entity_id}') 

546 res_q = self.__query_builder(url=url, 

547 attrs=attrs, 

548 options=options, 

549 entity_type=entity_type, 

550 aggr_method=aggr_method, 

551 aggr_period=aggr_period, 

552 from_date=from_date, 

553 to_date=to_date, 

554 last_n=last_n, 

555 limit=limit, 

556 offset=offset, 

557 georel=georel, 

558 geometry=geometry, 

559 coords=coords) 

560 # merge response chunks 

561 res = TimeSeries.model_validate(res_q.popleft()) 

562 for item in res_q: 

563 res.extend(TimeSeries.model_validate(item)) 

564 

565 return res 

566 

567 # /entities/{entityId}/value 

568 def get_entity_values_by_id(self, 

569 entity_id: str, 

570 *, 

571 attrs: str = None, 

572 entity_type: str = None, 

573 aggr_method: Union[str, AggrMethod] = None, 

574 aggr_period: Union[str, AggrPeriod] = None, 

575 from_date: str = None, 

576 to_date: str = None, 

577 last_n: int = None, 

578 limit: int = 10000, 

579 offset: int = None, 

580 georel: str = None, 

581 geometry: str = None, 

582 coords: str = None, 

583 options: str = None 

584 ) -> TimeSeries: 

585 """ 

586 History of N attributes (values only) of a given entity instance. 

587 For example, query the average pressure, temperature and humidity 

588 (values only, no metadata) of this 

589 month in the weather station WS1. 

590 

591 Args: 

592 entity_id (String): Entity id is required. 

593 attrs (String): Comma-separated list of attribute names 

594 entity_type (String): Comma-separated list of entity types whose 

595 data are to be included in the response. 

596 aggr_method (String): The function to apply to the raw data 

597 filtered. count, sum, avg, min, max 

598 aggr_period (String): year, month, day, hour, minute, second 

599 from_date (String): Starting date and time inclusive. 

600 to_date (String): Final date and time inclusive. 

601 last_n (int): Request only the last N values. 

602 limit (int): Maximum number of results to be retrieved. 

603 Default value : 10000 

604 offset (int): Offset for the results. 

605 georel (String): Geographical pattern 

606 geometry (String): Required if georel is specified. point, line, 

607 polygon, box 

608 coords (String): Required if georel is specified. 

609 e.g. 40.714,-74.006 

610 options (String): Key value pair options. 

611 

612 Returns: 

613 Response Model 

614 """ 

615 url = urljoin(self.base_url, f'v2/entities/{entity_id}/value') 

616 res_q = self.__query_builder(url=url, 

617 attrs=attrs, 

618 options=options, 

619 entity_type=entity_type, 

620 aggr_method=aggr_method, 

621 aggr_period=aggr_period, 

622 from_date=from_date, 

623 to_date=to_date, 

624 last_n=last_n, 

625 limit=limit, 

626 offset=offset, 

627 georel=georel, 

628 geometry=geometry, 

629 coords=coords) 

630 

631 # merge response chunks 

632 res = TimeSeries(entityId=entity_id, **res_q.popleft()) 

633 for item in res_q: 

634 res.extend(TimeSeries(entityId=entity_id, **item)) 

635 

636 return res 

637 

638 # /entities/{entityId}/attrs/{attrName} 

639 def get_entity_attr_by_id(self, 

640 entity_id: str, 

641 attr_name: str, 

642 *, 

643 entity_type: str = None, 

644 aggr_method: Union[str, AggrMethod] = None, 

645 aggr_period: Union[str, AggrPeriod] = None, 

646 from_date: str = None, 

647 to_date: str = None, 

648 last_n: int = None, 

649 limit: int = 10000, 

650 offset: int = None, 

651 georel: str = None, 

652 geometry: str = None, 

653 coords: str = None, 

654 options: str = None 

655 ) -> TimeSeries: 

656 """ 

657 History of an attribute of a given entity instance. 

658 For example, query max water level of the central tank throughout the 

659 last year. Queries can get more 

660 sophisticated with the use of filters and query attributes. 

661 

662 Args: 

663 entity_id (String): Entity id is required. 

664 attr_name (String): The attribute name is required. 

665 entity_type (String): Comma-separated list of entity types whose 

666 data are to be included in the response. 

667 aggr_method (String): The function to apply to the raw data 

668 filtered. count, sum, avg, min, max 

669 aggr_period (String): year, month, day, hour, minute, second 

670 from_date (String): Starting date and time inclusive. 

671 to_date (String): Final date and time inclusive. 

672 last_n (int): Request only the last N values. 

673 limit (int): Maximum number of results to be retrieved. 

674 Default value : 10000 

675 offset (int): Offset for the results. 

676 georel (String): Geographical pattern 

677 geometry (String): Required if georel is specified. point, line, 

678 polygon, box 

679 coords (String): Required if georel is specified. 

680 e.g. 40.714,-74.006 

681 options (String): Key value pair options. 

682 

683 Returns: 

684 Response Model 
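Example:
    Illustrative sketch retrieving the last 100 recorded values of
    one attribute; the entity id and attribute name are made-up:

    >>> ts = client.get_entity_attr_by_id(
    ...     entity_id="WeatherStation:001",
    ...     attr_name="temperature",
    ...     last_n=100)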

685 """ 

686 url = urljoin(self.base_url, f'v2/entities/{entity_id}/attrs' 

687 f'/{attr_name}') 

688 req_q = self.__query_builder(url=url, 

689 entity_id=entity_id, 

690 options=options, 

691 entity_type=entity_type, 

692 aggr_method=aggr_method, 

693 aggr_period=aggr_period, 

694 from_date=from_date, 

695 to_date=to_date, 

696 last_n=last_n, 

697 limit=limit, 

698 offset=offset, 

699 georel=georel, 

700 geometry=geometry, 

701 coords=coords) 

702 

703 # merge response chunks 

704 first = req_q.popleft() 

705 res = TimeSeries(entityId=entity_id, 

706 index=first.get('index'), 

707 attributes=[AttributeValues(**first)]) 

708 for item in req_q: 

709 res.extend(TimeSeries(entityId=entity_id, 

710 index=item.get('index'), 

711 attributes=[AttributeValues(**item)])) 

712 

713 return res 

714 

715 # /entities/{entityId}/attrs/{attrName}/value 

716 def get_entity_attr_values_by_id(self, 

717 entity_id: str, 

718 attr_name: str, 

719 *, 

720 entity_type: str = None, 

721 aggr_method: Union[str, AggrMethod] = None, 

722 aggr_period: Union[str, AggrPeriod] = None, 

723 from_date: str = None, 

724 to_date: str = None, 

725 last_n: int = None, 

726 limit: int = 10000, 

727 offset: int = None, 

728 georel: str = None, 

729 geometry: str = None, 

730 coords: str = None, 

731 options: str = None 

732 ) -> TimeSeries: 

733 """ 

734 History of an attribute (values only) of a given entity instance. 

735 Similar to the previous, but focusing on the values regardless of the 

736 metadata. 

737 

738 Args: 

739 entity_id (String): Entity id is required. 

740 attr_name (String): The attribute name is required. 

741 entity_type (String): Comma-separated list of entity types whose 

742 data are to be included in the response. 

743 aggr_method (String): The function to apply to the raw data 

744 filtered. count, sum, avg, min, max 

745 aggr_period (String): year, month, day, hour, minute, second 

746 from_date (String): Starting date and time inclusive. 

747 to_date (String): Final date and time inclusive. 

748 last_n (int): Request only the last N values. 

749 limit (int): Maximum number of results to be retrieved. 

750 Default value : 10000 

751 offset (int): Offset for the results. 

752 georel (String): Geographical pattern 

753 geometry (String): Required if georel is specified. point, line, 

754 polygon, box 

755 coords (String): Required if georel is specified. 

756 e.g. 40.714,-74.006 

757 options (String): Key value pair options. 

758 

759 Returns: 

760 Response Model 

761 """ 

762 url = urljoin(self.base_url, f'v2/entities/{entity_id}/attrs' 

763 f'/{attr_name}/value') 

764 res_q = self.__query_builder(url=url, 

765 options=options, 

766 entity_type=entity_type, 

767 aggr_method=aggr_method, 

768 aggr_period=aggr_period, 

769 from_date=from_date, 

770 to_date=to_date, 

771 last_n=last_n, 

772 limit=limit, 

773 offset=offset, 

774 georel=georel, 

775 geometry=geometry, 

776 coords=coords) 

777 # merge response chunks 

778 first = res_q.popleft() 

779 res = TimeSeries( 

780 entityId=entity_id, 

781 index=first.get('index'), 

782 attributes=[AttributeValues(attrName=attr_name, 

783 values=first.get('values'))]) 

784 for item in res_q: 

785 res.extend( 

786 TimeSeries( 

787 entityId=entity_id, 

788 index=item.get('index'), 

789 attributes=[AttributeValues(attrName=attr_name, 

790 values=item.get('values'))])) 

791 

792 return res 

793 

794 # /types/{entityType} 

795 def get_entity_by_type(self, 

796 entity_type: str, 

797 *, 

798 attrs: str = None, 

799 entity_id: str = None, 

800 id_pattern: str = None, 

801 aggr_method: Union[str, AggrMethod] = None, 

802 aggr_period: Union[str, AggrPeriod] = None, 

803 from_date: str = None, 

804 to_date: str = None, 

805 last_n: int = None, 

806 limit: int = 10000, 

807 offset: int = None, 

808 georel: str = None, 

809 geometry: str = None, 

810 coords: str = None, 

811 options: str = None, 

812 aggr_scope: Union[str, AggrScope] = None 

813 ) -> List[TimeSeries]: 

814 """ 

815 History of N attributes of N entities of the same type. 

816 For example, query the average pressure, temperature and humidity of 

817 this month in all the weather stations. 
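Example:
    Illustrative sketch; the type, id pattern and attribute list are
    made-up values:

    >>> ts_list = client.get_entity_by_type(
    ...     entity_type="WeatherStation",
    ...     id_pattern="WeatherStation:.*",
    ...     attrs="temperature,humidity")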

818 """ 

819 url = urljoin(self.base_url, f'v2/types/{entity_type}') 

820 res_q = self.__query_builder(url=url, 

821 entity_id=entity_id, 

822 id_pattern=id_pattern, 

823 attrs=attrs, 

824 options=options, 

825 aggr_method=aggr_method, 

826 aggr_period=aggr_period, 

827 from_date=from_date, 

828 to_date=to_date, 

829 last_n=last_n, 

830 limit=limit, 

831 offset=offset, 

832 georel=georel, 

833 geometry=geometry, 

834 coords=coords, 

835 aggr_scope=aggr_scope) 

836 

837 # merge chunks of response 

838 res = [TimeSeries(entityType=entity_type, **item) 

839 for item in res_q.popleft().get('entities')] 

840 

841 for chunk in res_q: 

842 chunk = [TimeSeries(entityType=entity_type, **item) 

843 for item in chunk.get('entities')] 

844 for new, old in zip(chunk, res): 

845 old.extend(new) 

846 

847 return res 

848 

849 # /types/{entityType}/value 

850 def get_entity_values_by_type(self, 

851 entity_type: str, 

852 *, 

853 attrs: str = None, 

854 entity_id: str = None, 

855 id_pattern: str = None, 

856 aggr_method: Union[str, AggrMethod] = None, 

857 aggr_period: Union[str, AggrPeriod] = None, 

858 from_date: str = None, 

859 to_date: str = None, 

860 last_n: int = None, 

861 limit: int = 10000, 

862 offset: int = None, 

863 georel: str = None, 

864 geometry: str = None, 

865 coords: str = None, 

866 options: str = None, 

867 aggr_scope: Union[str, AggrScope] = None 

868 ) -> List[TimeSeries]: 

869 """ 

870 History of N attributes (values only) of N entities of the same type. 

871 For example, query the average pressure, temperature and humidity 

872 (values only, no metadata) of this month in 

873 all the weather stations. 

874 """ 

875 url = urljoin(self.base_url, f'v2/types/{entity_type}/value') 

876 res_q = self.__query_builder(url=url, 

877 entity_id=entity_id, 

878 id_pattern=id_pattern, 

879 attrs=attrs, 

880 options=options, 

881 entity_type=entity_type, 

882 aggr_method=aggr_method, 

883 aggr_period=aggr_period, 

884 from_date=from_date, 

885 to_date=to_date, 

886 last_n=last_n, 

887 limit=limit, 

888 offset=offset, 

889 georel=georel, 

890 geometry=geometry, 

891 coords=coords, 

892 aggr_scope=aggr_scope) 

893 # merge chunks of response 

894 res = [TimeSeries(entityType=entity_type, **item) 

895 for item in res_q.popleft().get('values')] 

896 

897 for chunk in res_q: 

898 chunk = [TimeSeries(entityType=entity_type, **item) 

899 for item in chunk.get('values')] 

900 for new, old in zip(chunk, res): 

901 old.extend(new) 

902 

903 return res 

904 

905 # /types/{entityType}/attrs/{attrName} 

906 def get_entity_attr_by_type(self, 

907 entity_type: str, 

908 attr_name: str, 

909 *, 

910 entity_id: str = None, 

911 id_pattern: str = None, 

912 aggr_method: Union[str, AggrMethod] = None, 

913 aggr_period: Union[str, AggrPeriod] = None, 

914 from_date: str = None, 

915 to_date: str = None, 

916 last_n: int = None, 

917 limit: int = 10000, 

918 offset: int = None, 

919 georel: str = None, 

920 geometry: str = None, 

921 coords: str = None, 

922 options: str = None, 

923 aggr_scope: Union[str, AggrScope] = None 

924 ) -> List[TimeSeries]: 

925 """ 

926 History of an attribute of N entities of the same type. 

927 For example, query the pressure measurements of this month in all the 

928 weather stations. Note in the response, 

929 the index and values arrays are parallel. Also, when using 

930 aggrMethod, the aggregation is done per entity 

931 instance. In this case, the index array is just the fromDate and 

932 toDate values the user specified in the request 

933 (if any). 

934 

935 Args: 

936 entity_type (String): Entity type is required. 

937 attr_name (String): The attribute name is required. 

938 entity_id (String): Comma-separated list of entity ids whose data 

939 are to be included in the response. 

940 aggr_method (String): The function to apply to the raw data 

941 filtered. count, sum, avg, min, max 

942 aggr_period (String): year, month, day, hour, minute, second 

943 aggr_scope (str): Optional. (This parameter is not yet supported). 

944 When the query results cover historical data for multiple 

945 entities instances, you can define the aggregation method to be 

946 applied for each entity instance [entity] or across 

947 them [global] 

948 from_date (String): Starting date and time inclusive. 

949 to_date (String): Final date and time inclusive. 

950 last_n (int): Request only the last N values. 

951 limit (int): Maximum number of results to be retrieved. 

952 Default value : 10000 

953 offset (int): Offset for the results. 

954 georel (String): Geographical pattern 

955 geometry (String): Required if georel is specified. point, line, 

956 polygon, box 

957 coords (String): Required if georel is specified. 

958 e.g. 40.714,-74.006 

959 options (String): Key value pair options. 

960 

961 Returns: 

962 Response Model 

963 """ 

964 url = urljoin(self.base_url, f'v2/types/{entity_type}/attrs' 

965 f'/{attr_name}') 

966 res_q = self.__query_builder(url=url, 

967 entity_id=entity_id, 

968 id_pattern=id_pattern, 

969 options=options, 

970 entity_type=entity_type, 

971 aggr_method=aggr_method, 

972 aggr_period=aggr_period, 

973 from_date=from_date, 

974 to_date=to_date, 

975 last_n=last_n, 

976 limit=limit, 

977 offset=offset, 

978 georel=georel, 

979 geometry=geometry, 

980 coords=coords, 

981 aggr_scope=aggr_scope) 

982 

983 # merge chunks of response 

984 first = res_q.popleft() 

985 res = [TimeSeries(index=item.get('index'), 

986 entityType=entity_type, 

987 entityId=item.get('entityId'), 

988 attributes=[ 

989 AttributeValues( 

990 attrName=first.get('attrName'), 

991 values=item.get('values'))]) 

992 for item in first.get('entities')] 

993 

994 for chunk in res_q: 

995 chunk = [TimeSeries(index=item.get('index'), 

996 entityType=entity_type, 

997 entityId=item.get('entityId'), 

998 attributes=[ 

999 AttributeValues( 

1000 attrName=chunk.get('attrName'), 

1001 values=item.get('values'))]) 

1002 for item in chunk.get('entities')] 

1003 for new, old in zip(chunk, res): 

1004 old.extend(new) 

1005 

1006 return res 

1007 

1008 # /types/{entityType}/attrs/{attrName}/value 

1009 def get_entity_attr_values_by_type(self, 

1010 entity_type: str, 

1011 attr_name: str, 

1012 *, 

1013 entity_id: str = None, 

1014 id_pattern: str = None, 

1015 aggr_method: Union[ 

1016 str, AggrMethod] = None, 

1017 aggr_period: Union[ 

1018 str, AggrPeriod] = None, 

1019 from_date: str = None, 

1020 to_date: str = None, 

1021 last_n: int = None, 

1022 limit: int = 10000, 

1023 offset: int = None, 

1024 georel: str = None, 

1025 geometry: str = None, 

1026 coords: str = None, 

1027 options: str = None, 

1028 aggr_scope: Union[str, AggrScope] = None 

1029 ) -> List[TimeSeries]: 

1030 """ 

1031 History of an attribute (values only) of N entities of the same type. 

1032 For example, query the average pressure (values only, no metadata) of 

1033 this month in all the weather stations. 

1034 

1035 Args: 

1037 entity_type (String): Entity type is required. 

1038 attr_name (String): The attribute name is required. 

1039 entity_id (String): Comma-separated list of entity ids whose data 

1040 are to be included in the response. 

1041 aggr_method (String): The function to apply to the raw data 

1042 filtered. count, sum, avg, min, max 

1043 aggr_period (String): year, month, day, hour, minute, second 

1044 aggr_scope (String): Optional (not yet supported). Aggregation scope: per entity instance [entity] or across all matching entities [global]. 

1045 from_date (String): Starting date and time inclusive. 

1046 to_date (String): Final date and time inclusive. 

1047 last_n (int): Request only the last N values. 

1048 limit (int): Maximum number of results to be retrieved. 

1049 Default value : 10000 

1050 offset (int): Offset for the results. 

1051 georel (String): Geographical pattern 

1052 geometry (String): Required if georel is specified. point, line, 

1053 polygon, box 

1054 coords (String): Required if georel is specified. 

1055 e.g. 40.714,-74.006 

1056 options (String): Key value pair options. 

1057 

1058 Returns: 

1059 Response Model 

1060 """ 

1061 url = urljoin(self.base_url, f'v2/types/{entity_type}/attrs/' 

1062 f'{attr_name}/value') 

1063 res_q = self.__query_builder(url=url, 

1064 entity_id=entity_id, 

1065 id_pattern=id_pattern, 

1066 options=options, 

1067 entity_type=entity_type, 

1068 aggr_method=aggr_method, 

1069 aggr_period=aggr_period, 

1070 from_date=from_date, 

1071 to_date=to_date, 

1072 last_n=last_n, 

1073 limit=limit, 

1074 offset=offset, 

1075 georel=georel, 

1076 geometry=geometry, 

1077 coords=coords, 

1078 aggr_scope=aggr_scope) 

1079 

1080 # merge chunks of response 

1081 res = [TimeSeries(index=item.get('index'), 

1082 entityType=entity_type, 

1083 entityId=item.get('entityId'), 

1084 attributes=[ 

1085 AttributeValues(attrName=attr_name, 

1086 values=item.get('values'))]) 

1087 for item in res_q.popleft().get('values')] 

1088 

1089 for chunk in res_q: 

1090 chunk = [TimeSeries(index=item.get('index'), 

1091 entityType=entity_type, 

1092 entityId=item.get('entityId'), 

1093 attributes=[ 

1094 AttributeValues(attrName=attr_name, 

1095 values=item.get('values'))]) 

1096 for item in chunk.get('values')] 

1097 

1098 for new, old in zip(chunk, res): 

1099 old.extend(new) 

1100 return res 

1101 

1102 # v2/attrs 

1103 def get_entity_by_attrs(self, *, 

1104 entity_type: str = None, 

1105 from_date: str = None, 

1106 to_date: str = None, 

1107 limit: int = 10000, 

1108 offset: int = None 

1109 ) -> List[TimeSeries]: 

1110 """ 

1111 Get list of timeseries data grouped by each existing attribute name. 

1112 The timeseries data include all entities corresponding to each 

1113 attribute name as well as the index and values of this attribute in 

1114 this entity. 

1115 

1116 Args: 

1117 entity_type (str): Comma-separated list of entity types whose data 

1118 are to be included in the response. Use only one (no comma) 

1119 when required. If used to resolve ambiguity for the given 

1120 entityId, make sure the given entityId exists for this 

1121 entityType. 

1122 from_date (str): The starting date and time (inclusive) from which 

1123 the context information is queried. Must be in ISO8601 format 

1124 (e.g., 2018-01-05T15:44:34) 

1125 to_date (str): The final date and time (inclusive) from which the 

1126 context information is queried. Must be in ISO8601 format 

1127 (e.g., 2018-01-05T15:44:34). 

1128 limit (int): Maximum number of results to be retrieved. 

1129 Default value : 10000 

1130 offset (int): Offset for the results. 

1131  

1132 Returns: 

1133 List of TimeSeries 

1134 """ 

1135 url = urljoin(self.base_url, 'v2/attrs') 

1136 res_q = self.__query_builder(url=url, 

1137 entity_type=entity_type, 

1138 from_date=from_date, 

1139 to_date=to_date, 

1140 limit=limit, 

1141 offset=offset) 

1142 first = res_q.popleft() 

1143 

1144 res = list(chain.from_iterable(map(self.transform_attr_response_model, 

1145 first.get("attrs")))) 

1146 for chunk in res_q: 

1147 chunk = chain.from_iterable(map(lambda x: self.transform_attr_response_model(x), 

1148 chunk.get("attrs"))) 

1149 

1150 for new, old in zip(chunk, res): 

1151 old.extend(new) 

1152 

1153 return list(res) 

1154 

1155 # v2/attrs/{attr_name} 

1156 def get_entity_by_attr_name(self, *, 

1157 attr_name: str, 

1158 entity_type: str = None, 

1159 from_date: str = None, 

1160 to_date: str = None, 

1161 limit: int = 10000, 

1162 offset: int = None 

1163 ) -> List[TimeSeries]: 

1164 """ 

1165 Get list of all entities containing this attribute name, as well as 

1166 getting the index and values of this attribute in every corresponding 

1167 entity. 

1168 

1169 Args: 

1170 attr_name (str): The attribute name in interest. 

1171 entity_type (str): Comma-separated list of entity types whose data 

1172 are to be included in the response. Use only one (no comma) 

1173 when required. If used to resolve ambiguity for the given 

1174 entityId, make sure the given entityId exists for this 

1175 entityType. 

1176 from_date (str): The starting date and time (inclusive) from which 

1177 the context information is queried. Must be in ISO8601 format 

1178 (e.g., 2018-01-05T15:44:34) 

1179 to_date (str): The final date and time (inclusive) from which the 

1180 context information is queried. Must be in ISO8601 format 

1181 (e.g., 2018-01-05T15:44:34). 

1182 limit (int): Maximum number of results to be retrieved. 

1183 Default value : 10000 

1184 offset (int): Offset for the results. 

1185 

1186 Returns: 

1187 List of TimeSeries 
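Example:
    Illustrative sketch collecting the history of one attribute
    across all entities that carry it; the attribute name is a
    made-up value:

    >>> ts_list = client.get_entity_by_attr_name(
    ...     attr_name="temperature",
    ...     limit=1000)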

1188 """ 

1189 url = urljoin(self.base_url, f'/v2/attrs/{attr_name}') 

1190 res_q = self.__query_builder(url=url, 

1191 entity_type=entity_type, 

1192 from_date=from_date, 

1193 to_date=to_date, 

1194 limit=limit, 

1195 offset=offset) 

1196 

1197 first = res_q.popleft() 

1198 res = list(self.transform_attr_response_model(first)) 

1199 

1200 for chunk in res_q: 

1201 chunk = self.transform_attr_response_model(chunk) 

1202 for new, old in zip(chunk, res): 

1203 old.extend(new) 

1204 return list(res) 

1205 

1206 def transform_attr_response_model(self, attr_response): 
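"""
Helper that flattens the response of the /v2/attrs endpoints: for
every entity type listed under 'types', one TimeSeries object per
entity is created, carrying the attribute name of the response.
Returns an iterator over all resulting TimeSeries objects.
"""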

1207 res = [] 

1208 attr_name = attr_response.get("attrName") 

1209 for entity_group in attr_response.get("types"): 

1210 timeseries = map(lambda entity: 

1211 TimeSeries(entityId=entity.get("entityId"), 

1212 entityType=entity_group.get("entityType"), 

1213 index=entity.get("index"), 

1214 attributes=[ 

1215 AttributeValues(attrName=attr_name, 

1216 values=entity.get("values"))] 

1217 ), 

1218 entity_group.get("entities")) 

1219 res.append(timeseries) 

1220 return chain.from_iterable(res)