Coverage for filip/clients/ngsi_v2/quantumleap.py: 78%
231 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-03-10 13:43 +0000
1"""
2TimeSeries Module for QuantumLeap API Client
3"""
5import logging
6import time
7from math import inf
8from collections import deque
9from itertools import count, chain
10from typing import Dict, List, Union, Deque, Optional
11from urllib.parse import urljoin
12import requests
13from pydantic import AnyHttpUrl
14from pydantic.type_adapter import TypeAdapter
15from filip import settings
16from filip.clients.base_http_client import BaseHttpClient
17from filip.models.base import FiwareHeader
18from filip.models.ngsi_v2.subscriptions import Message
19from filip.models.ngsi_v2.timeseries import (
20 AggrPeriod,
21 AggrMethod,
22 AggrScope,
23 AttributeValues,
24 TimeSeries,
25 TimeSeriesHeader,
26)
29logger = logging.getLogger(__name__)
32class QuantumLeapClient(BaseHttpClient):
33 """
34 Implements functions to use the FIWARE's QuantumLeap, which subscribes to an
35 Orion Context Broker and stores the subscription data in a time series
36 database (CrateDB). Further Information:
37 https://smartsdk.github.io/ngsi-timeseries-api/#quantumleap
38 https://app.swaggerhub.com/apis/heikkilv/quantumleap-api/
40 Args:
41 url: url of the quantumleap service
42 session (Optional):
43 fiware_header:
44 **kwargs:
45 """
47 def __init__(
48 self,
49 url: str = None,
50 *,
51 session: requests.Session = None,
52 fiware_header: FiwareHeader = None,
53 **kwargs,
54 ):
55 # set service url
56 url = url or settings.QL_URL
57 super().__init__(
58 url=url, session=session, fiware_header=fiware_header, **kwargs
59 )
61 # META API ENDPOINTS
62 def get_version(self) -> Dict:
63 """
64 Gets version of QuantumLeap-Service.
66 Returns:
67 Dictionary with response
68 """
69 url = urljoin(self.base_url, "version")
70 try:
71 res = self.get(url=url, headers=self.headers)
72 if res.ok:
73 return res.json()
74 res.raise_for_status()
75 except requests.exceptions.RequestException as err:
76 self.logger.error(err)
77 raise
79 def get_health(self) -> Dict:
80 """
81 This endpoint is intended for administrators of QuantumLeap. Using the
82 information returned by this endpoint they can diagnose problems in the
83 service or its dependencies. This information is also useful for cloud
84 tools such as orchestrators and load balancers with rules based on
85 health-checks. Due to the lack of a standardized response format, we
86 base the implementation on the draft of
87 https://inadarei.github.io/rfc-healthcheck/
89 Returns:
90 Dictionary with response
91 """
92 url = urljoin(self.base_url, "health")
93 try:
94 res = self.get(url=url, headers=self.headers)
95 if res.ok:
96 return res.json()
97 res.raise_for_status()
98 except requests.exceptions.RequestException as err:
99 self.logger.error(err)
100 raise
102 def post_config(self):
103 """
104 (To Be Implemented) Customize your persistence configuration to
105 better suit your needs.
106 """
107 raise NotImplementedError("Endpoint to be implemented..")
109 # INPUT API ENDPOINTS
110 def post_notification(self, notification: Message):
111 """
112 Notify QuantumLeap the arrival of a new NGSI notification.
114 Args:
115 notification: Notification Message Object
116 """
117 url = urljoin(self.base_url, "v2/notify")
118 headers = self.headers.copy()
119 data = []
120 for entity in notification.data:
121 data.append(entity.model_dump(exclude_none=True))
122 data_set = {"data": data, "subscriptionId": notification.subscriptionId}
124 try:
125 res = self.post(url=url, headers=headers, json=data_set)
126 if res.ok:
127 self.logger.debug(res.text)
128 else:
129 res.raise_for_status()
130 except requests.exceptions.RequestException as err:
131 msg = (
132 f"Could not post notification for subscription id "
133 f"{notification.subscriptionId}"
134 )
135 self.log_error(err=err, msg=msg)
136 raise
138 def post_subscription(
139 self,
140 cb_url: Union[AnyHttpUrl, str],
141 ql_url: Union[AnyHttpUrl, str],
142 entity_type: str = None,
143 entity_id: str = None,
144 id_pattern: str = None,
145 attributes: str = None,
146 observed_attributes: str = None,
147 notified_attributes: str = None,
148 throttling: int = None,
149 time_index_attribute: str = None,
150 ):
151 """
152 Subscribe QL to process Orion notifications of certain type.
153 This endpoint simplifies the creation of the subscription in orion
154 that will generate the notifications to be consumed by QuantumLeap in
155 order to save historical records. If you want an advanced specification
156 of the notifications, you can always create the subscription in orion
157 at your will. This endpoint just aims to simplify the common use case.
159 Args:
160 cb_url:
161 url of the context broker
162 ql_url:
163 The url where Orion can reach QuantumLeap. Do not include
164 specific paths.
165 entity_type (String):
166 The type of entities for which to create a
167 subscription, so as to persist historical data of entities of
168 this type.
169 entity_id (String):
170 Id of the entity to track. If specified, it
171 takes precedence over the idPattern parameter.
172 id_pattern (String): The pattern covering the entity ids for which
173 to subscribe. If not specified, QL will track all entities of
174 the specified type.
175 attributes (String): Comma-separated list of attribute names to
176 track.
177 observed_attributes (String): Comma-separated list of attribute
178 names to track.
179 notified_attributes (String): Comma-separated list of attribute
180 names to be used to restrict the data of which QL will keep a
181 history.
182 throttling (int): Minimal period of time in seconds which must
183 elapse between two consecutive notifications.
184 time_index_attribute (String): The name of a custom attribute to be
185 used as a
186 time index.
187 """
188 raise DeprecationWarning(
189 "Subscription endpoint of Quantumleap API is "
190 "deprecated, use the ORION subscription endpoint "
191 "instead"
192 )
194 def delete_entity(self, entity_id: str, entity_type: Optional[str] = None) -> str:
195 """
196 Given an entity (with type and id), delete all its historical records.
198 Args:
199 entity_id (String): Entity id is required.
200 entity_type (Optional[String]): Entity type if entity_id alone
201 can not uniquely define the entity.
203 Raises:
204 RequestException, if entity was not found
205 Exception, if deleting was not successful
207 Returns:
208 The entity_id of entity that is deleted.
209 """
210 url = urljoin(self.base_url, f"v2/entities/{entity_id}")
211 headers = self.headers.copy()
212 if entity_type is not None:
213 params = {"type": entity_type}
214 else:
215 params = {}
217 # The deletion does not always resolves in a success even if an ok is
218 # returned.
219 # Try to delete multiple times with incrementing waits.
220 # If the entity is no longer found the methode returns with a success
221 # If the deletion attempt fails after the 10th try, raise an
222 # Exception: it could not be deleted
223 counter = 0
224 while counter < 10:
225 self.delete(url=url, headers=headers, params=params)
226 try:
227 self.get_entity_by_id(entity_id=entity_id, entity_type=entity_type)
228 except requests.exceptions.RequestException as err:
229 self.logger.info("Entity id '%s' successfully deleted!", entity_id)
230 return entity_id
231 time.sleep(counter * 5)
232 counter += 1
234 msg = f"Could not delete QL entity of id {entity_id}"
235 logger.error(msg=msg)
236 raise Exception(msg)
238 def delete_entity_type(self, entity_type: str) -> str:
239 """
240 Given an entity type, delete all the historical records of all
241 entities of such type.
242 Args:
243 entity_type (String): Type of entities data to be deleted.
244 Returns:
245 Entity type of the entities deleted.
246 """
247 url = urljoin(self.base_url, f"v2/types/{entity_type}")
248 headers = self.headers.copy()
249 try:
250 res = self.delete(url=url, headers=headers)
251 if res.ok:
252 self.logger.info(
253 "Entities of type '%s' successfully deleted!", entity_type
254 )
255 return entity_type
256 res.raise_for_status()
257 except requests.exceptions.RequestException as err:
258 msg = f"Could not delete entities of type {entity_type}"
259 self.log_error(err=err, msg=msg)
260 raise
262 # QUERY API ENDPOINTS
263 def __query_builder(
264 self,
265 url,
266 *,
267 entity_id: str = None,
268 id_pattern: str = None,
269 options: str = None,
270 entity_type: str = None,
271 aggr_method: Union[str, AggrMethod] = None,
272 aggr_period: Union[str, AggrPeriod] = None,
273 from_date: str = None,
274 to_date: str = None,
275 last_n: int = None,
276 limit: int = 10000,
277 offset: int = 0,
278 georel: str = None,
279 geometry: str = None,
280 coords: str = None,
281 attrs: str = None,
282 aggr_scope: Union[str, AggrScope] = None,
283 ) -> Deque[Dict]:
284 """
285 Private Function to call respective API endpoints, chops large
286 requests into multiple single requests and merges the
287 responses
289 Args:
290 url:
291 entity_id:
292 options:
293 entity_type:
294 aggr_method:
295 aggr_period:
296 from_date:
297 to_date:
298 last_n:
299 limit:
300 Maximum number of results to retrieve in a single response.
301 offset:
302 Offset to apply to the response results. For example, if the
303 query was to return 10 results and you use an offset of 1, the
304 response will return the last 9 values. Make sure you don't
305 give more offset than the number of results.
306 georel:
307 geometry:
308 coords:
309 attrs:
310 aggr_scope:
311 id_pattern (str): The pattern covering the entity ids for which
312 to subscribe. The pattern follow regular expressions (POSIX
313 Extendede) e.g. ".*", "Room.*". Detail information:
314 https://en.wikibooks.org/wiki/Regular_Expressions/POSIX-Extended_Regular_Expressions
316 Returns:
317 Dict
318 """
319 assert (
320 id_pattern is None or entity_id is None
321 ), "Cannot have both id and idPattern as parameter."
322 params = {}
323 headers = self.headers.copy()
324 max_records_per_request = 10000
325 # create a double ending queue
326 res_q: Deque[Dict] = deque([])
328 if options:
329 params.update({"options": options})
330 if entity_type:
331 params.update({"type": entity_type})
332 if aggr_method:
333 aggr_method = AggrMethod(aggr_method)
334 params.update({"aggrMethod": aggr_method.value})
335 if aggr_period:
336 aggr_period = AggrPeriod(aggr_period)
337 params.update({"aggrPeriod": aggr_period.value})
338 if from_date:
339 params.update({"fromDate": from_date})
340 if to_date:
341 params.update({"toDate": to_date})
342 # These values are required for the integrated pagination mechanism
343 # maximum items per request
344 if limit is None:
345 limit = inf
346 if offset is None:
347 offset = 0
348 if georel:
349 params.update({"georel": georel})
350 if coords:
351 params.update({"coords": coords})
352 if geometry:
353 params.update({"geometry": geometry})
354 if attrs:
355 params.update({"attrs": attrs})
356 if aggr_scope:
357 aggr_scope = AggrScope(aggr_scope)
358 params.update({"aggr_scope": aggr_scope.value})
359 if entity_id:
360 params.update({"id": entity_id})
361 if id_pattern:
362 params.update({"idPattern": id_pattern})
364 # This loop will chop large requests into smaller junks.
365 # The individual functions will then merge the final response models
366 for i in count(0, max_records_per_request):
367 try:
368 params["offset"] = offset + i
370 params["limit"] = min(limit - i, max_records_per_request)
371 if params["limit"] <= 0:
372 break
374 if last_n:
375 params["lastN"] = min(last_n - i, max_records_per_request)
376 if params["lastN"] <= 0:
377 break
379 res = self.get(url=url, params=params, headers=headers)
381 if res.ok:
382 self.logger.debug("Received: %s", res.json())
384 # revert append direction when using last_n
385 if last_n:
386 res_q.appendleft(res.json())
387 else:
388 res_q.append(res.json())
389 res.raise_for_status()
391 except requests.exceptions.RequestException as err:
392 if (
393 err.response.status_code == 404
394 and err.response.json().get("error") == "Not Found"
395 and len(res_q) > 0
396 ):
397 break
398 else:
399 msg = "Could not load entity data"
400 self.log_error(err=err, msg=msg)
401 raise
403 self.logger.info("Successfully retrieved entity data")
404 return res_q
406 # v2/entities
407 def get_entities(
408 self,
409 *,
410 entity_type: str = None,
411 id_pattern: str = None,
412 from_date: str = None,
413 to_date: str = None,
414 limit: int = 10000,
415 offset: int = None,
416 ) -> List[TimeSeriesHeader]:
417 """
418 Get list of all available entities and their context information
419 about EntityType and last update date.
421 Args:
422 entity_type (str): Comma-separated list of entity types whose data
423 are to be included in the response. Use only one (no comma)
424 when required. If used to resolve ambiguity for the given
425 entityId, make sure the given entityId exists for this
426 entityType.
427 id_pattern (str): The pattern covering the entity ids for which
428 to subscribe. The pattern follow regular expressions (POSIX
429 Extendede) e.g. ".*", "Room.*". Detail information:
430 https://en.wikibooks.org/wiki/Regular_Expressions/POSIX-Extended_Regular_Expressions
431 from_date (str): The starting date and time (inclusive) from which
432 the context information is queried. Must be in ISO8601 format
433 (e.g., 2018-01-05T15:44:34)
434 to_date (str): The final date and time (inclusive) from which the
435 context information is queried. Must be in ISO8601 format
436 (e.g., 2018-01-05T15:44:34).
437 limit (int): Maximum number of results to be retrieved.
438 Default value : 10000
439 offset (int): Offset for the results.
441 Returns:
442 List of TimeSeriesHeader
443 """
444 url = urljoin(self.base_url, "v2/entities")
445 res = self.__query_builder(
446 url=url,
447 id_pattern=id_pattern,
448 entity_type=entity_type,
449 from_date=from_date,
450 to_date=to_date,
451 limit=limit,
452 offset=offset,
453 )
455 ta = TypeAdapter(List[TimeSeriesHeader])
456 return ta.validate_python(res[0])
458 # /entities/{entityId}
459 def get_entity_by_id(
460 self,
461 entity_id: str,
462 *,
463 attrs: str = None,
464 entity_type: str = None,
465 aggr_method: Union[str, AggrMethod] = None,
466 aggr_period: Union[str, AggrPeriod] = None,
467 from_date: str = None,
468 to_date: str = None,
469 last_n: int = None,
470 limit: int = 10000,
471 offset: int = None,
472 georel: str = None,
473 geometry: str = None,
474 coords: str = None,
475 options: str = None,
476 ) -> TimeSeries:
477 """
478 History of N attributes of a given entity instance
479 For example, query max water level of the central tank throughout the
480 last year. Queries can get more
481 sophisticated with the use of filters and query attributes.
483 Args:
484 entity_id (String): Entity id is required.
485 attrs (String):
486 Comma-separated list of attribute names whose data are to be
487 included in the response. The attributes are retrieved in the
488 order specified by this parameter. If not specified, all
489 attributes are included in the response in arbitrary order.
490 entity_type (String): Comma-separated list of entity types whose
491 data are to be included in the response.
492 aggr_method (String):
493 The function to apply to the raw data filtered by the query
494 parameters. If not given, the returned data are the same raw
495 inserted data.
497 Allowed values: count, sum, avg, min, max
498 aggr_period (String):
499 If not defined, the aggregation will apply to all the values
500 contained in the search result. If defined, the aggregation
501 function will instead be applied N times, once for each
502 period, and all those results will be considered for the
503 response. For example, a query asking for the average
504 temperature of an attribute will typically return 1 value.
505 However, with an aggregationPeriod of day, you get the daily
506 average of the temperature instead (more than one value
507 assuming you had measurements across many days within the
508 scope of your search result). aggrPeriod must be accompanied
509 by an aggrMethod, and the aggrMethod will be applied to all
510 the numeric attributes specified in attrs; the rest of the
511 non-numerical attrs will be ignored. By default, the response
512 is grouped by entity_id. See aggrScope to create aggregation
513 across entities:
515 Allowed values: year, month, day, hour, minute, second
517 from_date (String):
518 The starting date and time (inclusive) from which the context
519 information is queried. Must be in ISO8601 format (e.g.,
520 2018-01-05T15:44:34)
521 to_date (String):
522 The final date and time (inclusive) from which the context
523 information is queried. Must be in ISO8601 format (e.g.,
524 2018-01-05T15:44:34)
525 last_n (int):
526 Used to request only the last N values that satisfy the
527 request conditions.
528 limit (int): Maximum number of results to be retrieved.
529 Default value : 10000
530 offset (int):
531 Offset to apply to the response results.
532 georel (String):
533 It specifies a spatial relationship between matching entities
534 and a reference shape (geometry). This parameter is used to
535 perform geographical queries with the same semantics as in the
536 FIWARE-NGSI v2 Specification. Full details can be found in the
537 Geographical Queries section of the specification:
538 https://fiware.github.io/specifications/ngsiv2/stable/.
539 geometry (String):
540 Required if georel is specified. point, line, polygon, box
541 coords (String):
542 Optional but required if georel is specified. This parameter
543 defines the reference shape (geometry) in terms of WGS 84
544 coordinates and has the same semantics as in the
545 FIWARE-NGSI v2 Specification, except we only accept coordinates
546 in decimal degrees---e.g. 40.714,-74.006 is okay, but not
547 40 42' 51'',74 0' 21''. Full details can be found in the
548 Geographical Queries section of the specification:
549 https://fiware.github.io/specifications/ngsiv2/stable/.
550 options (String): Key value pair options.
552 Returns:
553 TimeSeries
554 """
555 url = urljoin(self.base_url, f"v2/entities/{entity_id}")
556 res_q = self.__query_builder(
557 url=url,
558 attrs=attrs,
559 options=options,
560 entity_type=entity_type,
561 aggr_method=aggr_method,
562 aggr_period=aggr_period,
563 from_date=from_date,
564 to_date=to_date,
565 last_n=last_n,
566 limit=limit,
567 offset=offset,
568 georel=georel,
569 geometry=geometry,
570 coords=coords,
571 )
572 # merge response chunks
573 res = TimeSeries.model_validate(res_q.popleft())
574 for item in res_q:
575 res.extend(TimeSeries.model_validate(item))
577 return res
579 # /entities/{entityId}/value
580 def get_entity_values_by_id(
581 self,
582 entity_id: str,
583 *,
584 attrs: str = None,
585 entity_type: str = None,
586 aggr_method: Union[str, AggrMethod] = None,
587 aggr_period: Union[str, AggrPeriod] = None,
588 from_date: str = None,
589 to_date: str = None,
590 last_n: int = None,
591 limit: int = 10000,
592 offset: int = None,
593 georel: str = None,
594 geometry: str = None,
595 coords: str = None,
596 options: str = None,
597 ) -> TimeSeries:
598 """
599 History of N attributes (values only) of a given entity instance
600 For example, query the average pressure, temperature and humidity (
601 values only, no metadata) of this
602 month in the weather station WS1.
604 Args:
605 entity_id (String): Entity id is required.
606 attrs (String): Comma-separated list of attribute names
607 entity_type (String): Comma-separated list of entity types whose
608 data are to be included in the response.
609 aggr_method (String): The function to apply to the raw data
610 filtered. count, sum, avg, min, max
611 aggr_period (String): year, month, day, hour, minute, second
612 from_date (String): Starting date and time inclusive.
613 to_date (String): Final date and time inclusive.
614 last_n (int): Request only the last N values.
615 limit (int): Maximum number of results to be retrieved.
616 Default value : 10000
617 offset (int): Offset for the results.
618 georel (String): Geographical pattern
619 geometry (String): Required if georel is specified. point, line,
620 polygon, box
621 coords (String): Required if georel is specified.
622 e.g. 40.714,-74.006
623 options (String): Key value pair options.
625 Returns:
626 Response Model
627 """
628 url = urljoin(self.base_url, f"v2/entities/{entity_id}/value")
629 res_q = self.__query_builder(
630 url=url,
631 attrs=attrs,
632 options=options,
633 entity_type=entity_type,
634 aggr_method=aggr_method,
635 aggr_period=aggr_period,
636 from_date=from_date,
637 to_date=to_date,
638 last_n=last_n,
639 limit=limit,
640 offset=offset,
641 georel=georel,
642 geometry=geometry,
643 coords=coords,
644 )
646 # merge response chunks
647 res = TimeSeries(entityId=entity_id, **res_q.popleft())
648 for item in res_q:
649 res.extend(TimeSeries(entityId=entity_id, **item))
651 return res
653 # /entities/{entityId}/attrs/{attrName}
654 def get_entity_attr_by_id(
655 self,
656 entity_id: str,
657 attr_name: str,
658 *,
659 entity_type: str = None,
660 aggr_method: Union[str, AggrMethod] = None,
661 aggr_period: Union[str, AggrPeriod] = None,
662 from_date: str = None,
663 to_date: str = None,
664 last_n: int = None,
665 limit: int = 10000,
666 offset: int = None,
667 georel: str = None,
668 geometry: str = None,
669 coords: str = None,
670 options: str = None,
671 ) -> TimeSeries:
672 """
673 History of an attribute of a given entity instance
674 For example, query max water level of the central tank throughout the
675 last year. Queries can get more
676 sophisticated with the use of filters and query attributes.
678 Args:
679 entity_id (String): Entity id is required.
680 attr_name (String): The attribute name is required.
681 entity_type (String): Comma-separated list of entity types whose
682 data are to be included in the response.
683 aggr_method (String): The function to apply to the raw data
684 filtered. count, sum, avg, min, max
685 aggr_period (String): year, month, day, hour, minute, second
686 from_date (String): Starting date and time inclusive.
687 to_date (String): Final date and time inclusive.
688 last_n (int): Request only the last N values.
689 limit (int): Maximum number of results to be retrieved.
690 Default value : 10000
691 offset (int): Offset for the results.
692 georel (String): Geographical pattern
693 geometry (String): Required if georel is specified. point, line,
694 polygon, box
695 coords (String): Required if georel is specified.
696 e.g. 40.714,-74.006
697 options (String): Key value pair options.
699 Returns:
700 Response Model
701 """
702 url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs" f"/{attr_name}")
703 req_q = self.__query_builder(
704 url=url,
705 entity_id=entity_id,
706 options=options,
707 entity_type=entity_type,
708 aggr_method=aggr_method,
709 aggr_period=aggr_period,
710 from_date=from_date,
711 to_date=to_date,
712 last_n=last_n,
713 limit=limit,
714 offset=offset,
715 georel=georel,
716 geometry=geometry,
717 coords=coords,
718 )
720 # merge response chunks
721 first = req_q.popleft()
722 res = TimeSeries(
723 entityId=entity_id,
724 index=first.get("index"),
725 attributes=[AttributeValues(**first)],
726 )
727 for item in req_q:
728 res.extend(
729 TimeSeries(
730 entityId=entity_id,
731 index=item.get("index"),
732 attributes=[AttributeValues(**item)],
733 )
734 )
736 return res
738 # /entities/{entityId}/attrs/{attrName}/value
739 def get_entity_attr_values_by_id(
740 self,
741 entity_id: str,
742 attr_name: str,
743 *,
744 entity_type: str = None,
745 aggr_method: Union[str, AggrMethod] = None,
746 aggr_period: Union[str, AggrPeriod] = None,
747 from_date: str = None,
748 to_date: str = None,
749 last_n: int = None,
750 limit: int = 10000,
751 offset: int = None,
752 georel: str = None,
753 geometry: str = None,
754 coords: str = None,
755 options: str = None,
756 ) -> TimeSeries:
757 """
758 History of an attribute (values only) of a given entity instance
759 Similar to the previous, but focusing on the values regardless of the
760 metadata.
762 Args:
763 entity_id (String): Entity id is required.
764 attr_name (String): The attribute name is required.
765 entity_type (String): Comma-separated list of entity types whose
766 data are to be included in the response.
767 aggr_method (String): The function to apply to the raw data
768 filtered. count, sum, avg, min, max
769 aggr_period (String): year, month, day, hour, minute, second
770 from_date (String): Starting date and time inclusive.
771 to_date (String): Final date and time inclusive.
772 last_n (int): Request only the last N values.
773 limit (int): Maximum number of results to be retrieved.
774 Default value : 10000
775 offset (int): Offset for the results.
776 georel (String): Geographical pattern
777 geometry (String): Required if georel is specified. point, line,
778 polygon, box
779 coords (String): Required if georel is specified.
780 e.g. 40.714,-74.006
781 options (String): Key value pair options.
783 Returns:
784 Response Model
785 """
786 url = urljoin(
787 self.base_url, f"v2/entities/{entity_id}/attrs" f"/{attr_name}/value"
788 )
789 res_q = self.__query_builder(
790 url=url,
791 options=options,
792 entity_type=entity_type,
793 aggr_method=aggr_method,
794 aggr_period=aggr_period,
795 from_date=from_date,
796 to_date=to_date,
797 last_n=last_n,
798 limit=limit,
799 offset=offset,
800 georel=georel,
801 geometry=geometry,
802 coords=coords,
803 )
804 # merge response chunks
805 first = res_q.popleft()
806 res = TimeSeries(
807 entityId=entity_id,
808 index=first.get("index"),
809 attributes=[
810 AttributeValues(attrName=attr_name, values=first.get("values"))
811 ],
812 )
813 for item in res_q:
814 res.extend(
815 TimeSeries(
816 entityId=entity_id,
817 index=item.get("index"),
818 attributes=[
819 AttributeValues(attrName=attr_name, values=item.get("values"))
820 ],
821 )
822 )
824 return res
826 # /types/{entityType}
827 def get_entity_by_type(
828 self,
829 entity_type: str,
830 *,
831 attrs: str = None,
832 entity_id: str = None,
833 id_pattern: str = None,
834 aggr_method: Union[str, AggrMethod] = None,
835 aggr_period: Union[str, AggrPeriod] = None,
836 from_date: str = None,
837 to_date: str = None,
838 last_n: int = None,
839 limit: int = 10000,
840 offset: int = None,
841 georel: str = None,
842 geometry: str = None,
843 coords: str = None,
844 options: str = None,
845 aggr_scope: Union[str, AggrScope] = None,
846 ) -> List[TimeSeries]:
847 """
848 History of N attributes of N entities of the same type.
849 For example, query the average pressure, temperature and humidity of
850 this month in all the weather stations.
851 """
852 url = urljoin(self.base_url, f"v2/types/{entity_type}")
853 res_q = self.__query_builder(
854 url=url,
855 entity_id=entity_id,
856 id_pattern=id_pattern,
857 attrs=attrs,
858 options=options,
859 aggr_method=aggr_method,
860 aggr_period=aggr_period,
861 from_date=from_date,
862 to_date=to_date,
863 last_n=last_n,
864 limit=limit,
865 offset=offset,
866 georel=georel,
867 geometry=geometry,
868 coords=coords,
869 aggr_scope=aggr_scope,
870 )
872 # merge chunks of response
873 res = [
874 TimeSeries(entityType=entity_type, **item)
875 for item in res_q.popleft().get("entities")
876 ]
878 for chunk in res_q:
879 chunk = [
880 TimeSeries(entityType=entity_type, **item)
881 for item in chunk.get("entities")
882 ]
883 for new, old in zip(chunk, res):
884 old.extend(new)
886 return res
888 # /types/{entityType}/value
889 def get_entity_values_by_type(
890 self,
891 entity_type: str,
892 *,
893 attrs: str = None,
894 entity_id: str = None,
895 id_pattern: str = None,
896 aggr_method: Union[str, AggrMethod] = None,
897 aggr_period: Union[str, AggrPeriod] = None,
898 from_date: str = None,
899 to_date: str = None,
900 last_n: int = None,
901 limit: int = 10000,
902 offset: int = None,
903 georel: str = None,
904 geometry: str = None,
905 coords: str = None,
906 options: str = None,
907 aggr_scope: Union[str, AggrScope] = None,
908 ) -> List[TimeSeries]:
909 """
910 History of N attributes (values only) of N entities of the same type.
911 For example, query the average pressure, temperature and humidity (
912 values only, no metadata) of this month in
913 all the weather stations.
914 """
915 url = urljoin(self.base_url, f"v2/types/{entity_type}/value")
916 res_q = self.__query_builder(
917 url=url,
918 entity_id=entity_id,
919 id_pattern=id_pattern,
920 attrs=attrs,
921 options=options,
922 entity_type=entity_type,
923 aggr_method=aggr_method,
924 aggr_period=aggr_period,
925 from_date=from_date,
926 to_date=to_date,
927 last_n=last_n,
928 limit=limit,
929 offset=offset,
930 georel=georel,
931 geometry=geometry,
932 coords=coords,
933 aggr_scope=aggr_scope,
934 )
935 # merge chunks of response
936 res = [
937 TimeSeries(entityType=entity_type, **item)
938 for item in res_q.popleft().get("values")
939 ]
941 for chunk in res_q:
942 chunk = [
943 TimeSeries(entityType=entity_type, **item)
944 for item in chunk.get("values")
945 ]
946 for new, old in zip(chunk, res):
947 old.extend(new)
949 return res
951 # /types/{entityType}/attrs/{attrName}
952 def get_entity_attr_by_type(
953 self,
954 entity_type: str,
955 attr_name: str,
956 *,
957 entity_id: str = None,
958 id_pattern: str = None,
959 aggr_method: Union[str, AggrMethod] = None,
960 aggr_period: Union[str, AggrPeriod] = None,
961 from_date: str = None,
962 to_date: str = None,
963 last_n: int = None,
964 limit: int = 10000,
965 offset: int = None,
966 georel: str = None,
967 geometry: str = None,
968 coords: str = None,
969 options: str = None,
970 aggr_scope: Union[str, AggrScope] = None,
971 ) -> List[TimeSeries]:
972 """
973 History of an attribute of N entities of the same type.
974 For example, query the pressure measurements of this month in all the
975 weather stations. Note in the response,
976 the index and values arrays are parallel. Also, when using
977 aggrMethod, the aggregation is done by-entity
978 instance. In this case, the index array is just the fromDate and
979 toDate values user specified in the request
980 (if any).
982 Args:
983 entity_type (String): Entity type is required.
984 attr_name (String): The attribute name is required.
985 entity_id (String): Comma-separated list of entity ids whose data
986 are to be included in the response.
987 aggr_method (String): The function to apply to the raw data
988 filtered. count, sum, avg, min, max
989 aggr_period (String): year, month, day, hour, minute, second
990 aggr_scope (str): Optional. (This parameter is not yet supported).
991 When the query results cover historical data for multiple
992 entities instances, you can define the aggregation method to be
993 applied for each entity instance [entity] or across
994 them [global]
995 from_date (String): Starting date and time inclusive.
996 to_date (String): Final date and time inclusive.
997 last_n (int): Request only the last N values.
998 limit (int): Maximum number of results to be retrieved.
999 Default value : 10000
1000 offset (int): Offset for the results.
1001 georel (String): Geographical pattern
1002 geometry (String): Required if georel is specified. point, line,
1003 polygon, box
1004 coords (String): Required if georel is specified.
1005 e.g. 40.714,-74.006
1006 options (String): Key value pair options.
1008 Returns:
1009 Response Model
1010 """
1011 url = urljoin(self.base_url, f"v2/types/{entity_type}/attrs" f"/{attr_name}")
1012 res_q = self.__query_builder(
1013 url=url,
1014 entity_id=entity_id,
1015 id_pattern=id_pattern,
1016 options=options,
1017 entity_type=entity_type,
1018 aggr_method=aggr_method,
1019 aggr_period=aggr_period,
1020 from_date=from_date,
1021 to_date=to_date,
1022 last_n=last_n,
1023 limit=limit,
1024 offset=offset,
1025 georel=georel,
1026 geometry=geometry,
1027 coords=coords,
1028 aggr_scope=aggr_scope,
1029 )
1031 # merge chunks of response
1032 first = res_q.popleft()
1033 res = [
1034 TimeSeries(
1035 index=item.get("index"),
1036 entityType=entity_type,
1037 entityId=item.get("entityId"),
1038 attributes=[
1039 AttributeValues(
1040 attrName=first.get("attrName"), values=item.get("values")
1041 )
1042 ],
1043 )
1044 for item in first.get("entities")
1045 ]
1047 for chunk in res_q:
1048 chunk = [
1049 TimeSeries(
1050 index=item.get("index"),
1051 entityType=entity_type,
1052 entityId=item.get("entityId"),
1053 attributes=[
1054 AttributeValues(
1055 attrName=chunk.get("attrName"), values=item.get("values")
1056 )
1057 ],
1058 )
1059 for item in chunk.get("entities")
1060 ]
1061 for new, old in zip(chunk, res):
1062 old.extend(new)
1064 return res
1066 # /types/{entityType}/attrs/{attrName}/value
1067 def get_entity_attr_values_by_type(
1068 self,
1069 entity_type: str,
1070 attr_name: str,
1071 *,
1072 entity_id: str = None,
1073 id_pattern: str = None,
1074 aggr_method: Union[str, AggrMethod] = None,
1075 aggr_period: Union[str, AggrPeriod] = None,
1076 from_date: str = None,
1077 to_date: str = None,
1078 last_n: int = None,
1079 limit: int = 10000,
1080 offset: int = None,
1081 georel: str = None,
1082 geometry: str = None,
1083 coords: str = None,
1084 options: str = None,
1085 aggr_scope: Union[str, AggrScope] = None,
1086 ) -> List[TimeSeries]:
1087 """
1088 History of an attribute (values only) of N entities of the same type.
1089 For example, query the average pressure (values only, no metadata) of
1090 this month in all the weather stations.
1092 Args:
1093 aggr_scope:
1094 entity_type (String): Entity type is required.
1095 attr_name (String): The attribute name is required.
1096 entity_id (String): Comma-separated list of entity ids whose data
1097 are to be included in the response.
1098 aggr_method (String): The function to apply to the raw data
1099 filtered. count, sum, avg, min, max
1100 aggr_period (String): year, month, day, hour, minute, second
1101 aggr_scope (String):
1102 from_date (String): Starting date and time inclusive.
1103 to_date (String): Final date and time inclusive.
1104 last_n (int): Request only the last N values.
1105 limit (int): Maximum number of results to be retrieved.
1106 Default value : 10000
1107 offset (int): Offset for the results.
1108 georel (String): Geographical pattern
1109 geometry (String): Required if georel is specified. point, line,
1110 polygon, box
1111 coords (String): Required if georel is specified.
1112 e.g. 40.714,-74.006
1113 options (String): Key value pair options.
1115 Returns:
1116 Response Model
1117 """
1118 url = urljoin(
1119 self.base_url, f"v2/types/{entity_type}/attrs/" f"{attr_name}/value"
1120 )
1121 res_q = self.__query_builder(
1122 url=url,
1123 entity_id=entity_id,
1124 id_pattern=id_pattern,
1125 options=options,
1126 entity_type=entity_type,
1127 aggr_method=aggr_method,
1128 aggr_period=aggr_period,
1129 from_date=from_date,
1130 to_date=to_date,
1131 last_n=last_n,
1132 limit=limit,
1133 offset=offset,
1134 georel=georel,
1135 geometry=geometry,
1136 coords=coords,
1137 aggr_scope=aggr_scope,
1138 )
1140 # merge chunks of response
1141 res = [
1142 TimeSeries(
1143 index=item.get("index"),
1144 entityType=entity_type,
1145 entityId=item.get("entityId"),
1146 attributes=[
1147 AttributeValues(attrName=attr_name, values=item.get("values"))
1148 ],
1149 )
1150 for item in res_q.popleft().get("values")
1151 ]
1153 for chunk in res_q:
1154 chunk = [
1155 TimeSeries(
1156 index=item.get("index"),
1157 entityType=entity_type,
1158 entityId=item.get("entityId"),
1159 attributes=[
1160 AttributeValues(attrName=attr_name, values=item.get("values"))
1161 ],
1162 )
1163 for item in chunk.get("values")
1164 ]
1166 for new, old in zip(chunk, res):
1167 old.extend(new)
1168 return res
1170 # v2/attrs
1171 def get_entity_by_attrs(
1172 self,
1173 *,
1174 entity_type: str = None,
1175 from_date: str = None,
1176 to_date: str = None,
1177 limit: int = 10000,
1178 offset: int = None,
1179 ) -> List[TimeSeries]:
1180 """
1181 Get list of timeseries data grouped by each existing attribute name.
1182 The timeseries data include all entities corresponding to each
1183 attribute name as well as the index and values of this attribute in
1184 this entity.
1186 Args:
1187 entity_type (str): Comma-separated list of entity types whose data
1188 are to be included in the response. Use only one (no comma)
1189 when required. If used to resolve ambiguity for the given
1190 entityId, make sure the given entityId exists for this
1191 entityType.
1192 from_date (str): The starting date and time (inclusive) from which
1193 the context information is queried. Must be in ISO8601 format
1194 (e.g., 2018-01-05T15:44:34)
1195 to_date (str): The final date and time (inclusive) from which the
1196 context information is queried. Must be in ISO8601 format
1197 (e.g., 2018-01-05T15:44:34).
1198 limit (int): Maximum number of results to be retrieved.
1199 Default value : 10000
1200 offset (int): Offset for the results.
1202 Returns:
1203 List of TimeSeriesEntities
1204 """
1205 url = urljoin(self.base_url, "v2/attrs")
1206 res_q = self.__query_builder(
1207 url=url,
1208 entity_type=entity_type,
1209 from_date=from_date,
1210 to_date=to_date,
1211 limit=limit,
1212 offset=offset,
1213 )
1214 first = res_q.popleft()
1216 res = chain.from_iterable(
1217 map(lambda x: self.transform_attr_response_model(x), first.get("attrs"))
1218 )
1219 for chunk in res_q:
1220 chunk = chain.from_iterable(
1221 map(lambda x: self.transform_attr_response_model(x), chunk.get("attrs"))
1222 )
1224 for new, old in zip(chunk, res):
1225 old.extend(new)
1227 return list(res)
1229 # v2/attrs/{attr_name}
1230 def get_entity_by_attr_name(
1231 self,
1232 *,
1233 attr_name: str,
1234 entity_type: str = None,
1235 from_date: str = None,
1236 to_date: str = None,
1237 limit: int = 10000,
1238 offset: int = None,
1239 ) -> List[TimeSeries]:
1240 """
1241 Get list of all entities containing this attribute name, as well as
1242 getting the index and values of this attribute in every corresponding
1243 entity.
1245 Args:
1246 attr_name (str): The attribute name in interest.
1247 entity_type (str): Comma-separated list of entity types whose data
1248 are to be included in the response. Use only one (no comma)
1249 when required. If used to resolve ambiguity for the given
1250 entityId, make sure the given entityId exists for this
1251 entityType.
1252 from_date (str): The starting date and time (inclusive) from which
1253 the context information is queried. Must be in ISO8601 format
1254 (e.g., 2018-01-05T15:44:34)
1255 to_date (str): The final date and time (inclusive) from which the
1256 context information is queried. Must be in ISO8601 format
1257 (e.g., 2018-01-05T15:44:34).
1258 limit (int): Maximum number of results to be retrieved.
1259 Default value : 10000
1260 offset (int): Offset for the results.
1262 Returns:
1263 List of TimeSeries
1264 """
1265 url = urljoin(self.base_url, f"/v2/attrs/{attr_name}")
1266 res_q = self.__query_builder(
1267 url=url,
1268 entity_type=entity_type,
1269 from_date=from_date,
1270 to_date=to_date,
1271 limit=limit,
1272 offset=offset,
1273 )
1275 first = res_q.popleft()
1276 res = self.transform_attr_response_model(first)
1278 for chunk in res_q:
1279 chunk = self.transform_attr_response_model(chunk)
1280 for new, old in zip(chunk, res):
1281 old.extend(new)
1282 return list(res)
1284 def transform_attr_response_model(self, attr_response):
1285 res = []
1286 attr_name = attr_response.get("attrName")
1287 for entity_group in attr_response.get("types"):
1288 timeseries = map(
1289 lambda entity: TimeSeries(
1290 entityId=entity.get("entityId"),
1291 entityType=entity_group.get("entityType"),
1292 index=entity.get("index"),
1293 attributes=[
1294 AttributeValues(attrName=attr_name, values=entity.get("values"))
1295 ],
1296 ),
1297 entity_group.get("entities"),
1298 )
1299 res.append(timeseries)
1300 return chain.from_iterable(res)