Coverage for filip/clients/ngsi_v2/quantumleap.py: 78% (232 statements)
1"""
2TimeSeries Module for QuantumLeap API Client
3"""
5import logging
6import time
7from math import inf
8from collections import deque
9from itertools import count, chain
10from typing import Dict, List, Union, Deque, Optional
11from urllib.parse import urljoin
12import requests
13from pydantic import AnyHttpUrl
14from pydantic.type_adapter import TypeAdapter
15from filip import settings
16from filip.clients.base_http_client import BaseHttpClient
17from filip.models.base import FiwareHeader
18from filip.models.ngsi_v2.subscriptions import Message
19from filip.models.ngsi_ld.context import MessageLD
20from filip.models.ngsi_v2.timeseries import (
21 AggrPeriod,
22 AggrMethod,
23 AggrScope,
24 AttributeValues,
25 TimeSeries,
26 TimeSeriesHeader,
27)
28from filip.clients.exceptions import BaseHttpClientException
31logger = logging.getLogger(__name__)


class QuantumLeapClient(BaseHttpClient):
    """
    Implements functions to use FIWARE's QuantumLeap, which subscribes to an
    Orion Context Broker and stores the subscription data in a time series
    database (CrateDB). Further information:
    https://smartsdk.github.io/ngsi-timeseries-api/#quantumleap
    https://app.swaggerhub.com/apis/heikkilv/quantumleap-api/

    Args:
        url: url of the QuantumLeap service
        session (Optional): session object to reuse for all requests
        fiware_header: FiwareHeader object holding the service and service path
        **kwargs: additional arguments passed on to BaseHttpClient
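
    Example:
        A minimal usage sketch; the url and FiwareHeader values are
        illustrative::

            from filip.clients.ngsi_v2 import QuantumLeapClient
            from filip.models.base import FiwareHeader

            client = QuantumLeapClient(
                url="http://localhost:8668",
                fiware_header=FiwareHeader(service="filip",
                                           service_path="/testing"),
            )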
47 """

    def __init__(
        self,
        url: str = None,
        *,
        session: requests.Session = None,
        fiware_header: FiwareHeader = None,
        **kwargs,
    ):
        # set service url
        url = url or settings.QL_URL
        super().__init__(
            url=url, session=session, fiware_header=fiware_header, **kwargs
        )

    # META API ENDPOINTS
    def get_version(self) -> Dict:
        """
        Gets the version of the QuantumLeap service.

        Returns:
            Dictionary with response
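
        Example:
            A minimal sketch; the returned dictionary typically contains a
            "version" key::

                version_info = client.get_version()
                print(version_info)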
70 """
71 url = urljoin(self.base_url, "version")
72 try:
73 res = self.get(url=url, headers=self.headers)
74 if res.ok:
75 return res.json()
76 res.raise_for_status()
77 except requests.exceptions.RequestException as err:
78 self.logger.error(err)
79 raise BaseHttpClientException(
80 message=err.response, response=err.response
81 ) from err

    def get_health(self) -> Dict:
        """
        This endpoint is intended for administrators of QuantumLeap. Using
        the information returned by this endpoint, they can diagnose problems
        in the service or its dependencies. This information is also useful
        for cloud tools such as orchestrators and load balancers with rules
        based on health checks. Due to the lack of a standardized response
        format, we base the implementation on the draft of
        https://inadarei.github.io/rfc-healthcheck/

        Returns:
            Dictionary with response
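
        Example:
            A minimal sketch; following the rfc-healthcheck draft, the
            "status" field reports the overall service health (e.g. "pass")::

                health = client.get_health()
                print(health.get("status"))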
95 """
96 url = urljoin(self.base_url, "health")
97 try:
98 res = self.get(url=url, headers=self.headers)
99 if res.ok:
100 return res.json()
101 res.raise_for_status()
102 except requests.exceptions.RequestException as err:
103 self.logger.error(err)
104 raise BaseHttpClientException(
105 message=err.response.text, response=err.response
106 ) from err

    def post_config(self):
        """
        (To Be Implemented) Customize your persistence configuration to
        better suit your needs.
        """
        raise NotImplementedError("Endpoint to be implemented.")

    # INPUT API ENDPOINTS
    def post_notification(self, notification: Union[Message, MessageLD]):
        """
        Notify QuantumLeap of the arrival of a new NGSI notification.

        Args:
            notification: Notification Message Object
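
        Example:
            A minimal sketch that forwards a notification message; the
            subscription id and entity are illustrative::

                from filip.models.ngsi_v2.subscriptions import Message
                from filip.models.ngsi_v2.context import ContextEntity

                entity = ContextEntity(id="Room:001", type="Room")
                message = Message(subscriptionId="57458eb60962ef754e7c0998",
                                  data=[entity])
                client.post_notification(message)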
122 """
123 url = urljoin(self.base_url, "v2/notify")
124 headers = self.headers.copy()
125 data = []
126 for entity in notification.data:
127 data.append(entity.model_dump(exclude_none=True))
128 data_set = {"data": data, "subscriptionId": notification.subscriptionId}
130 try:
131 res = self.post(url=url, headers=headers, json=data_set)
132 if res.ok:
133 self.logger.debug(res.text)
134 else:
135 res.raise_for_status()
136 except requests.exceptions.RequestException as err:
137 msg = (
138 f"Could not post notification for subscription id "
139 f"{notification.subscriptionId}"
140 )
141 self.log_error(err=err, msg=msg)
142 raise BaseHttpClientException(message=msg, response=err.response) from err

    def post_subscription(
        self,
        cb_url: Union[AnyHttpUrl, str],
        ql_url: Union[AnyHttpUrl, str],
        entity_type: str = None,
        entity_id: str = None,
        id_pattern: str = None,
        attributes: str = None,
        observed_attributes: str = None,
        notified_attributes: str = None,
        throttling: int = None,
        time_index_attribute: str = None,
    ):
        """
        Subscribe QL to process Orion notifications of a certain type.
        This endpoint simplifies the creation of the subscription in Orion
        that will generate the notifications to be consumed by QuantumLeap
        in order to save historical records. If you want an advanced
        specification of the notifications, you can always create the
        subscription in Orion at your will. This endpoint just aims to
        simplify the common use case.

        Args:
            cb_url:
                url of the context broker
            ql_url:
                The url where Orion can reach QuantumLeap. Do not include
                specific paths.
            entity_type (String):
                The type of entities for which to create a subscription,
                so as to persist historical data of entities of this type.
            entity_id (String):
                Id of the entity to track. If specified, it takes precedence
                over the idPattern parameter.
            id_pattern (String): The pattern covering the entity ids for
                which to subscribe. If not specified, QL will track all
                entities of the specified type.
            attributes (String): Comma-separated list of attribute names to
                track.
            observed_attributes (String): Comma-separated list of attribute
                names to track.
            notified_attributes (String): Comma-separated list of attribute
                names to be used to restrict the data of which QL will keep
                a history.
            throttling (int): Minimal period of time in seconds which must
                elapse between two consecutive notifications.
            time_index_attribute (String): The name of a custom attribute to
                be used as a time index.
        """
        raise DeprecationWarning(
            "Subscription endpoint of QuantumLeap API is "
            "deprecated, use the ORION subscription endpoint "
            "instead"
        )

    def delete_entity(self, entity_id: str, entity_type: Optional[str] = None) -> str:
        """
        Given an entity (with type and id), delete all its historical records.

        Args:
            entity_id (String): Entity id is required.
            entity_type (Optional[String]): Entity type if entity_id alone
                can not uniquely define the entity.

        Raises:
            RequestException, if entity was not found
            Exception, if deleting was not successful

        Returns:
            The entity_id of the entity that was deleted.
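
        Example:
            A minimal sketch; deletes every historical record of the given
            entity (id and type are illustrative)::

                client.delete_entity(entity_id="Room:001", entity_type="Room")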
215 """
216 url = urljoin(self.base_url, f"v2/entities/{entity_id}")
217 headers = self.headers.copy()
218 if entity_type is not None:
219 params = {"type": entity_type}
220 else:
221 params = {}
223 # The deletion does not always resolves in a success even if an ok is
224 # returned.
225 # Try to delete multiple times with incrementing waits.
226 # If the entity is no longer found the methode returns with a success
227 # If the deletion attempt fails after the 10th try, raise an
228 # Exception: it could not be deleted
229 counter = 0
230 while counter < 10:
231 self.delete(url=url, headers=headers, params=params)
232 try:
233 self.get_entity_by_id(entity_id=entity_id, entity_type=entity_type)
234 except requests.exceptions.RequestException as err:
235 self.logger.info("Entity id '%s' successfully deleted!", entity_id)
236 return entity_id
237 time.sleep(counter * 5)
238 counter += 1
240 msg = f"Could not delete QL entity of id {entity_id}"
241 logger.error(msg=msg)
242 raise Exception(msg)

    def delete_entity_type(self, entity_type: str) -> str:
        """
        Given an entity type, delete all the historical records of all
        entities of such type.

        Args:
            entity_type (String): Type of the entities whose data is to be
                deleted.

        Returns:
            Entity type of the entities deleted.
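
        Example:
            A minimal sketch; removes the records of all entities of the
            given (illustrative) type::

                client.delete_entity_type(entity_type="Room")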
252 """
253 url = urljoin(self.base_url, f"v2/types/{entity_type}")
254 headers = self.headers.copy()
255 try:
256 res = self.delete(url=url, headers=headers)
257 if res.ok:
258 self.logger.info(
259 "Entities of type '%s' successfully deleted!", entity_type
260 )
261 return entity_type
262 res.raise_for_status()
263 except requests.exceptions.RequestException as err:
264 msg = f"Could not delete entities of type {entity_type}"
265 self.log_error(err=err, msg=msg)
266 raise BaseHttpClientException(message=msg, response=err.response) from err

    # QUERY API ENDPOINTS
    def __query_builder(
        self,
        url,
        *,
        entity_id: str = None,
        id_pattern: str = None,
        options: str = None,
        entity_type: str = None,
        aggr_method: Union[str, AggrMethod] = None,
        aggr_period: Union[str, AggrPeriod] = None,
        from_date: str = None,
        to_date: str = None,
        last_n: int = None,
        limit: int = 10000,
        offset: int = 0,
        georel: str = None,
        geometry: str = None,
        coords: str = None,
        attrs: str = None,
        aggr_scope: Union[str, AggrScope] = None,
    ) -> Deque[Dict]:
        """
        Private function to call the respective API endpoints. It chops
        large requests into multiple smaller requests and merges the
        responses.

        Args:
            url:
            entity_id:
            options:
            entity_type:
            aggr_method:
            aggr_period:
            from_date:
            to_date:
            last_n:
            limit:
                Maximum number of results to retrieve in a single response.
            offset:
                Offset to apply to the response results. For example, if the
                query was to return 10 results and you use an offset of 1,
                the response will return the last 9 values. Make sure you
                don't give more offset than the number of results.
            georel:
            geometry:
            coords:
            attrs:
            aggr_scope:
            id_pattern (str): The pattern covering the entity ids for which
                to subscribe. The pattern follows regular expressions (POSIX
                Extended), e.g. ".*", "Room.*". Detailed information:
                https://en.wikibooks.org/wiki/Regular_Expressions/POSIX-Extended_Regular_Expressions

        Returns:
            Deque of response chunks (dictionaries)
        """
        assert (
            id_pattern is None or entity_id is None
        ), "Cannot have both id and idPattern as parameter."
        params = {}
        headers = self.headers.copy()
        max_records_per_request = 10000
        # create a double-ended queue
        res_q: Deque[Dict] = deque([])

        if options:
            params.update({"options": options})
        if entity_type:
            params.update({"type": entity_type})
        if aggr_method:
            aggr_method = AggrMethod(aggr_method)
            params.update({"aggrMethod": aggr_method.value})
        if aggr_period:
            aggr_period = AggrPeriod(aggr_period)
            params.update({"aggrPeriod": aggr_period.value})
        if from_date:
            params.update({"fromDate": from_date})
        if to_date:
            params.update({"toDate": to_date})
        # These values are required for the integrated pagination mechanism
        # maximum items per request
        if limit is None:
            limit = inf
        if offset is None:
            offset = 0
        if georel:
            params.update({"georel": georel})
        if coords:
            params.update({"coords": coords})
        if geometry:
            params.update({"geometry": geometry})
        if attrs:
            params.update({"attrs": attrs})
        if aggr_scope:
            aggr_scope = AggrScope(aggr_scope)
            # the API expects camelCase query parameters (cf. aggrMethod)
            params.update({"aggrScope": aggr_scope.value})
        if entity_id:
            params.update({"id": entity_id})
        if id_pattern:
            params.update({"idPattern": id_pattern})

        # This loop will chop large requests into smaller chunks.
        # The individual functions will then merge the final response models.
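        # Illustration of the chunk arithmetic (not executed, assumed
        # values): with limit=25000 and max_records_per_request=10000, the
        # loop below issues requests with (offset, limit) = (0, 10000),
        # (10000, 10000), (20000, 5000) and then stops once limit - i <= 0.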
        for i in count(0, max_records_per_request):
            try:
                params["offset"] = offset + i

                params["limit"] = min(limit - i, max_records_per_request)
                if params["limit"] <= 0:
                    break

                if last_n:
                    params["lastN"] = min(last_n - i, max_records_per_request)
                    if params["lastN"] <= 0:
                        break

                res = self.get(url=url, params=params, headers=headers)

                if res.ok:
                    self.logger.debug("Received: %s", res.json())

                    # revert append direction when using last_n
                    if last_n:
                        res_q.appendleft(res.json())
                    else:
                        res_q.append(res.json())
                res.raise_for_status()

            except requests.exceptions.RequestException as err:
                if (
                    err.response.status_code == 404
                    and err.response.json().get("error") == "Not Found"
                    and len(res_q) > 0
                ):
                    break
                else:
                    msg = "Could not load entity data"
                    self.log_error(err=err, msg=msg)
                    raise BaseHttpClientException(
                        message=msg, response=err.response
                    ) from err

        self.logger.info("Successfully retrieved entity data")
        return res_q

    # v2/entities
    def get_entities(
        self,
        *,
        entity_type: str = None,
        id_pattern: str = None,
        from_date: str = None,
        to_date: str = None,
        limit: int = 10000,
        offset: int = None,
    ) -> List[TimeSeriesHeader]:
        """
        Get list of all available entities and their context information
        about EntityType and last update date.

        Args:
            entity_type (str): Comma-separated list of entity types whose
                data are to be included in the response. Use only one (no
                comma) when required. If used to resolve ambiguity for the
                given entityId, make sure the given entityId exists for this
                entityType.
            id_pattern (str): The pattern covering the entity ids for which
                to subscribe. The pattern follows regular expressions (POSIX
                Extended), e.g. ".*", "Room.*". Detailed information:
                https://en.wikibooks.org/wiki/Regular_Expressions/POSIX-Extended_Regular_Expressions
            from_date (str): The starting date and time (inclusive) from
                which the context information is queried. Must be in ISO8601
                format (e.g., 2018-01-05T15:44:34)
            to_date (str): The final date and time (inclusive) until which
                the context information is queried. Must be in ISO8601
                format (e.g., 2018-01-05T15:44:34).
            limit (int): Maximum number of results to be retrieved.
                Default value : 10000
            offset (int): Offset for the results.

        Returns:
            List of TimeSeriesHeader
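
        Example:
            A minimal sketch; lists which entities QuantumLeap has records
            for (the type is illustrative)::

                headers = client.get_entities(entity_type="Room")
                for header in headers:
                    print(header.entityId, header.entityType)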
451 """
        url = urljoin(self.base_url, "v2/entities")
        res = self.__query_builder(
            url=url,
            id_pattern=id_pattern,
            entity_type=entity_type,
            from_date=from_date,
            to_date=to_date,
            limit=limit,
            offset=offset,
        )

        ta = TypeAdapter(List[TimeSeriesHeader])
        return ta.validate_python(res[0])

    # /entities/{entityId}
    def get_entity_by_id(
        self,
        entity_id: str,
        *,
        attrs: str = None,
        entity_type: str = None,
        aggr_method: Union[str, AggrMethod] = None,
        aggr_period: Union[str, AggrPeriod] = None,
        from_date: str = None,
        to_date: str = None,
        last_n: int = None,
        limit: int = 10000,
        offset: int = None,
        georel: str = None,
        geometry: str = None,
        coords: str = None,
        options: str = None,
    ) -> TimeSeries:
        """
        History of N attributes of a given entity instance.
        For example, query the max water level of the central tank
        throughout the last year. Queries can get more sophisticated with
        the use of filters and query attributes.

        Args:
            entity_id (String): Entity id is required.
            attrs (String):
                Comma-separated list of attribute names whose data are to be
                included in the response. The attributes are retrieved in
                the order specified by this parameter. If not specified, all
                attributes are included in the response in arbitrary order.
            entity_type (String): Comma-separated list of entity types whose
                data are to be included in the response.
            aggr_method (String):
                The function to apply to the raw data filtered by the query
                parameters. If not given, the returned data are the same raw
                inserted data.

                Allowed values: count, sum, avg, min, max
            aggr_period (String):
                If not defined, the aggregation will apply to all the values
                contained in the search result. If defined, the aggregation
                function will instead be applied N times, once for each
                period, and all those results will be considered for the
                response. For example, a query asking for the average
                temperature of an attribute will typically return 1 value.
                However, with an aggregationPeriod of day, you get the daily
                average of the temperature instead (more than one value
                assuming you had measurements across many days within the
                scope of your search result). aggrPeriod must be accompanied
                by an aggrMethod, and the aggrMethod will be applied to all
                the numeric attributes specified in attrs; the rest of the
                non-numerical attrs will be ignored. By default, the
                response is grouped by entity_id. See aggrScope to create
                aggregation across entities.

                Allowed values: year, month, day, hour, minute, second
            from_date (String):
                The starting date and time (inclusive) from which the
                context information is queried. Must be in ISO8601 format
                (e.g., 2018-01-05T15:44:34)
            to_date (String):
                The final date and time (inclusive) until which the context
                information is queried. Must be in ISO8601 format (e.g.,
                2018-01-05T15:44:34)
            last_n (int):
                Used to request only the last N values that satisfy the
                request conditions.
            limit (int): Maximum number of results to be retrieved.
                Default value : 10000
            offset (int):
                Offset to apply to the response results.
            georel (String):
                It specifies a spatial relationship between matching
                entities and a reference shape (geometry). This parameter is
                used to perform geographical queries with the same semantics
                as in the FIWARE-NGSI v2 Specification. Full details can be
                found in the Geographical Queries section of the
                specification:
                https://fiware.github.io/specifications/ngsiv2/stable/.
            geometry (String):
                Required if georel is specified. point, line, polygon, box
            coords (String):
                Optional but required if georel is specified. This parameter
                defines the reference shape (geometry) in terms of WGS 84
                coordinates and has the same semantics as in the
                FIWARE-NGSI v2 Specification, except we only accept
                coordinates in decimal degrees, e.g. 40.714,-74.006 is okay,
                but not 40 42' 51'',74 0' 21''. Full details can be found in
                the Geographical Queries section of the specification:
                https://fiware.github.io/specifications/ngsiv2/stable/.
            options (String): Key value pair options.

        Returns:
            TimeSeries
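
        Example:
            A minimal sketch; fetches the daily maximum of an attribute
            (entity id, attribute and dates are illustrative)::

                ts = client.get_entity_by_id(
                    entity_id="Tank:central",
                    attrs="waterLevel",
                    aggr_method="max",
                    aggr_period="day",
                    from_date="2018-01-01T00:00:00",
                    to_date="2018-12-31T23:59:59",
                )
                print(ts.to_pandas())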
562 """
563 url = urljoin(self.base_url, f"v2/entities/{entity_id}")
564 res_q = self.__query_builder(
565 url=url,
566 attrs=attrs,
567 options=options,
568 entity_type=entity_type,
569 aggr_method=aggr_method,
570 aggr_period=aggr_period,
571 from_date=from_date,
572 to_date=to_date,
573 last_n=last_n,
574 limit=limit,
575 offset=offset,
576 georel=georel,
577 geometry=geometry,
578 coords=coords,
579 )
580 # merge response chunks
581 res = TimeSeries.model_validate(res_q.popleft())
582 for item in res_q:
583 res.extend(TimeSeries.model_validate(item))
585 return res

    # /entities/{entityId}/value
    def get_entity_values_by_id(
        self,
        entity_id: str,
        *,
        attrs: str = None,
        entity_type: str = None,
        aggr_method: Union[str, AggrMethod] = None,
        aggr_period: Union[str, AggrPeriod] = None,
        from_date: str = None,
        to_date: str = None,
        last_n: int = None,
        limit: int = 10000,
        offset: int = None,
        georel: str = None,
        geometry: str = None,
        coords: str = None,
        options: str = None,
    ) -> TimeSeries:
        """
        History of N attributes (values only) of a given entity instance.
        For example, query the average pressure, temperature and humidity
        (values only, no metadata) of this month in the weather station WS1.

        Args:
            entity_id (String): Entity id is required.
            attrs (String): Comma-separated list of attribute names
            entity_type (String): Comma-separated list of entity types whose
                data are to be included in the response.
            aggr_method (String): The function to apply to the raw data
                filtered. count, sum, avg, min, max
            aggr_period (String): year, month, day, hour, minute, second
            from_date (String): Starting date and time inclusive.
            to_date (String): Final date and time inclusive.
            last_n (int): Request only the last N values.
            limit (int): Maximum number of results to be retrieved.
                Default value : 10000
            offset (int): Offset for the results.
            georel (String): Geographical pattern
            geometry (String): Required if georel is specified. point, line,
                polygon, box
            coords (String): Required if georel is specified.
                e.g. 40.714,-74.006
            options (String): Key value pair options.

        Returns:
            Response Model
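
        Example:
            A minimal sketch; retrieves only the last ten values of selected
            attributes (entity id and attributes are illustrative)::

                ts = client.get_entity_values_by_id(
                    entity_id="WeatherStation:WS1",
                    attrs="pressure,temperature",
                    last_n=10,
                )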
635 """
636 url = urljoin(self.base_url, f"v2/entities/{entity_id}/value")
637 res_q = self.__query_builder(
638 url=url,
639 attrs=attrs,
640 options=options,
641 entity_type=entity_type,
642 aggr_method=aggr_method,
643 aggr_period=aggr_period,
644 from_date=from_date,
645 to_date=to_date,
646 last_n=last_n,
647 limit=limit,
648 offset=offset,
649 georel=georel,
650 geometry=geometry,
651 coords=coords,
652 )
654 # merge response chunks
655 res = TimeSeries(entityId=entity_id, **res_q.popleft())
656 for item in res_q:
657 res.extend(TimeSeries(entityId=entity_id, **item))
659 return res

    # /entities/{entityId}/attrs/{attrName}
    def get_entity_attr_by_id(
        self,
        entity_id: str,
        attr_name: str,
        *,
        entity_type: str = None,
        aggr_method: Union[str, AggrMethod] = None,
        aggr_period: Union[str, AggrPeriod] = None,
        from_date: str = None,
        to_date: str = None,
        last_n: int = None,
        limit: int = 10000,
        offset: int = None,
        georel: str = None,
        geometry: str = None,
        coords: str = None,
        options: str = None,
    ) -> TimeSeries:
        """
        History of an attribute of a given entity instance.
        For example, query the max water level of the central tank
        throughout the last year. Queries can get more sophisticated with
        the use of filters and query attributes.

        Args:
            entity_id (String): Entity id is required.
            attr_name (String): The attribute name is required.
            entity_type (String): Comma-separated list of entity types whose
                data are to be included in the response.
            aggr_method (String): The function to apply to the raw data
                filtered. count, sum, avg, min, max
            aggr_period (String): year, month, day, hour, minute, second
            from_date (String): Starting date and time inclusive.
            to_date (String): Final date and time inclusive.
            last_n (int): Request only the last N values.
            limit (int): Maximum number of results to be retrieved.
                Default value : 10000
            offset (int): Offset for the results.
            georel (String): Geographical pattern
            geometry (String): Required if georel is specified. point, line,
                polygon, box
            coords (String): Required if georel is specified.
                e.g. 40.714,-74.006
            options (String): Key value pair options.

        Returns:
            Response Model
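
        Example:
            A minimal sketch; the history of a single attribute (names are
            illustrative)::

                ts = client.get_entity_attr_by_id(
                    entity_id="Tank:central", attr_name="waterLevel"
                )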
709 """
710 url = urljoin(self.base_url, f"v2/entities/{entity_id}/attrs" f"/{attr_name}")
711 req_q = self.__query_builder(
712 url=url,
713 entity_id=entity_id,
714 options=options,
715 entity_type=entity_type,
716 aggr_method=aggr_method,
717 aggr_period=aggr_period,
718 from_date=from_date,
719 to_date=to_date,
720 last_n=last_n,
721 limit=limit,
722 offset=offset,
723 georel=georel,
724 geometry=geometry,
725 coords=coords,
726 )
728 # merge response chunks
729 first = req_q.popleft()
730 res = TimeSeries(
731 entityId=entity_id,
732 index=first.get("index"),
733 attributes=[AttributeValues(**first)],
734 )
735 for item in req_q:
736 res.extend(
737 TimeSeries(
738 entityId=entity_id,
739 index=item.get("index"),
740 attributes=[AttributeValues(**item)],
741 )
742 )
744 return res

    # /entities/{entityId}/attrs/{attrName}/value
    def get_entity_attr_values_by_id(
        self,
        entity_id: str,
        attr_name: str,
        *,
        entity_type: str = None,
        aggr_method: Union[str, AggrMethod] = None,
        aggr_period: Union[str, AggrPeriod] = None,
        from_date: str = None,
        to_date: str = None,
        last_n: int = None,
        limit: int = 10000,
        offset: int = None,
        georel: str = None,
        geometry: str = None,
        coords: str = None,
        options: str = None,
    ) -> TimeSeries:
        """
        History of an attribute (values only) of a given entity instance.
        Similar to the previous endpoint, but focusing on the values
        regardless of the metadata.

        Args:
            entity_id (String): Entity id is required.
            attr_name (String): The attribute name is required.
            entity_type (String): Comma-separated list of entity types whose
                data are to be included in the response.
            aggr_method (String): The function to apply to the raw data
                filtered. count, sum, avg, min, max
            aggr_period (String): year, month, day, hour, minute, second
            from_date (String): Starting date and time inclusive.
            to_date (String): Final date and time inclusive.
            last_n (int): Request only the last N values.
            limit (int): Maximum number of results to be retrieved.
                Default value : 10000
            offset (int): Offset for the results.
            georel (String): Geographical pattern
            geometry (String): Required if georel is specified. point, line,
                polygon, box
            coords (String): Required if georel is specified.
                e.g. 40.714,-74.006
            options (String): Key value pair options.

        Returns:
            Response Model
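
        Example:
            A minimal sketch; values only, without attribute metadata (names
            are illustrative)::

                ts = client.get_entity_attr_values_by_id(
                    entity_id="Tank:central", attr_name="waterLevel", last_n=5
                )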
793 """
794 url = urljoin(
795 self.base_url, f"v2/entities/{entity_id}/attrs" f"/{attr_name}/value"
796 )
797 res_q = self.__query_builder(
798 url=url,
799 options=options,
800 entity_type=entity_type,
801 aggr_method=aggr_method,
802 aggr_period=aggr_period,
803 from_date=from_date,
804 to_date=to_date,
805 last_n=last_n,
806 limit=limit,
807 offset=offset,
808 georel=georel,
809 geometry=geometry,
810 coords=coords,
811 )
812 # merge response chunks
813 first = res_q.popleft()
814 res = TimeSeries(
815 entityId=entity_id,
816 index=first.get("index"),
817 attributes=[
818 AttributeValues(attrName=attr_name, values=first.get("values"))
819 ],
820 )
821 for item in res_q:
822 res.extend(
823 TimeSeries(
824 entityId=entity_id,
825 index=item.get("index"),
826 attributes=[
827 AttributeValues(attrName=attr_name, values=item.get("values"))
828 ],
829 )
830 )
832 return res

    # /types/{entityType}
    def get_entity_by_type(
        self,
        entity_type: str,
        *,
        attrs: str = None,
        entity_id: str = None,
        id_pattern: str = None,
        aggr_method: Union[str, AggrMethod] = None,
        aggr_period: Union[str, AggrPeriod] = None,
        from_date: str = None,
        to_date: str = None,
        last_n: int = None,
        limit: int = 10000,
        offset: int = None,
        georel: str = None,
        geometry: str = None,
        coords: str = None,
        options: str = None,
        aggr_scope: Union[str, AggrScope] = None,
    ) -> List[TimeSeries]:
        """
        History of N attributes of N entities of the same type.
        For example, query the average pressure, temperature and humidity of
        this month in all the weather stations.
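
        Example:
            A minimal sketch; returns one TimeSeries per matching entity
            (type and attributes are illustrative)::

                ts_list = client.get_entity_by_type(
                    entity_type="WeatherStation",
                    attrs="pressure,temperature,humidity",
                )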
859 """
860 url = urljoin(self.base_url, f"v2/types/{entity_type}")
861 res_q = self.__query_builder(
862 url=url,
863 entity_id=entity_id,
864 id_pattern=id_pattern,
865 attrs=attrs,
866 options=options,
867 aggr_method=aggr_method,
868 aggr_period=aggr_period,
869 from_date=from_date,
870 to_date=to_date,
871 last_n=last_n,
872 limit=limit,
873 offset=offset,
874 georel=georel,
875 geometry=geometry,
876 coords=coords,
877 aggr_scope=aggr_scope,
878 )
880 # merge chunks of response
881 res = [
882 TimeSeries(entityType=entity_type, **item)
883 for item in res_q.popleft().get("entities")
884 ]
886 for chunk in res_q:
887 chunk = [
888 TimeSeries(entityType=entity_type, **item)
889 for item in chunk.get("entities")
890 ]
891 for new, old in zip(chunk, res):
892 old.extend(new)
894 return res

    # /types/{entityType}/value
    def get_entity_values_by_type(
        self,
        entity_type: str,
        *,
        attrs: str = None,
        entity_id: str = None,
        id_pattern: str = None,
        aggr_method: Union[str, AggrMethod] = None,
        aggr_period: Union[str, AggrPeriod] = None,
        from_date: str = None,
        to_date: str = None,
        last_n: int = None,
        limit: int = 10000,
        offset: int = None,
        georel: str = None,
        geometry: str = None,
        coords: str = None,
        options: str = None,
        aggr_scope: Union[str, AggrScope] = None,
    ) -> List[TimeSeries]:
        """
        History of N attributes (values only) of N entities of the same type.
        For example, query the average pressure, temperature and humidity
        (values only, no metadata) of this month in all the weather stations.
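
        Example:
            A minimal sketch; monthly averages, values only (type,
            attributes and aggregation settings are illustrative)::

                ts_list = client.get_entity_values_by_type(
                    entity_type="WeatherStation",
                    attrs="pressure,temperature,humidity",
                    aggr_method="avg",
                    aggr_period="month",
                )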
922 """
923 url = urljoin(self.base_url, f"v2/types/{entity_type}/value")
924 res_q = self.__query_builder(
925 url=url,
926 entity_id=entity_id,
927 id_pattern=id_pattern,
928 attrs=attrs,
929 options=options,
930 entity_type=entity_type,
931 aggr_method=aggr_method,
932 aggr_period=aggr_period,
933 from_date=from_date,
934 to_date=to_date,
935 last_n=last_n,
936 limit=limit,
937 offset=offset,
938 georel=georel,
939 geometry=geometry,
940 coords=coords,
941 aggr_scope=aggr_scope,
942 )
943 # merge chunks of response
944 res = [
945 TimeSeries(entityType=entity_type, **item)
946 for item in res_q.popleft().get("values")
947 ]
949 for chunk in res_q:
950 chunk = [
951 TimeSeries(entityType=entity_type, **item)
952 for item in chunk.get("values")
953 ]
954 for new, old in zip(chunk, res):
955 old.extend(new)
957 return res

    # /types/{entityType}/attrs/{attrName}
    def get_entity_attr_by_type(
        self,
        entity_type: str,
        attr_name: str,
        *,
        entity_id: str = None,
        id_pattern: str = None,
        aggr_method: Union[str, AggrMethod] = None,
        aggr_period: Union[str, AggrPeriod] = None,
        from_date: str = None,
        to_date: str = None,
        last_n: int = None,
        limit: int = 10000,
        offset: int = None,
        georel: str = None,
        geometry: str = None,
        coords: str = None,
        options: str = None,
        aggr_scope: Union[str, AggrScope] = None,
    ) -> List[TimeSeries]:
        """
        History of an attribute of N entities of the same type.
        For example, query the pressure measurements of this month in all
        the weather stations. Note that in the response, the index and
        values arrays are parallel. Also, when using aggrMethod, the
        aggregation is done per entity instance. In this case, the index
        array is just the fromDate and toDate values the user specified in
        the request (if any).

        Args:
            entity_type (String): Entity type is required.
            attr_name (String): The attribute name is required.
            entity_id (String): Comma-separated list of entity ids whose
                data are to be included in the response.
            aggr_method (String): The function to apply to the raw data
                filtered. count, sum, avg, min, max
            aggr_period (String): year, month, day, hour, minute, second
            aggr_scope (str): Optional. (This parameter is not yet
                supported). When the query results cover historical data for
                multiple entity instances, you can define the aggregation
                method to be applied for each entity instance [entity] or
                across them [global]
            from_date (String): Starting date and time inclusive.
            to_date (String): Final date and time inclusive.
            last_n (int): Request only the last N values.
            limit (int): Maximum number of results to be retrieved.
                Default value : 10000
            offset (int): Offset for the results.
            georel (String): Geographical pattern
            geometry (String): Required if georel is specified. point, line,
                polygon, box
            coords (String): Required if georel is specified.
                e.g. 40.714,-74.006
            options (String): Key value pair options.

        Returns:
            Response Model
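
        Example:
            A minimal sketch; one attribute across all entities of a type
            (names are illustrative)::

                ts_list = client.get_entity_attr_by_type(
                    entity_type="WeatherStation", attr_name="pressure"
                )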
1018 """
1019 url = urljoin(self.base_url, f"v2/types/{entity_type}/attrs" f"/{attr_name}")
1020 res_q = self.__query_builder(
1021 url=url,
1022 entity_id=entity_id,
1023 id_pattern=id_pattern,
1024 options=options,
1025 entity_type=entity_type,
1026 aggr_method=aggr_method,
1027 aggr_period=aggr_period,
1028 from_date=from_date,
1029 to_date=to_date,
1030 last_n=last_n,
1031 limit=limit,
1032 offset=offset,
1033 georel=georel,
1034 geometry=geometry,
1035 coords=coords,
1036 aggr_scope=aggr_scope,
1037 )
1039 # merge chunks of response
1040 first = res_q.popleft()
1041 res = [
1042 TimeSeries(
1043 index=item.get("index"),
1044 entityType=entity_type,
1045 entityId=item.get("entityId"),
1046 attributes=[
1047 AttributeValues(
1048 attrName=first.get("attrName"), values=item.get("values")
1049 )
1050 ],
1051 )
1052 for item in first.get("entities")
1053 ]
1055 for chunk in res_q:
1056 chunk = [
1057 TimeSeries(
1058 index=item.get("index"),
1059 entityType=entity_type,
1060 entityId=item.get("entityId"),
1061 attributes=[
1062 AttributeValues(
1063 attrName=chunk.get("attrName"), values=item.get("values")
1064 )
1065 ],
1066 )
1067 for item in chunk.get("entities")
1068 ]
1069 for new, old in zip(chunk, res):
1070 old.extend(new)
1072 return res

    # /types/{entityType}/attrs/{attrName}/value
    def get_entity_attr_values_by_type(
        self,
        entity_type: str,
        attr_name: str,
        *,
        entity_id: str = None,
        id_pattern: str = None,
        aggr_method: Union[str, AggrMethod] = None,
        aggr_period: Union[str, AggrPeriod] = None,
        from_date: str = None,
        to_date: str = None,
        last_n: int = None,
        limit: int = 10000,
        offset: int = None,
        georel: str = None,
        geometry: str = None,
        coords: str = None,
        options: str = None,
        aggr_scope: Union[str, AggrScope] = None,
    ) -> List[TimeSeries]:
        """
        History of an attribute (values only) of N entities of the same type.
        For example, query the average pressure (values only, no metadata)
        of this month in all the weather stations.

        Args:
            entity_type (String): Entity type is required.
            attr_name (String): The attribute name is required.
            entity_id (String): Comma-separated list of entity ids whose
                data are to be included in the response.
            aggr_method (String): The function to apply to the raw data
                filtered. count, sum, avg, min, max
            aggr_period (String): year, month, day, hour, minute, second
            aggr_scope (String): Optional. (This parameter is not yet
                supported). Aggregation applied for each entity instance
                [entity] or across them [global].
            from_date (String): Starting date and time inclusive.
            to_date (String): Final date and time inclusive.
            last_n (int): Request only the last N values.
            limit (int): Maximum number of results to be retrieved.
                Default value : 10000
            offset (int): Offset for the results.
            georel (String): Geographical pattern
            geometry (String): Required if georel is specified. point, line,
                polygon, box
            coords (String): Required if georel is specified.
                e.g. 40.714,-74.006
            options (String): Key value pair options.

        Returns:
            Response Model
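
        Example:
            A minimal sketch; average of one attribute across all entities
            of a type (names and dates are illustrative)::

                ts_list = client.get_entity_attr_values_by_type(
                    entity_type="WeatherStation",
                    attr_name="pressure",
                    aggr_method="avg",
                    from_date="2018-01-01T00:00:00",
                    to_date="2018-01-31T23:59:59",
                )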
1125 """
1126 url = urljoin(
1127 self.base_url, f"v2/types/{entity_type}/attrs/" f"{attr_name}/value"
1128 )
1129 res_q = self.__query_builder(
1130 url=url,
1131 entity_id=entity_id,
1132 id_pattern=id_pattern,
1133 options=options,
1134 entity_type=entity_type,
1135 aggr_method=aggr_method,
1136 aggr_period=aggr_period,
1137 from_date=from_date,
1138 to_date=to_date,
1139 last_n=last_n,
1140 limit=limit,
1141 offset=offset,
1142 georel=georel,
1143 geometry=geometry,
1144 coords=coords,
1145 aggr_scope=aggr_scope,
1146 )
1148 # merge chunks of response
1149 res = [
1150 TimeSeries(
1151 index=item.get("index"),
1152 entityType=entity_type,
1153 entityId=item.get("entityId"),
1154 attributes=[
1155 AttributeValues(attrName=attr_name, values=item.get("values"))
1156 ],
1157 )
1158 for item in res_q.popleft().get("values")
1159 ]
1161 for chunk in res_q:
1162 chunk = [
1163 TimeSeries(
1164 index=item.get("index"),
1165 entityType=entity_type,
1166 entityId=item.get("entityId"),
1167 attributes=[
1168 AttributeValues(attrName=attr_name, values=item.get("values"))
1169 ],
1170 )
1171 for item in chunk.get("values")
1172 ]
1174 for new, old in zip(chunk, res):
1175 old.extend(new)
1176 return res

    # v2/attrs
    def get_entity_by_attrs(
        self,
        *,
        entity_type: str = None,
        from_date: str = None,
        to_date: str = None,
        limit: int = 10000,
        offset: int = None,
    ) -> List[TimeSeries]:
        """
        Get list of timeseries data grouped by each existing attribute name.
        The timeseries data include all entities corresponding to each
        attribute name as well as the index and values of this attribute in
        this entity.

        Args:
            entity_type (str): Comma-separated list of entity types whose
                data are to be included in the response. Use only one (no
                comma) when required. If used to resolve ambiguity for the
                given entityId, make sure the given entityId exists for this
                entityType.
            from_date (str): The starting date and time (inclusive) from
                which the context information is queried. Must be in ISO8601
                format (e.g., 2018-01-05T15:44:34)
            to_date (str): The final date and time (inclusive) until which
                the context information is queried. Must be in ISO8601
                format (e.g., 2018-01-05T15:44:34).
            limit (int): Maximum number of results to be retrieved.
                Default value : 10000
            offset (int): Offset for the results.

        Returns:
            List of TimeSeries
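
        Example:
            A minimal sketch; one TimeSeries per entity and attribute name
            (the type is illustrative)::

                ts_list = client.get_entity_by_attrs(entity_type="Room")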
1212 """
1213 url = urljoin(self.base_url, "v2/attrs")
1214 res_q = self.__query_builder(
1215 url=url,
1216 entity_type=entity_type,
1217 from_date=from_date,
1218 to_date=to_date,
1219 limit=limit,
1220 offset=offset,
1221 )
1222 first = res_q.popleft()
1224 res = chain.from_iterable(
1225 map(lambda x: self.transform_attr_response_model(x), first.get("attrs"))
1226 )
1227 for chunk in res_q:
1228 chunk = chain.from_iterable(
1229 map(lambda x: self.transform_attr_response_model(x), chunk.get("attrs"))
1230 )
1232 for new, old in zip(chunk, res):
1233 old.extend(new)
1235 return list(res)

    # v2/attrs/{attr_name}
    def get_entity_by_attr_name(
        self,
        *,
        attr_name: str,
        entity_type: str = None,
        from_date: str = None,
        to_date: str = None,
        limit: int = 10000,
        offset: int = None,
    ) -> List[TimeSeries]:
        """
        Get list of all entities containing this attribute name, as well as
        getting the index and values of this attribute in every
        corresponding entity.

        Args:
            attr_name (str): The attribute name of interest.
            entity_type (str): Comma-separated list of entity types whose
                data are to be included in the response. Use only one (no
                comma) when required. If used to resolve ambiguity for the
                given entityId, make sure the given entityId exists for this
                entityType.
            from_date (str): The starting date and time (inclusive) from
                which the context information is queried. Must be in ISO8601
                format (e.g., 2018-01-05T15:44:34)
            to_date (str): The final date and time (inclusive) until which
                the context information is queried. Must be in ISO8601
                format (e.g., 2018-01-05T15:44:34).
            limit (int): Maximum number of results to be retrieved.
                Default value : 10000
            offset (int): Offset for the results.

        Returns:
            List of TimeSeries
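
        Example:
            A minimal sketch; all entities that carry the given attribute
            (the attribute name is illustrative)::

                ts_list = client.get_entity_by_attr_name(attr_name="temperature")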
1272 """
1273 url = urljoin(self.base_url, f"/v2/attrs/{attr_name}")
1274 res_q = self.__query_builder(
1275 url=url,
1276 entity_type=entity_type,
1277 from_date=from_date,
1278 to_date=to_date,
1279 limit=limit,
1280 offset=offset,
1281 )
1283 first = res_q.popleft()
1284 res = self.transform_attr_response_model(first)
1286 for chunk in res_q:
1287 chunk = self.transform_attr_response_model(chunk)
1288 for new, old in zip(chunk, res):
1289 old.extend(new)
1290 return list(res)

    def transform_attr_response_model(self, attr_response):
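        """
        Transform one element of an attrs query response into TimeSeries
        objects, one per entity, and chain them into a single iterable.
        """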
        res = []
        attr_name = attr_response.get("attrName")
        for entity_group in attr_response.get("types"):
            timeseries = map(
                lambda entity: TimeSeries(
                    entityId=entity.get("entityId"),
                    entityType=entity_group.get("entityType"),
                    index=entity.get("index"),
                    attributes=[
                        AttributeValues(
                            attrName=attr_name, values=entity.get("values")
                        )
                    ],
                ),
                entity_group.get("entities"),
            )
            # consume the map object inside the loop; the lambda binds
            # entity_group late, so deferring evaluation would attach the
            # last entityType to every entity
            res.append(list(timeseries))
        return chain.from_iterable(res)