Coverage for agentlib_flexquant/utils/data_handling.py: 67%

49 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-10-20 14:09 +0000

1from typing import Literal 

2 

3import pandas as pd 

4from agentlib_mpc.utils import TIME_CONVERSION, TimeConversionTypes 

5 

# Names of the supported NaN-filling strategies.
MEAN: str = "mean"
INTERPOLATE: str = "interpolate"
# Literal[...] must be parameterised with literal values to be understood by
# static type checkers (PEP 586), so the strings are spelled out here. They
# have to stay equal to MEAN and INTERPOLATE above.
FillNansMethods = Literal["mean", "interpolate"]

9 

10 

def fill_nans(series: pd.Series, method: FillNansMethods) -> pd.Series:
    """Return the series with its NaN values replaced using ``method``.

    Args:
        series: the series to be filled
        method: the filling strategy, one of
            - mean: fill NaN values with the mean of the following values.
            - interpolate: interpolate missing values.
            Any other value leaves the series as it is.

    Returns:
        A pd.Series without NaN values.

    Raises:
        ValueError: if the series still contains NaN values after filling.

    """
    if method == INTERPOLATE:
        # Interpolate along the index values, extending to both ends
        filled = series.interpolate(method="index", limit_direction="both")
    elif method == MEAN:
        filled = _set_mean_values(series=series)
    else:
        filled = series

    if not filled.isna().any():
        return filled

    raise ValueError(
        f"NaN values are still present in the series after filling them "
        f"with the method {method}\n{filled}"
    )

36 

37 

38def _set_mean_values(series: pd.Series) -> pd.Series: 

39 """Fill intervals including the nan with the mean of the following values 

40 before the next nan.""" 

41 

42 def _get_intervals_for_mean(s: pd.Series) -> list[pd.Interval]: 

43 intervals = [] 

44 start = None 

45 for index, value in s.items(): 

46 if pd.isna(value): 

47 if pd.isna(start): 

48 start = index 

49 else: 

50 end = index 

51 intervals.append(pd.Interval(left=start, right=end, closed="both")) 

52 start = end 

53 elif index == s.index[-1]: 

54 end = index 

55 intervals.append(pd.Interval(left=start, right=end, closed="both")) 

56 return intervals 

57 

58 for interval in _get_intervals_for_mean(series): 

59 interval_index = (interval.left <= series.index) & ( 

60 series.index <= interval.right 

61 ) 

62 series[interval.left] = series[interval_index].mean(skipna=True) 

63 # fill the last entry of series with mean value of previous entries 

64 if interval.right == series.index[-1]: 

65 series[interval.right] = series[interval.left] 

66 

67 # remove last entry if nan, e.g. with collocation 

68 if pd.isna(series.iloc[-1]): 

69 series = series.iloc[:-1] 

70 

71 return series 

72 

73 

def strip_multi_index(series: pd.Series) -> pd.Series:
    """Replace a two-level index by its second level, cast to float.

    If the index labels are strings (the index was communicated as text),
    each label is first parsed into a tuple and the tuples are turned into a
    MultiIndex.

    Args:
        series: a series whose index is a MultiIndex or consists of the string
            representations of tuples. Its index is replaced on this object.

    Returns:
        The same series, indexed by the float values of the second index level.

    Raises:
        ValueError, SyntaxError: if a string label is not a plain Python
            literal.
    """
    import ast

    # Convert the index (communicated as string) into a MultiIndex
    if isinstance(series.index[0], str):
        # NOTE(security): the labels were previously parsed with eval(), which
        # executes arbitrary code contained in the received strings.
        # ast.literal_eval only accepts Python literals such as "(0, 900.0)".
        series.index = pd.MultiIndex.from_tuples(
            [ast.literal_eval(label) for label in series.index]
        )
    # vals is multicolumn so get rid of first value (start time of predictions)
    series.index = series.index.get_level_values(1).astype(float)
    return series

82 

83 

def convert_timescale_of_index(
    df: pd.DataFrame, from_unit: TimeConversionTypes, to_unit: TimeConversionTypes
) -> pd.DataFrame:
    """Convert the timescale of a dataframe index to the given time unit.

    The index values are multiplied by
    ``TIME_CONVERSION[from_unit] / TIME_CONVERSION[to_unit]``. For a
    MultiIndex, every level is scaled.

    Args:
        df: the dataframe whose index is converted. Its index is replaced on
            this object.
        from_unit: the time unit of the original index
        to_unit: the time unit to convert the index to

    Returns:
        The same DataFrame with the converted index

    """
    time_conversion_factor = TIME_CONVERSION[from_unit] / TIME_CONVERSION[to_unit]
    if isinstance(df.index, pd.MultiIndex):
        # A MultiIndex cannot be scaled as a whole, so rebuild it level by level
        df.index = pd.MultiIndex.from_arrays(
            [
                df.index.get_level_values(level) * time_conversion_factor
                for level in range(df.index.nlevels)
            ]
        )
    else:
        df.index = df.index * time_conversion_factor
    return df