Coverage for filip/models/ngsi_v2/timeseries.py: 100%

55 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-11-20 16:54 +0000

1""" 

2Data models for interacting with FIWARE's time series-api (aka QuantumLeap) 

3""" 

4from __future__ import annotations 

5import logging 

6from typing import Any, List, Union 

7from datetime import datetime 

8import numpy as np 

9import pandas as pd 

10from aenum import Enum 

11from pydantic import ConfigDict, BaseModel, Field 

12 

13 

14logger = logging.getLogger(__name__) 

15 

16 

class TimeSeriesBase(BaseModel):
    """
    Common base model for the time series api models.

    Holds the shared time index that the attribute value arrays of the
    derived models run parallel to.
    """
    # Parallel array to the 'values' arrays of the attribute models.
    index: Union[List[datetime], datetime] = Field(
        default=None,
        description=(
            "Array of the timestamps which are indexes of the response "
            "for the requested data. It's a parallel array to 'values'."
            " The timestamp will be in the ISO8601 format "
            "(e.g. 2010-10-10T07:09:00.792) or in milliseconds since "
            "epoch whichever format was used in the input "
            "(notification), but ALWAYS in UTC. When using aggregation "
            "options, the format of this remains the same, only the "
            "semantics will change. For example, if aggrPeriod is day, "
            "each index will be a valid timestamp of a moment in the "
            "corresponding day."
        ),
    )

34 

35 

class TimeSeriesHeader(TimeSeriesBase):
    """
    Model to describe an available entity in the time series api
    """
    model_config = ConfigDict(populate_by_name=True)

    # Aliases are required due to formal inconsistencies in the api-specs.
    entityId: str = Field(
        default=None,
        alias="id",
        description=(
            "The entity id the time series api."
            "If the id is unique among all entity "
            "types, this could be used to uniquely "
            "identify the entity instance. Otherwise,"
            " you will have to use the entityType "
            "attribute to resolve ambiguity."
        ),
    )
    entityType: str = Field(
        default=None,
        alias="type",
        description="The type of an entity",
    )

53 

54 

class IndexedValues(BaseModel):
    """
    Model for time indexed values
    """
    # Ordered in lockstep with the time 'index' array of the response.
    values: List[Any] = Field(
        default=None,
        description=(
            "Array of values of the selected attribute, in the same "
            "corresponding order of the 'index' array. When using "
            "aggregation options, the format of this remains the same, "
            "only the semantics will change. For example, if "
            "aggrPeriod is day, each value of course may not "
            "correspond to original measurements but rather the "
            "aggregate of measurements in each day."
        ),
    )

69 

70 

class AttributeValues(IndexedValues):
    """
    Model for indexed values that additionally carry the attribute name.
    """
    attrName: str = Field(
        title="Attribute name",
        description="",
    )

79 

80 

class TimeSeries(TimeSeriesHeader):
    """
    Model for time series data
    """
    model_config = ConfigDict(populate_by_name=True)

    # One entry per attribute; each entry's 'values' runs parallel to 'index'.
    attributes: List[AttributeValues] = None

    def extend(self, other: TimeSeries) -> None:
        """
        Extends the current `TimeSeries` object with an other
        `TimeSeries` object. With the same format.

        Args:
            other: TimeSeries Object that will be added to the original object

        Returns:
            None

        Raises:
            Assertion Error: if header fields do not fit or if index is not
                rising
        """
        # Both series must describe the same entity, and the appended
        # index must start strictly after the existing one ends.
        assert self.entityId == other.entityId
        assert self.entityType == other.entityType
        assert self.index[-1] < other.index[0]

        for own_attr, new_attr in zip(self.attributes, other.attributes):
            # Attributes are matched positionally; names must agree.
            assert own_attr.attrName == new_attr.attrName
            own_attr.values.extend(new_attr.values)
        self.index.extend(other.index)

    def to_pandas(self) -> pd.DataFrame:
        """
        Converts time series data to pandas dataframe

        Returns:
            pandas.DataFrame
        """
        frame_index = pd.Index(data=self.index, name='datetime')
        names = [attr.attrName for attr in self.attributes]
        # One row per timestamp, one column per attribute.
        data = np.array([attr.values for attr in self.attributes]).T
        columns = pd.MultiIndex.from_product(
            [[self.entityId], [self.entityType], names],
            names=['entityId', 'entityType', 'attribute'])

        return pd.DataFrame(data=data, index=frame_index, columns=columns)

126 

127 

class AggrMethod(str, Enum):
    """
    Aggregation Methods

    Values accepted by the time series api's aggrMethod query parameter.
    """
    # aenum feature: each member below is declared as a (value, __doc__)
    # tuple, so every member carries a human-readable docstring.
    _init_ = 'value __doc__'
    COUNT = "count", "Number of Entries"
    SUM = "sum", "Sum"
    AVG = "avg", "Average"
    MIN = "min", "Minimum"
    MAX = "max", "Maximum"

138 

139 

class AggrPeriod(str, Enum):
    """
    Aggregation Periods

    Values accepted by the time series api's aggrPeriod query parameter.
    """
    # aenum feature: each member below is declared as a (value, __doc__)
    # tuple, so every member carries a human-readable docstring.
    _init_ = 'value __doc__'
    YEAR = "year", "year"
    MONTH = "month", "month"
    DAY = "day", "day"
    HOUR = "hour", "hour"
    MINUTE = "minute", "minute"
    SECOND = "second", "second"

151 

152 

class AggrScope(str, Enum):
    """
    Aggregation Scopes

    When the query results cover historical data for
    multiple entities instances, you can define the aggregation method to be
    applied for each entity instance [entity] or across them [global].
    """
    # aenum feature: each member below is declared as a (value, __doc__)
    # tuple, so every member carries a human-readable docstring.
    _init_ = 'value __doc__'
    ENTITY = "entity", "Entity (default)"
    GLOBAL = "global", "Global"