Coverage for aixweather/transformation_functions/pass_through_handling.py: 96%

71 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-03-04 14:19 +0000

1""" 

2This module contains auxiliary functions for data transformation, e.g. time shifts 

3""" 

4import logging 

5 

6import pandas as pd 

7 

8from aixweather.transformation_functions import auxiliary 

9from aixweather.imports.utils_import import MetaData 

10 

11 

12logger = logging.getLogger(__name__) 

13 

14 

def create_pass_through_variables(
    df_shifted: pd.DataFrame,
    df_no_shift: pd.DataFrame,
    format: dict,
    transform_func,
    meta: MetaData,
):
    """
    Add unshifted ("pass-through") copies of variables to ``df_shifted``.

    The same transformation that was applied to the shifted data is applied to
    ``df_no_shift``. Afterwards two groups of columns are copied from the
    transformed ``df_no_shift`` into ``df_shifted``; each copy is named
    ``<core_name>_no_<shift>``, where ``<shift>`` is the shifting that would
    have been required for that variable:

    1. Variables listed in ``format`` that were not calculated by
       ``transform_func`` use the ``time_of_meas_shift`` of their format entry.
    2. Variables calculated by ``transform_func`` are only added if all
       variables they were calculated from share one and the same shift.
       Used variables that were calculated themselves are resolved
       recursively down to the variables they were calculated from. This
       also holds when such a variable is listed in ``format``.

    Variables whose resolved shift is ``None`` are not added.

    Args:
        df_shifted (pd.DataFrame): The DataFrame with shifted data. It is
            modified in place and returned.
        df_no_shift (pd.DataFrame): The DataFrame with unshifted data.
        format (dict): Import format. Each value is expected to provide the
            keys ``"core_name"`` and ``"time_of_meas_shift"``. (The name
            shadows the builtin but is kept for keyword-argument callers.)
        transform_func: The transformation function from the import2core data
            process. It is called with ``df_no_shift`` and must return the
            transformed DataFrame and a dict mapping each calculated variable
            to the variables used to calculate it.
        meta (MetaData): Metadata object. Its attribute
            ``executed_transformations_no_shift`` is overwritten with the dict
            returned by ``transform_func``.

    Returns:
        pd.DataFrame: The modified ``df_shifted`` DataFrame with added
        pass-through variables.

    Raises:
        KeyError: If a used variable cannot be resolved via ``format`` and is
            not a calculated variable either.
    """

    logger.debug("Apply transformation for pass through variables.")
    # perform same transformation
    df_no_shift, meta.executed_transformations_no_shift = transform_func(df_no_shift)
    calculated = meta.executed_transformations_no_shift

    ### add unshifted variables present in the format_dict to the df
    for value in format.values():
        core_name = value["core_name"]
        if core_name in calculated:
            # imputed variables need the shift according to their used variables
            continue
        shift = value["time_of_meas_shift"]
        if shift is not None:
            df_shifted[f"{core_name}_no_{shift}"] = df_no_shift[core_name]

    def get_shifts_of_used_variables(used_variables):
        """Map each underlying (non-calculated) used variable to its shift."""
        shifts = {}
        for var in used_variables:
            if not isinstance(var, str):
                # only string entries are looked up as variable names
                continue
            format_entry = auxiliary.select_entry_by_core_name(format, var)
            if format_entry is not None and var not in calculated:
                # use shift from format if var has not been calculated itself
                shifts[var] = format_entry["time_of_meas_shift"]
            else:
                # The used variable has been calculated itself (whether or not
                # it is listed in the format): take the shifts of the
                # variables that were used to calculate it.
                shifts.update(get_shifts_of_used_variables(calculated[var]))
        return shifts

    ### add unshifted variables that have been imputed from other variables
    for desired_variable, used_variables in calculated.items():
        used_variables_shifts = get_shifts_of_used_variables(used_variables)

        # define vars of which the shift should be ignored for
        # validity checking, e.g. slowly changing variables
        vars_to_ignore_shift = []  # fill in the core names
        for var_to_ignore in vars_to_ignore_shift:
            used_variables_shifts.pop(var_to_ignore, None)

        # check whether they have all the same time shifting
        is_identical = len(set(used_variables_shifts.values())) == 1

        if not is_identical:
            # dont add to df
            logger.debug(
                f"Calculation of the non-shifted {desired_variable} is "
                f"not valid due non consistent "
                f"time of measurement (shifting) of the required "
                f"variables {used_variables_shifts}. "
                f"There wont be a pass-through for this variable. "
                f"Info: If used variables have been calculated themself "
                f"the shift of the used variables "
                f"for that calculation are checked."
            )
            continue

        # add to df; all shifts are identical, so any of them can be used
        shift = next(iter(used_variables_shifts.values()))
        if shift is not None:
            df_shifted[f"{desired_variable}_no_{shift}"] = df_no_shift[
                desired_variable
            ]

    return df_shifted

119 

120 

def _find_pass_through_core_names(columns: list, output_format: dict) -> list:
    """
    Select the pass-through columns that match the export shifting.

    A column is selected when it is named ``<core_name>_no_<import shift>``,
    ``<core_name>`` is a key of ``output_format`` and the
    ``time_of_meas_shift`` requested for that core name on export is the
    counterpart of the import shift encoded in the column suffix:

        suffix ``_no_prec2ind`` (import)  <->  ``ind2prec`` (export)
        suffix ``_no_foll2ind`` (import)  <->  ``ind2foll`` (export)

    Args:
        columns (list): Column names to analyze.
        output_format (dict): Output format keyed by core name; each entry
            provides ``"time_of_meas_shift"``.

    Returns:
        list: The column names (in input order) that shall actually be
        passed through.
    """
    export_shift_by_suffix = {
        "_no_prec2ind": "ind2prec",
        "_no_foll2ind": "ind2foll",
    }

    def _is_pass_through(column):
        # everything before the first "_no_" is the core name, the rest
        # (including the separator) is the suffix
        base_name, separator, remainder = column.partition("_no_")
        if base_name not in output_format:
            return False
        tail = separator + remainder
        if tail not in export_shift_by_suffix:
            return False
        requested_shift = output_format[base_name]["time_of_meas_shift"]
        return requested_shift == export_shift_by_suffix[tail]

    return [column for column in columns if _is_pass_through(column)]

155 

156 

157def _find_and_apply_full_hour_shifts(df: pd.DataFrame, output_format: dict) -> tuple: 

158 """ 

159 Find variables that require a full-hour shift to avoid double interpolation. 

160 

161 This function identifies pass-through variables in the DataFrame `df` that are specified in 

162 the `output_format` to be shifted by a full hour in total. It performs the necessary full 

163 hour shift on these variables to prevent double interpolation. 

164 

165 Args: 

166 df (pd.DataFrame): The DataFrame containing data to be shifted. 

167 output_format (dict): A dictionary specifying the desired format and shifting of the data. 

168 

169 Returns: 

170 tuple: List of added pass-through variables and the modified DataFrame. 

171 """ 

172 

173 selected_columns = [] 

174 suffix_mapping_forward = {"_no_prec2ind": "ind2foll"} 

175 suffix_mapping_backward = {"_no_foll2ind": "ind2prec"} 

176 

177 for col in df.columns: 

178 core_name = col.split("_no_")[0] 

179 if core_name in output_format: 

180 suffix = col[len(core_name) :] 

181 if ( 

182 suffix in suffix_mapping_forward 

183 and output_format[core_name]["time_of_meas_shift"] 

184 == suffix_mapping_forward[suffix] 

185 ): 

186 df.loc[:, col] = df[col].shift(periods=1, freq="h", axis=0) 

187 selected_columns.append(col) 

188 elif ( 

189 suffix in suffix_mapping_backward 

190 and output_format[core_name]["time_of_meas_shift"] 

191 == suffix_mapping_backward[suffix] 

192 ): 

193 df.loc[:, col] = df[col].shift(periods=-1, freq="h", axis=0) 

194 selected_columns.append(col) 

195 

196 return selected_columns, df 

197 

198 

def pass_through_measurements_with_back_and_forth_interpolating(
    core2output_df: pd.DataFrame, format_outputter: dict
) -> pd.DataFrame:
    """
    Insert pass-through measurements to the output dataframe to
    avoid back-and-forth or double shifting interpolation.

    Two kinds of ``<core_name>_no_<shift>`` columns are determined:
    pass-through columns whose import shift is reverted by the export shift,
    and columns for which import and export shift add up to a full hour
    (these are shifted by one hour). For each of them the interpolated
    column ``<core_name>`` is dropped and the ``_no_`` column is renamed to
    ``<core_name>``.

    Args:
        core2output_df (pd.DataFrame): DataFrame containing core data in the
            process of core2outputfile. It is modified in place.
        format_outputter (dict): Dictionary specifying the format of output
            data, keyed by core name.

    Returns:
        pd.DataFrame: The modified `core2output_df` DataFrame with
        pass-through variables.

    Raises:
        KeyError: If the column ``<core_name>`` that should be replaced does
            not exist in ``core2output_df``.
    """
    pass_through_variables = _find_pass_through_core_names(
        core2output_df.columns, format_outputter
    )
    shift_full_hour_variables, core2output_df = _find_and_apply_full_hour_shifts(
        core2output_df, format_outputter
    )

    # delete interpolated variables and insert pass through or full hour shift variables
    for column_name in pass_through_variables + shift_full_hour_variables:
        # Use the same "_no_" separator as the helper functions that selected
        # the column, so that core names containing "_no" are not truncated.
        core_name = column_name.split("_no_")[0]
        # Drop the original (interpolated) variable
        core2output_df.drop(columns=[core_name], inplace=True)
        # Rename the "_no_<shift>" variable by removing the suffix
        core2output_df.rename(columns={column_name: core_name}, inplace=True)

    return core2output_df