Coverage for filip/models/ngsi_v2/timeseries.py: 100%

55 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-02-19 11:48 +0000

1""" 

2Data models for interacting with FIWARE's time series-api (aka QuantumLeap) 

3""" 

4 

5from __future__ import annotations 

6import logging 

7from typing import Any, List, Union 

8from datetime import datetime 

9import numpy as np 

10import pandas as pd 

11from aenum import Enum 

12from pydantic import ConfigDict, BaseModel, Field 

13 

14 

# Module-level logger, named after this module per stdlib `logging` convention.
logger = logging.getLogger(__name__)

16 

17 

class TimeSeriesBase(BaseModel):
    """
    Common base class shared by the time series api response models.
    """

    # Parallel to the per-attribute 'values' arrays of the response.
    index: Union[List[datetime], datetime] = Field(
        default=None,
        description="Array of the timestamps which are indexes of the response for the "
        "requested data. It's a parallel array to 'values'. The timestamp "
        "will be in the ISO8601 format (e.g. 2010-10-10T07:09:00.792) or "
        "in milliseconds since epoch whichever format was used in the "
        "input (notification), but ALWAYS in UTC. When using aggregation "
        "options, the format of this remains the same, only the semantics "
        "will change. For example, if aggrPeriod is day, each index will "
        "be a valid timestamp of a moment in the corresponding day.",
    )

36 

37 

class TimeSeriesHeader(TimeSeriesBase):
    """
    Model describing an entity that is available in the time series api.
    """

    model_config = ConfigDict(populate_by_name=True)
    # Aliases are needed because the api specs are formally inconsistent.
    entityId: str = Field(
        default=None,
        alias="id",
        description="The entity id the time series api."
        "If the id is unique among all entity types, this could be "
        "used to uniquely identify the entity instance. Otherwise, "
        "you will have to use the entityType "
        "attribute to resolve ambiguity.",
    )
    entityType: str = Field(
        default=None,
        alias="type",
        description="The type of an entity",
    )

58 

59 

class IndexedValues(BaseModel):
    """
    Model holding a series of values aligned with a time index.
    """

    values: List[Any] = Field(
        default=None,
        description="Array of values of the selected attribute, in the same corresponding "
        "order of the 'index' array. When using aggregation options, the "
        "format of this remains the same, only the semantics will change. "
        "For example, if aggrPeriod is day, each value of course may not "
        "correspond to original measurements but rather the aggregate of "
        "measurements in each day.",
    )

75 

76 

class AttributeValues(IndexedValues):
    """
    Indexed values tagged with the name of the attribute they belong to.
    """

    attrName: str = Field(description="", title="Attribute name")

83 

84 

class TimeSeries(TimeSeriesHeader):
    """
    Model for time series data of a single entity.
    """

    model_config = ConfigDict(populate_by_name=True)
    attributes: List[AttributeValues] = None

    def extend(self, other: TimeSeries) -> None:
        """
        Extends the current `TimeSeries` object with an other
        `TimeSeries` object. With the same format.

        Args:
            other: TimeSeries Object that will be added to the original object

        Returns:
            None

        Raises:
            Assertion Error: if header fields do not fit or if index is not
                rising
        """
        # Both series must describe the same entity ...
        assert self.entityId == other.entityId
        assert self.entityType == other.entityType
        # ... and `other` must start strictly after this series ends,
        # so the combined index stays strictly increasing.
        assert self.index[-1] < other.index[0]

        for own, incoming in zip(self.attributes, other.attributes):
            assert own.attrName == incoming.attrName
            own.values.extend(incoming.values)
        self.index.extend(other.index)

    def to_pandas(self) -> pd.DataFrame:
        """
        Converts time series data to pandas dataframe

        Returns:
            pandas.DataFrame
        """
        time_index = pd.Index(data=self.index, name="datetime")
        names = [attr.attrName for attr in self.attributes]
        # One column per attribute: stack the value arrays and flip to
        # (n_timestamps, n_attributes) orientation.
        data = np.array([attr.values for attr in self.attributes]).T
        header = pd.MultiIndex.from_product(
            [[self.entityId], [self.entityType], names],
            names=["entityId", "entityType", "attribute"],
        )

        return pd.DataFrame(data=data, index=time_index, columns=header)

132 

133 

class AggrMethod(str, Enum):
    """
    Aggregation methods accepted by the time series api.
    """

    # aenum: each member is (value, per-member docstring).
    _init_ = "value __doc__"
    COUNT = ("count", "Number of Entries")
    SUM = ("sum", "Sum")
    AVG = ("avg", "Average")
    MIN = ("min", "Minimum")
    MAX = ("max", "Maximum")

145 

146 

class AggrPeriod(str, Enum):
    """
    Aggregation periods accepted by the time series api.
    """

    # aenum: each member is (value, per-member docstring).
    _init_ = "value __doc__"
    YEAR = ("year", "year")
    MONTH = ("month", "month")
    DAY = ("day", "day")
    HOUR = ("hour", "hour")
    MINUTE = ("minute", "minute")
    SECOND = ("second", "second")

159 

160 

class AggrScope(str, Enum):
    """
    Aggregation Scopes

    When the query results cover historical data for
    multiple entities instances, you can define the aggregation method to be
    applied for each entity instance [entity] or across them [global].
    """

    # NOTE: the first docstring line previously read "Aggregation Periods",
    # a copy-paste error from AggrPeriod — this enum models the scope.
    # aenum: each member is (value, per-member docstring).
    _init_ = "value __doc__"
    ENTITY = "entity", "Entity (default)"
    GLOBAL = "global", "Global"