Coverage for filip/clients/ngsi_v2/quantumleap.py: 78%
231 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-20 16:54 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-11-20 16:54 +0000
1"""
2TimeSeries Module for QuantumLeap API Client
3"""
4import logging
5import time
6from math import inf
7from collections import deque
8from itertools import count,chain
9from typing import Dict, List, Union, Deque, Optional
10from urllib.parse import urljoin
11import requests
12from pydantic import AnyHttpUrl
13from pydantic.type_adapter import TypeAdapter
14from filip import settings
15from filip.clients.base_http_client import BaseHttpClient
16from filip.models.base import FiwareHeader
17from filip.models.ngsi_v2.subscriptions import Message
18from filip.models.ngsi_v2.timeseries import \
19 AggrPeriod, \
20 AggrMethod, \
21 AggrScope, \
22 AttributeValues, \
23 TimeSeries, \
24 TimeSeriesHeader
27logger = logging.getLogger(__name__)
30class QuantumLeapClient(BaseHttpClient):
31 """
32 Implements functions to use the FIWARE's QuantumLeap, which subscribes to an
33 Orion Context Broker and stores the subscription data in a time series
34 database (CrateDB). Further Information:
35 https://smartsdk.github.io/ngsi-timeseries-api/#quantumleap
36 https://app.swaggerhub.com/apis/heikkilv/quantumleap-api/
38 Args:
39 url: url of the quantumleap service
40 session (Optional):
41 fiware_header:
42 **kwargs:
43 """
45 def __init__(self,
46 url: str = None,
47 *,
48 session: requests.Session = None,
49 fiware_header: FiwareHeader = None,
50 **kwargs):
51 # set service url
52 url = url or settings.QL_URL
53 super().__init__(url=url,
54 session=session,
55 fiware_header=fiware_header,
56 **kwargs)
58 # META API ENDPOINTS
59 def get_version(self) -> Dict:
60 """
61 Gets version of QuantumLeap-Service.
63 Returns:
64 Dictionary with response
65 """
66 url = urljoin(self.base_url, 'version')
67 try:
68 res = self.get(url=url, headers=self.headers)
69 if res.ok:
70 return res.json()
71 res.raise_for_status()
72 except requests.exceptions.RequestException as err:
73 self.logger.error(err)
74 raise
76 def get_health(self) -> Dict:
77 """
78 This endpoint is intended for administrators of QuantumLeap. Using the
79 information returned by this endpoint they can diagnose problems in the
80 service or its dependencies. This information is also useful for cloud
81 tools such as orchestrators and load balancers with rules based on
82 health-checks. Due to the lack of a standardized response format, we
83 base the implementation on the draft of
84 https://inadarei.github.io/rfc-healthcheck/
86 Returns:
87 Dictionary with response
88 """
89 url = urljoin(self.base_url, 'health')
90 try:
91 res = self.get(url=url, headers=self.headers)
92 if res.ok:
93 return res.json()
94 res.raise_for_status()
95 except requests.exceptions.RequestException as err:
96 self.logger.error(err)
97 raise
99 def post_config(self):
100 """
101 (To Be Implemented) Customize your persistence configuration to
102 better suit your needs.
103 """
104 raise NotImplementedError("Endpoint to be implemented..")
106 # INPUT API ENDPOINTS
107 def post_notification(self, notification: Message):
108 """
109 Notify QuantumLeap the arrival of a new NGSI notification.
111 Args:
112 notification: Notification Message Object
113 """
114 url = urljoin(self.base_url, 'v2/notify')
115 headers = self.headers.copy()
116 data = []
117 for entity in notification.data:
118 data.append(entity.model_dump(exclude_none=True))
119 data_set = {
120 "data": data,
121 "subscriptionId": notification.subscriptionId
122 }
124 try:
125 res = self.post(
126 url=url,
127 headers=headers,
128 json=data_set)
129 if res.ok:
130 self.logger.debug(res.text)
131 else:
132 res.raise_for_status()
133 except requests.exceptions.RequestException as err:
134 msg = f"Could not post notification for subscription id " \
135 f"{notification.subscriptionId}"
136 self.log_error(err=err, msg=msg)
137 raise
139 def post_subscription(self,
140 cb_url: Union[AnyHttpUrl, str],
141 ql_url: Union[AnyHttpUrl, str],
142 entity_type: str = None,
143 entity_id: str = None,
144 id_pattern: str = None,
145 attributes: str = None,
146 observed_attributes: str = None,
147 notified_attributes: str = None,
148 throttling: int = None,
149 time_index_attribute: str = None):
150 """
151 Subscribe QL to process Orion notifications of certain type.
152 This endpoint simplifies the creation of the subscription in orion
153 that will generate the notifications to be consumed by QuantumLeap in
154 order to save historical records. If you want an advanced specification
155 of the notifications, you can always create the subscription in orion
156 at your will. This endpoint just aims to simplify the common use case.
158 Args:
159 cb_url:
160 url of the context broker
161 ql_url:
162 The url where Orion can reach QuantumLeap. Do not include
163 specific paths.
164 entity_type (String):
165 The type of entities for which to create a
166 subscription, so as to persist historical data of entities of
167 this type.
168 entity_id (String):
169 Id of the entity to track. If specified, it
170 takes precedence over the idPattern parameter.
171 id_pattern (String): The pattern covering the entity ids for which
172 to subscribe. If not specified, QL will track all entities of
173 the specified type.
174 attributes (String): Comma-separated list of attribute names to
175 track.
176 observed_attributes (String): Comma-separated list of attribute
177 names to track.
178 notified_attributes (String): Comma-separated list of attribute
179 names to be used to restrict the data of which QL will keep a
180 history.
181 throttling (int): Minimal period of time in seconds which must
182 elapse between two consecutive notifications.
183 time_index_attribute (String): The name of a custom attribute to be
184 used as a
185 time index.
186 """
187 raise DeprecationWarning("Subscription endpoint of Quantumleap API is "
188 "deprecated, use the ORION subscription endpoint "
189 "instead")
191 def delete_entity(self, entity_id: str,
192 entity_type: Optional[str] = None) -> str:
193 """
194 Given an entity (with type and id), delete all its historical records.
196 Args:
197 entity_id (String): Entity id is required.
198 entity_type (Optional[String]): Entity type if entity_id alone
199 can not uniquely define the entity.
201 Raises:
202 RequestException, if entity was not found
203 Exception, if deleting was not successful
205 Returns:
206 The entity_id of entity that is deleted.
207 """
208 url = urljoin(self.base_url, f'v2/entities/{entity_id}')
209 headers = self.headers.copy()
210 if entity_type is not None:
211 params = {'type': entity_type}
212 else:
213 params = {}
215 # The deletion does not always resolves in a success even if an ok is
216 # returned.
217 # Try to delete multiple times with incrementing waits.
218 # If the entity is no longer found the methode returns with a success
219 # If the deletion attempt fails after the 10th try, raise an
220 # Exception: it could not be deleted
221 counter = 0
222 while counter < 10:
223 self.delete(url=url, headers=headers, params=params)
224 try:
225 self.get_entity_by_id(entity_id=entity_id,
226 entity_type=entity_type)
227 except requests.exceptions.RequestException as err:
228 self.logger.info("Entity id '%s' successfully deleted!",
229 entity_id)
230 return entity_id
231 time.sleep(counter * 5)
232 counter += 1
234 msg = f"Could not delete QL entity of id {entity_id}"
235 logger.error(msg=msg)
236 raise Exception(msg)
238 def delete_entity_type(self, entity_type: str) -> str:
239 """
240 Given an entity type, delete all the historical records of all
241 entities of such type.
242 Args:
243 entity_type (String): Type of entities data to be deleted.
244 Returns:
245 Entity type of the entities deleted.
246 """
247 url = urljoin(self.base_url, f'v2/types/{entity_type}')
248 headers = self.headers.copy()
249 try:
250 res = self.delete(url=url, headers=headers)
251 if res.ok:
252 self.logger.info("Entities of type '%s' successfully deleted!",
253 entity_type)
254 return entity_type
255 res.raise_for_status()
256 except requests.exceptions.RequestException as err:
257 msg = f"Could not delete entities of type {entity_type}"
258 self.log_error(err=err, msg=msg)
259 raise
261 # QUERY API ENDPOINTS
262 def __query_builder(self,
263 url,
264 *,
265 entity_id: str = None,
266 id_pattern: str = None,
267 options: str = None,
268 entity_type: str = None,
269 aggr_method: Union[str, AggrMethod] = None,
270 aggr_period: Union[str, AggrPeriod] = None,
271 from_date: str = None,
272 to_date: str = None,
273 last_n: int = None,
274 limit: int = 10000,
275 offset: int = 0,
276 georel: str = None,
277 geometry: str = None,
278 coords: str = None,
279 attrs: str = None,
280 aggr_scope: Union[str, AggrScope] = None
281 ) -> Deque[Dict]:
282 """
283 Private Function to call respective API endpoints, chops large
284 requests into multiple single requests and merges the
285 responses
287 Args:
288 url:
289 entity_id:
290 options:
291 entity_type:
292 aggr_method:
293 aggr_period:
294 from_date:
295 to_date:
296 last_n:
297 limit:
298 Maximum number of results to retrieve in a single response.
299 offset:
300 Offset to apply to the response results. For example, if the
301 query was to return 10 results and you use an offset of 1, the
302 response will return the last 9 values. Make sure you don't
303 give more offset than the number of results.
304 georel:
305 geometry:
306 coords:
307 attrs:
308 aggr_scope:
309 id_pattern (str): The pattern covering the entity ids for which
310 to subscribe. The pattern follow regular expressions (POSIX
311 Extendede) e.g. ".*", "Room.*". Detail information:
312 https://en.wikibooks.org/wiki/Regular_Expressions/POSIX-Extended_Regular_Expressions
314 Returns:
315 Dict
316 """
317 assert (id_pattern is None or entity_id is None), "Cannot have both id and idPattern as parameter."
318 params = {}
319 headers = self.headers.copy()
320 max_records_per_request = 10000
321 # create a double ending queue
322 res_q: Deque[Dict] = deque([])
324 if options:
325 params.update({'options': options})
326 if entity_type:
327 params.update({'type': entity_type})
328 if aggr_method:
329 aggr_method = AggrMethod(aggr_method)
330 params.update({'aggrMethod': aggr_method.value})
331 if aggr_period:
332 aggr_period = AggrPeriod(aggr_period)
333 params.update({'aggrPeriod': aggr_period.value})
334 if from_date:
335 params.update({'fromDate': from_date})
336 if to_date:
337 params.update({'toDate': to_date})
338 # These values are required for the integrated pagination mechanism
339 # maximum items per request
340 if limit is None:
341 limit = inf
342 if offset is None:
343 offset = 0
344 if georel:
345 params.update({'georel': georel})
346 if coords:
347 params.update({'coords': coords})
348 if geometry:
349 params.update({'geometry': geometry})
350 if attrs:
351 params.update({'attrs': attrs})
352 if aggr_scope:
353 aggr_scope = AggrScope(aggr_scope)
354 params.update({'aggr_scope': aggr_scope.value})
355 if entity_id:
356 params.update({'id': entity_id})
357 if id_pattern:
358 params.update({'idPattern': id_pattern})
360 # This loop will chop large requests into smaller junks.
361 # The individual functions will then merge the final response models
362 for i in count(0, max_records_per_request):
363 try:
364 params['offset'] = offset + i
366 params['limit'] = min(limit - i, max_records_per_request)
367 if params['limit'] <= 0:
368 break
370 if last_n:
371 params['lastN'] = min(last_n - i, max_records_per_request)
372 if params['lastN'] <= 0:
373 break
375 res = self.get(url=url, params=params, headers=headers)
377 if res.ok:
378 self.logger.debug('Received: %s', res.json())
380 # revert append direction when using last_n
381 if last_n:
382 res_q.appendleft(res.json())
383 else:
384 res_q.append(res.json())
385 res.raise_for_status()
387 except requests.exceptions.RequestException as err:
388 if err.response.status_code == 404 and \
389 err.response.json().get('error') == 'Not Found' and \
390 len(res_q) > 0:
391 break
392 else:
393 msg = "Could not load entity data"
394 self.log_error(err=err, msg=msg)
395 raise
397 self.logger.info("Successfully retrieved entity data")
398 return res_q
400 # v2/entities
401 def get_entities(self, *,
402 entity_type: str = None,
403 id_pattern: str = None,
404 from_date: str = None,
405 to_date: str = None,
406 limit: int = 10000,
407 offset: int = None
408 ) -> List[TimeSeriesHeader]:
409 """
410 Get list of all available entities and their context information
411 about EntityType and last update date.
413 Args:
414 entity_type (str): Comma-separated list of entity types whose data
415 are to be included in the response. Use only one (no comma)
416 when required. If used to resolve ambiguity for the given
417 entityId, make sure the given entityId exists for this
418 entityType.
419 id_pattern (str): The pattern covering the entity ids for which
420 to subscribe. The pattern follow regular expressions (POSIX
421 Extendede) e.g. ".*", "Room.*". Detail information:
422 https://en.wikibooks.org/wiki/Regular_Expressions/POSIX-Extended_Regular_Expressions
423 from_date (str): The starting date and time (inclusive) from which
424 the context information is queried. Must be in ISO8601 format
425 (e.g., 2018-01-05T15:44:34)
426 to_date (str): The final date and time (inclusive) from which the
427 context information is queried. Must be in ISO8601 format
428 (e.g., 2018-01-05T15:44:34).
429 limit (int): Maximum number of results to be retrieved.
430 Default value : 10000
431 offset (int): Offset for the results.
433 Returns:
434 List of TimeSeriesHeader
435 """
436 url = urljoin(self.base_url, 'v2/entities')
437 res = self.__query_builder(url=url,
438 id_pattern=id_pattern,
439 entity_type=entity_type,
440 from_date=from_date,
441 to_date=to_date,
442 limit=limit,
443 offset=offset)
445 ta = TypeAdapter(List[TimeSeriesHeader])
446 return ta.validate_python(res[0])
448 # /entities/{entityId}
449 def get_entity_by_id(self,
450 entity_id: str,
451 *,
452 attrs: str = None,
453 entity_type: str = None,
454 aggr_method: Union[str, AggrMethod] = None,
455 aggr_period: Union[str, AggrPeriod] = None,
456 from_date: str = None,
457 to_date: str = None,
458 last_n: int = None,
459 limit: int = 10000,
460 offset: int = None,
461 georel: str = None,
462 geometry: str = None,
463 coords: str = None,
464 options: str = None
465 ) -> TimeSeries:
467 """
468 History of N attributes of a given entity instance
469 For example, query max water level of the central tank throughout the
470 last year. Queries can get more
471 sophisticated with the use of filters and query attributes.
473 Args:
474 entity_id (String): Entity id is required.
475 attrs (String):
476 Comma-separated list of attribute names whose data are to be
477 included in the response. The attributes are retrieved in the
478 order specified by this parameter. If not specified, all
479 attributes are included in the response in arbitrary order.
480 entity_type (String): Comma-separated list of entity types whose
481 data are to be included in the response.
482 aggr_method (String):
483 The function to apply to the raw data filtered by the query
484 parameters. If not given, the returned data are the same raw
485 inserted data.
487 Allowed values: count, sum, avg, min, max
488 aggr_period (String):
489 If not defined, the aggregation will apply to all the values
490 contained in the search result. If defined, the aggregation
491 function will instead be applied N times, once for each
492 period, and all those results will be considered for the
493 response. For example, a query asking for the average
494 temperature of an attribute will typically return 1 value.
495 However, with an aggregationPeriod of day, you get the daily
496 average of the temperature instead (more than one value
497 assuming you had measurements across many days within the
498 scope of your search result). aggrPeriod must be accompanied
499 by an aggrMethod, and the aggrMethod will be applied to all
500 the numeric attributes specified in attrs; the rest of the
501 non-numerical attrs will be ignored. By default, the response
502 is grouped by entity_id. See aggrScope to create aggregation
503 across entities:
505 Allowed values: year, month, day, hour, minute, second
507 from_date (String):
508 The starting date and time (inclusive) from which the context
509 information is queried. Must be in ISO8601 format (e.g.,
510 2018-01-05T15:44:34)
511 to_date (String):
512 The final date and time (inclusive) from which the context
513 information is queried. Must be in ISO8601 format (e.g.,
514 2018-01-05T15:44:34)
515 last_n (int):
516 Used to request only the last N values that satisfy the
517 request conditions.
518 limit (int): Maximum number of results to be retrieved.
519 Default value : 10000
520 offset (int):
521 Offset to apply to the response results.
522 georel (String):
523 It specifies a spatial relationship between matching entities
524 and a reference shape (geometry). This parameter is used to
525 perform geographical queries with the same semantics as in the
526 FIWARE-NGSI v2 Specification. Full details can be found in the
527 Geographical Queries section of the specification:
528 https://fiware.github.io/specifications/ngsiv2/stable/.
529 geometry (String):
530 Required if georel is specified. point, line, polygon, box
531 coords (String):
532 Optional but required if georel is specified. This parameter
533 defines the reference shape (geometry) in terms of WGS 84
534 coordinates and has the same semantics as in the
535 FIWARE-NGSI v2 Specification, except we only accept coordinates
536 in decimal degrees---e.g. 40.714,-74.006 is okay, but not
537 40 42' 51'',74 0' 21''. Full details can be found in the
538 Geographical Queries section of the specification:
539 https://fiware.github.io/specifications/ngsiv2/stable/.
540 options (String): Key value pair options.
542 Returns:
543 TimeSeries
544 """
545 url = urljoin(self.base_url, f'v2/entities/{entity_id}')
546 res_q = self.__query_builder(url=url,
547 attrs=attrs,
548 options=options,
549 entity_type=entity_type,
550 aggr_method=aggr_method,
551 aggr_period=aggr_period,
552 from_date=from_date,
553 to_date=to_date,
554 last_n=last_n,
555 limit=limit,
556 offset=offset,
557 georel=georel,
558 geometry=geometry,
559 coords=coords)
560 # merge response chunks
561 res = TimeSeries.model_validate(res_q.popleft())
562 for item in res_q:
563 res.extend(TimeSeries.model_validate(item))
565 return res
567 # /entities/{entityId}/value
568 def get_entity_values_by_id(self,
569 entity_id: str,
570 *,
571 attrs: str = None,
572 entity_type: str = None,
573 aggr_method: Union[str, AggrMethod] = None,
574 aggr_period: Union[str, AggrPeriod] = None,
575 from_date: str = None,
576 to_date: str = None,
577 last_n: int = None,
578 limit: int = 10000,
579 offset: int = None,
580 georel: str = None,
581 geometry: str = None,
582 coords: str = None,
583 options: str = None
584 ) -> TimeSeries:
585 """
586 History of N attributes (values only) of a given entity instance
587 For example, query the average pressure, temperature and humidity (
588 values only, no metadata) of this
589 month in the weather station WS1.
591 Args:
592 entity_id (String): Entity id is required.
593 attrs (String): Comma-separated list of attribute names
594 entity_type (String): Comma-separated list of entity types whose
595 data are to be included in the response.
596 aggr_method (String): The function to apply to the raw data
597 filtered. count, sum, avg, min, max
598 aggr_period (String): year, month, day, hour, minute, second
599 from_date (String): Starting date and time inclusive.
600 to_date (String): Final date and time inclusive.
601 last_n (int): Request only the last N values.
602 limit (int): Maximum number of results to be retrieved.
603 Default value : 10000
604 offset (int): Offset for the results.
605 georel (String): Geographical pattern
606 geometry (String): Required if georel is specified. point, line,
607 polygon, box
608 coords (String): Required if georel is specified.
609 e.g. 40.714,-74.006
610 options (String): Key value pair options.
612 Returns:
613 Response Model
614 """
615 url = urljoin(self.base_url, f'v2/entities/{entity_id}/value')
616 res_q = self.__query_builder(url=url,
617 attrs=attrs,
618 options=options,
619 entity_type=entity_type,
620 aggr_method=aggr_method,
621 aggr_period=aggr_period,
622 from_date=from_date,
623 to_date=to_date,
624 last_n=last_n,
625 limit=limit,
626 offset=offset,
627 georel=georel,
628 geometry=geometry,
629 coords=coords)
631 # merge response chunks
632 res = TimeSeries(entityId=entity_id, **res_q.popleft())
633 for item in res_q:
634 res.extend(TimeSeries(entityId=entity_id, **item))
636 return res
638 # /entities/{entityId}/attrs/{attrName}
639 def get_entity_attr_by_id(self,
640 entity_id: str,
641 attr_name: str,
642 *,
643 entity_type: str = None,
644 aggr_method: Union[str, AggrMethod] = None,
645 aggr_period: Union[str, AggrPeriod] = None,
646 from_date: str = None,
647 to_date: str = None,
648 last_n: int = None,
649 limit: int = 10000,
650 offset: int = None,
651 georel: str = None,
652 geometry: str = None,
653 coords: str = None,
654 options: str = None
655 ) -> TimeSeries:
656 """
657 History of an attribute of a given entity instance
658 For example, query max water level of the central tank throughout the
659 last year. Queries can get more
660 sophisticated with the use of filters and query attributes.
662 Args:
663 entity_id (String): Entity id is required.
664 attr_name (String): The attribute name is required.
665 entity_type (String): Comma-separated list of entity types whose
666 data are to be included in the response.
667 aggr_method (String): The function to apply to the raw data
668 filtered. count, sum, avg, min, max
669 aggr_period (String): year, month, day, hour, minute, second
670 from_date (String): Starting date and time inclusive.
671 to_date (String): Final date and time inclusive.
672 last_n (int): Request only the last N values.
673 limit (int): Maximum number of results to be retrieved.
674 Default value : 10000
675 offset (int): Offset for the results.
676 georel (String): Geographical pattern
677 geometry (String): Required if georel is specified. point, line,
678 polygon, box
679 coords (String): Required if georel is specified.
680 e.g. 40.714,-74.006
681 options (String): Key value pair options.
683 Returns:
684 Response Model
685 """
686 url = urljoin(self.base_url, f'v2/entities/{entity_id}/attrs'
687 f'/{attr_name}')
688 req_q = self.__query_builder(url=url,
689 entity_id=entity_id,
690 options=options,
691 entity_type=entity_type,
692 aggr_method=aggr_method,
693 aggr_period=aggr_period,
694 from_date=from_date,
695 to_date=to_date,
696 last_n=last_n,
697 limit=limit,
698 offset=offset,
699 georel=georel,
700 geometry=geometry,
701 coords=coords)
703 # merge response chunks
704 first = req_q.popleft()
705 res = TimeSeries(entityId=entity_id,
706 index=first.get('index'),
707 attributes=[AttributeValues(**first)])
708 for item in req_q:
709 res.extend(TimeSeries(entityId=entity_id,
710 index=item.get('index'),
711 attributes=[AttributeValues(**item)]))
713 return res
715 # /entities/{entityId}/attrs/{attrName}/value
716 def get_entity_attr_values_by_id(self,
717 entity_id: str,
718 attr_name: str,
719 *,
720 entity_type: str = None,
721 aggr_method: Union[str, AggrMethod] = None,
722 aggr_period: Union[str, AggrPeriod] = None,
723 from_date: str = None,
724 to_date: str = None,
725 last_n: int = None,
726 limit: int = 10000,
727 offset: int = None,
728 georel: str = None,
729 geometry: str = None,
730 coords: str = None,
731 options: str = None
732 ) -> TimeSeries:
733 """
734 History of an attribute (values only) of a given entity instance
735 Similar to the previous, but focusing on the values regardless of the
736 metadata.
738 Args:
739 entity_id (String): Entity id is required.
740 attr_name (String): The attribute name is required.
741 entity_type (String): Comma-separated list of entity types whose
742 data are to be included in the response.
743 aggr_method (String): The function to apply to the raw data
744 filtered. count, sum, avg, min, max
745 aggr_period (String): year, month, day, hour, minute, second
746 from_date (String): Starting date and time inclusive.
747 to_date (String): Final date and time inclusive.
748 last_n (int): Request only the last N values.
749 limit (int): Maximum number of results to be retrieved.
750 Default value : 10000
751 offset (int): Offset for the results.
752 georel (String): Geographical pattern
753 geometry (String): Required if georel is specified. point, line,
754 polygon, box
755 coords (String): Required if georel is specified.
756 e.g. 40.714,-74.006
757 options (String): Key value pair options.
759 Returns:
760 Response Model
761 """
762 url = urljoin(self.base_url, f'v2/entities/{entity_id}/attrs'
763 f'/{attr_name}/value')
764 res_q = self.__query_builder(url=url,
765 options=options,
766 entity_type=entity_type,
767 aggr_method=aggr_method,
768 aggr_period=aggr_period,
769 from_date=from_date,
770 to_date=to_date,
771 last_n=last_n,
772 limit=limit,
773 offset=offset,
774 georel=georel,
775 geometry=geometry,
776 coords=coords)
777 # merge response chunks
778 first = res_q.popleft()
779 res = TimeSeries(
780 entityId=entity_id,
781 index=first.get('index'),
782 attributes=[AttributeValues(attrName=attr_name,
783 values=first.get('values'))])
784 for item in res_q:
785 res.extend(
786 TimeSeries(
787 entityId=entity_id,
788 index=item.get('index'),
789 attributes=[AttributeValues(attrName=attr_name,
790 values=item.get('values'))]))
792 return res
794 # /types/{entityType}
795 def get_entity_by_type(self,
796 entity_type: str,
797 *,
798 attrs: str = None,
799 entity_id: str = None,
800 id_pattern: str = None,
801 aggr_method: Union[str, AggrMethod] = None,
802 aggr_period: Union[str, AggrPeriod] = None,
803 from_date: str = None,
804 to_date: str = None,
805 last_n: int = None,
806 limit: int = 10000,
807 offset: int = None,
808 georel: str = None,
809 geometry: str = None,
810 coords: str = None,
811 options: str = None,
812 aggr_scope: Union[str, AggrScope] = None
813 ) -> List[TimeSeries]:
814 """
815 History of N attributes of N entities of the same type.
816 For example, query the average pressure, temperature and humidity of
817 this month in all the weather stations.
818 """
819 url = urljoin(self.base_url, f'v2/types/{entity_type}')
820 res_q = self.__query_builder(url=url,
821 entity_id=entity_id,
822 id_pattern=id_pattern,
823 attrs=attrs,
824 options=options,
825 aggr_method=aggr_method,
826 aggr_period=aggr_period,
827 from_date=from_date,
828 to_date=to_date,
829 last_n=last_n,
830 limit=limit,
831 offset=offset,
832 georel=georel,
833 geometry=geometry,
834 coords=coords,
835 aggr_scope=aggr_scope)
837 # merge chunks of response
838 res = [TimeSeries(entityType=entity_type, **item)
839 for item in res_q.popleft().get('entities')]
841 for chunk in res_q:
842 chunk = [TimeSeries(entityType=entity_type, **item)
843 for item in chunk.get('entities')]
844 for new, old in zip(chunk, res):
845 old.extend(new)
847 return res
849 # /types/{entityType}/value
850 def get_entity_values_by_type(self,
851 entity_type: str,
852 *,
853 attrs: str = None,
854 entity_id: str = None,
855 id_pattern: str = None,
856 aggr_method: Union[str, AggrMethod] = None,
857 aggr_period: Union[str, AggrPeriod] = None,
858 from_date: str = None,
859 to_date: str = None,
860 last_n: int = None,
861 limit: int = 10000,
862 offset: int = None,
863 georel: str = None,
864 geometry: str = None,
865 coords: str = None,
866 options: str = None,
867 aggr_scope: Union[str, AggrScope] = None
868 ) -> List[TimeSeries]:
869 """
870 History of N attributes (values only) of N entities of the same type.
871 For example, query the average pressure, temperature and humidity (
872 values only, no metadata) of this month in
873 all the weather stations.
874 """
875 url = urljoin(self.base_url, f'v2/types/{entity_type}/value')
876 res_q = self.__query_builder(url=url,
877 entity_id=entity_id,
878 id_pattern=id_pattern,
879 attrs=attrs,
880 options=options,
881 entity_type=entity_type,
882 aggr_method=aggr_method,
883 aggr_period=aggr_period,
884 from_date=from_date,
885 to_date=to_date,
886 last_n=last_n,
887 limit=limit,
888 offset=offset,
889 georel=georel,
890 geometry=geometry,
891 coords=coords,
892 aggr_scope=aggr_scope)
893 # merge chunks of response
894 res = [TimeSeries(entityType=entity_type, **item)
895 for item in res_q.popleft().get('values')]
897 for chunk in res_q:
898 chunk = [TimeSeries(entityType=entity_type, **item)
899 for item in chunk.get('values')]
900 for new, old in zip(chunk, res):
901 old.extend(new)
903 return res
905 # /types/{entityType}/attrs/{attrName}
906 def get_entity_attr_by_type(self,
907 entity_type: str,
908 attr_name: str,
909 *,
910 entity_id: str = None,
911 id_pattern: str = None,
912 aggr_method: Union[str, AggrMethod] = None,
913 aggr_period: Union[str, AggrPeriod] = None,
914 from_date: str = None,
915 to_date: str = None,
916 last_n: int = None,
917 limit: int = 10000,
918 offset: int = None,
919 georel: str = None,
920 geometry: str = None,
921 coords: str = None,
922 options: str = None,
923 aggr_scope: Union[str, AggrScope] = None
924 ) -> List[TimeSeries]:
925 """
926 History of an attribute of N entities of the same type.
927 For example, query the pressure measurements of this month in all the
928 weather stations. Note in the response,
929 the index and values arrays are parallel. Also, when using
930 aggrMethod, the aggregation is done by-entity
931 instance. In this case, the index array is just the fromDate and
932 toDate values user specified in the request
933 (if any).
935 Args:
936 entity_type (String): Entity type is required.
937 attr_name (String): The attribute name is required.
938 entity_id (String): Comma-separated list of entity ids whose data
939 are to be included in the response.
940 aggr_method (String): The function to apply to the raw data
941 filtered. count, sum, avg, min, max
942 aggr_period (String): year, month, day, hour, minute, second
943 aggr_scope (str): Optional. (This parameter is not yet supported).
944 When the query results cover historical data for multiple
945 entities instances, you can define the aggregation method to be
946 applied for each entity instance [entity] or across
947 them [global]
948 from_date (String): Starting date and time inclusive.
949 to_date (String): Final date and time inclusive.
950 last_n (int): Request only the last N values.
951 limit (int): Maximum number of results to be retrieved.
952 Default value : 10000
953 offset (int): Offset for the results.
954 georel (String): Geographical pattern
955 geometry (String): Required if georel is specified. point, line,
956 polygon, box
957 coords (String): Required if georel is specified.
958 e.g. 40.714,-74.006
959 options (String): Key value pair options.
961 Returns:
962 Response Model
963 """
964 url = urljoin(self.base_url, f'v2/types/{entity_type}/attrs'
965 f'/{attr_name}')
966 res_q = self.__query_builder(url=url,
967 entity_id=entity_id,
968 id_pattern=id_pattern,
969 options=options,
970 entity_type=entity_type,
971 aggr_method=aggr_method,
972 aggr_period=aggr_period,
973 from_date=from_date,
974 to_date=to_date,
975 last_n=last_n,
976 limit=limit,
977 offset=offset,
978 georel=georel,
979 geometry=geometry,
980 coords=coords,
981 aggr_scope=aggr_scope)
983 # merge chunks of response
984 first = res_q.popleft()
985 res = [TimeSeries(index=item.get('index'),
986 entityType=entity_type,
987 entityId=item.get('entityId'),
988 attributes=[
989 AttributeValues(
990 attrName=first.get('attrName'),
991 values=item.get('values'))])
992 for item in first.get('entities')]
994 for chunk in res_q:
995 chunk = [TimeSeries(index=item.get('index'),
996 entityType=entity_type,
997 entityId=item.get('entityId'),
998 attributes=[
999 AttributeValues(
1000 attrName=chunk.get('attrName'),
1001 values=item.get('values'))])
1002 for item in chunk.get('entities')]
1003 for new, old in zip(chunk, res):
1004 old.extend(new)
1006 return res
1008 # /types/{entityType}/attrs/{attrName}/value
1009 def get_entity_attr_values_by_type(self,
1010 entity_type: str,
1011 attr_name: str,
1012 *,
1013 entity_id: str = None,
1014 id_pattern: str = None,
1015 aggr_method: Union[
1016 str, AggrMethod] = None,
1017 aggr_period: Union[
1018 str, AggrPeriod] = None,
1019 from_date: str = None,
1020 to_date: str = None,
1021 last_n: int = None,
1022 limit: int = 10000,
1023 offset: int = None,
1024 georel: str = None,
1025 geometry: str = None,
1026 coords: str = None,
1027 options: str = None,
1028 aggr_scope: Union[str, AggrScope] = None
1029 ) -> List[TimeSeries]:
1030 """
1031 History of an attribute (values only) of N entities of the same type.
1032 For example, query the average pressure (values only, no metadata) of
1033 this month in all the weather stations.
1035 Args:
1036 aggr_scope:
1037 entity_type (String): Entity type is required.
1038 attr_name (String): The attribute name is required.
1039 entity_id (String): Comma-separated list of entity ids whose data
1040 are to be included in the response.
1041 aggr_method (String): The function to apply to the raw data
1042 filtered. count, sum, avg, min, max
1043 aggr_period (String): year, month, day, hour, minute, second
1044 aggr_scope (String):
1045 from_date (String): Starting date and time inclusive.
1046 to_date (String): Final date and time inclusive.
1047 last_n (int): Request only the last N values.
1048 limit (int): Maximum number of results to be retrieved.
1049 Default value : 10000
1050 offset (int): Offset for the results.
1051 georel (String): Geographical pattern
1052 geometry (String): Required if georel is specified. point, line,
1053 polygon, box
1054 coords (String): Required if georel is specified.
1055 e.g. 40.714,-74.006
1056 options (String): Key value pair options.
1058 Returns:
1059 Response Model
1060 """
1061 url = urljoin(self.base_url, f'v2/types/{entity_type}/attrs/'
1062 f'{attr_name}/value')
1063 res_q = self.__query_builder(url=url,
1064 entity_id=entity_id,
1065 id_pattern=id_pattern,
1066 options=options,
1067 entity_type=entity_type,
1068 aggr_method=aggr_method,
1069 aggr_period=aggr_period,
1070 from_date=from_date,
1071 to_date=to_date,
1072 last_n=last_n,
1073 limit=limit,
1074 offset=offset,
1075 georel=georel,
1076 geometry=geometry,
1077 coords=coords,
1078 aggr_scope=aggr_scope)
1080 # merge chunks of response
1081 res = [TimeSeries(index=item.get('index'),
1082 entityType=entity_type,
1083 entityId=item.get('entityId'),
1084 attributes=[
1085 AttributeValues(attrName=attr_name,
1086 values=item.get('values'))])
1087 for item in res_q.popleft().get('values')]
1089 for chunk in res_q:
1090 chunk = [TimeSeries(index=item.get('index'),
1091 entityType=entity_type,
1092 entityId=item.get('entityId'),
1093 attributes=[
1094 AttributeValues(attrName=attr_name,
1095 values=item.get('values'))])
1096 for item in chunk.get('values')]
1098 for new, old in zip(chunk, res):
1099 old.extend(new)
1100 return res
1102 # v2/attrs
1103 def get_entity_by_attrs(self, *,
1104 entity_type: str = None,
1105 from_date: str = None,
1106 to_date: str = None,
1107 limit: int = 10000,
1108 offset: int = None
1109 ) -> List[TimeSeries]:
1110 """
1111 Get list of timeseries data grouped by each existing attribute name.
1112 The timeseries data include all entities corresponding to each
1113 attribute name as well as the index and values of this attribute in
1114 this entity.
1116 Args:
1117 entity_type (str): Comma-separated list of entity types whose data
1118 are to be included in the response. Use only one (no comma)
1119 when required. If used to resolve ambiguity for the given
1120 entityId, make sure the given entityId exists for this
1121 entityType.
1122 from_date (str): The starting date and time (inclusive) from which
1123 the context information is queried. Must be in ISO8601 format
1124 (e.g., 2018-01-05T15:44:34)
1125 to_date (str): The final date and time (inclusive) from which the
1126 context information is queried. Must be in ISO8601 format
1127 (e.g., 2018-01-05T15:44:34).
1128 limit (int): Maximum number of results to be retrieved.
1129 Default value : 10000
1130 offset (int): Offset for the results.
1132 Returns:
1133 List of TimeSeriesEntities
1134 """
1135 url = urljoin(self.base_url, 'v2/attrs')
1136 res_q = self.__query_builder(url=url,
1137 entity_type=entity_type,
1138 from_date=from_date,
1139 to_date=to_date,
1140 limit=limit,
1141 offset=offset)
1142 first = res_q.popleft()
1144 res = chain.from_iterable(map(lambda x: self.transform_attr_response_model(x),
1145 first.get("attrs")))
1146 for chunk in res_q:
1147 chunk = chain.from_iterable(map(lambda x: self.transform_attr_response_model(x),
1148 chunk.get("attrs")))
1150 for new, old in zip(chunk, res):
1151 old.extend(new)
1153 return list(res)
1155 # v2/attrs/{attr_name}
1156 def get_entity_by_attr_name(self, *,
1157 attr_name: str,
1158 entity_type: str = None,
1159 from_date: str = None,
1160 to_date: str = None,
1161 limit: int = 10000,
1162 offset: int = None
1163 ) -> List[TimeSeries]:
1164 """
1165 Get list of all entities containing this attribute name, as well as
1166 getting the index and values of this attribute in every corresponding
1167 entity.
1169 Args:
1170 attr_name (str): The attribute name in interest.
1171 entity_type (str): Comma-separated list of entity types whose data
1172 are to be included in the response. Use only one (no comma)
1173 when required. If used to resolve ambiguity for the given
1174 entityId, make sure the given entityId exists for this
1175 entityType.
1176 from_date (str): The starting date and time (inclusive) from which
1177 the context information is queried. Must be in ISO8601 format
1178 (e.g., 2018-01-05T15:44:34)
1179 to_date (str): The final date and time (inclusive) from which the
1180 context information is queried. Must be in ISO8601 format
1181 (e.g., 2018-01-05T15:44:34).
1182 limit (int): Maximum number of results to be retrieved.
1183 Default value : 10000
1184 offset (int): Offset for the results.
1186 Returns:
1187 List of TimeSeries
1188 """
1189 url = urljoin(self.base_url, f'/v2/attrs/{attr_name}')
1190 res_q = self.__query_builder(url=url,
1191 entity_type=entity_type,
1192 from_date=from_date,
1193 to_date=to_date,
1194 limit=limit,
1195 offset=offset)
1197 first = res_q.popleft()
1198 res = self.transform_attr_response_model(first)
1200 for chunk in res_q:
1201 chunk = self.transform_attr_response_model(chunk)
1202 for new, old in zip(chunk, res):
1203 old.extend(new)
1204 return list(res)
1206 def transform_attr_response_model(self, attr_response):
1207 res = []
1208 attr_name = attr_response.get("attrName")
1209 for entity_group in attr_response.get("types"):
1210 timeseries = map(lambda entity:
1211 TimeSeries(entityId=entity.get("entityId"),
1212 entityType=entity_group.get("entityType"),
1213 index=entity.get("index"),
1214 attributes=[
1215 AttributeValues(attrName=attr_name,
1216 values=entity.get("values"))]
1217 ),
1218 entity_group.get("entities"))
1219 res.append(timeseries)
1220 return chain.from_iterable(res)