Coverage for filip/models/ngsi_v2/timeseries.py: 100%
55 statements
coverage.py v7.4.4, created at 2024-11-20 16:54 +0000
1"""
2Data models for interacting with FIWARE's time series-api (aka QuantumLeap)
3"""
4from __future__ import annotations
5import logging
6from typing import Any, List, Union
7from datetime import datetime
8import numpy as np
9import pandas as pd
10from aenum import Enum
11from pydantic import ConfigDict, BaseModel, Field
14logger = logging.getLogger(__name__)
17class TimeSeriesBase(BaseModel):
18 """
19 Base model for other time series api models
20 """
21 index: Union[List[datetime], datetime] = Field(
22 default=None,
23 description="Array of the timestamps which are indexes of the response "
24 "for the requested data. It's a parallel array to 'values'."
25 " The timestamp will be in the ISO8601 format "
26 "(e.g. 2010-10-10T07:09:00.792) or in milliseconds since "
27 "epoch whichever format was used in the input "
28 "(notification), but ALWAYS in UTC. When using aggregation "
29 "options, the format of this remains the same, only the "
30 "semantics will change. For example, if aggrPeriod is day, "
31 "each index will be a valid timestamp of a moment in the "
32 "corresponding day."
33 )
class TimeSeriesHeader(TimeSeriesBase):
    """
    Model to describe an available entity in the time series API
    """
    model_config = ConfigDict(populate_by_name=True)
    # aliases are required due to formal inconsistencies in the API specs
    entityId: str = Field(default=None,
                          alias="id",
                          description="The entity id in the time series API. "
                                      "If the id is unique among all entity "
                                      "types, this could be used to uniquely "
                                      "identify the entity instance. Otherwise, "
                                      "you will have to use the entityType "
                                      "attribute to resolve ambiguity.")
    entityType: str = Field(default=None,
                            alias="type",
                            description="The type of the entity")
class IndexedValues(BaseModel):
    """
    Model for time indexed values
    """
    values: List[Any] = Field(
        default=None,
        description="Array of values of the selected attribute, in the same "
                    "corresponding order of the 'index' array. When using "
                    "aggregation options, the format of this remains the same, "
                    "only the semantics will change. For example, if "
                    "aggrPeriod is day, each value of course may not "
                    "correspond to original measurements but rather the "
                    "aggregate of measurements in each day."
    )
class AttributeValues(IndexedValues):
    """
    Model for indexed values that contain the attribute name
    """
    attrName: str = Field(
        title="Attribute name",
        description=""
    )
class TimeSeries(TimeSeriesHeader):
    """
    Model for time series data
    """
    model_config = ConfigDict(populate_by_name=True)
    attributes: List[AttributeValues] = None

    def extend(self, other: TimeSeries) -> None:
        """
        Extends the current `TimeSeries` object with another
        `TimeSeries` object of the same format.

        Args:
            other: TimeSeries object that will be appended to the original object

        Returns:
            None

        Raises:
            AssertionError: if the header fields do not match or if the
                index of `other` does not start after the existing index ends
        """
        assert self.entityId == other.entityId
        assert self.entityType == other.entityType
        assert self.index[-1] < other.index[0]

        for attr, other_attr in zip(self.attributes, other.attributes):
            assert attr.attrName == other_attr.attrName
            attr.values.extend(other_attr.values)
        self.index.extend(other.index)

    def to_pandas(self) -> pd.DataFrame:
        """
        Converts time series data to a pandas DataFrame

        Returns:
            pandas.DataFrame
        """
        index = pd.Index(data=self.index, name='datetime')
        attr_names = [attr.attrName for attr in self.attributes]
        values = np.array([attr.values for attr in self.attributes]).transpose()
        columns = pd.MultiIndex.from_product(
            [[self.entityId], [self.entityType], attr_names],
            names=['entityId', 'entityType', 'attribute'])

        return pd.DataFrame(data=values, index=index, columns=columns)
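
# Usage sketch (illustrative, not part of the module): the entity, timestamps
# and temperature values below are hypothetical.
#
#     ts = TimeSeries(
#         id="Room:1", type="Room",
#         index=[datetime(2024, 1, 1, 0, 0), datetime(2024, 1, 1, 0, 15)],
#         attributes=[AttributeValues(attrName="temperature",
#                                     values=[20.5, 20.7])])
#     later = TimeSeries(
#         id="Room:1", type="Room",
#         index=[datetime(2024, 1, 1, 0, 30)],
#         attributes=[AttributeValues(attrName="temperature", values=[21.0])])
#     ts.extend(later)      # index of `later` starts after `ts` ends
#     df = ts.to_pandas()   # columns: MultiIndex (entityId, entityType, attribute)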
class AggrMethod(str, Enum):
    """
    Aggregation Methods
    """
    _init_ = 'value __doc__'
    COUNT = "count", "Number of Entries"
    SUM = "sum", "Sum"
    AVG = "avg", "Average"
    MIN = "min", "Minimum"
    MAX = "max", "Maximum"


class AggrPeriod(str, Enum):
    """
    Aggregation Periods
    """
    _init_ = 'value __doc__'
    YEAR = "year", "year"
    MONTH = "month", "month"
    DAY = "day", "day"
    HOUR = "hour", "hour"
    MINUTE = "minute", "minute"
    SECOND = "second", "second"


class AggrScope(str, Enum):
    """
    Aggregation Scopes

    When the query results cover historical data for multiple entity
    instances, you can define whether the aggregation method is applied
    per entity instance [entity] or across all of them [global].
    """
    _init_ = 'value __doc__'
    ENTITY = "entity", "Entity (default)"
    GLOBAL = "global", "Global"
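
# Usage sketch (illustrative, not part of the module): the enums serialize to
# the plain strings that QuantumLeap expects for its aggrMethod, aggrPeriod
# and aggrScope query parameters.
#
#     AggrMethod.AVG.value     # "avg"
#     AggrPeriod.DAY.value     # "day"
#     AggrScope.ENTITY.value   # "entity"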