Source code for aixweather.transformation_functions.pass_through_handling

"""
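This module contains auxiliary functions for data transformation related to time shifts:
it creates and applies pass-through (unshifted) variables to avoid unnecessary interpolation.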
"""
import logging

import pandas as pd

from aixweather.transformation_functions import auxiliary
from aixweather.imports.utils_import import MetaData


logger = logging.getLogger(__name__)


def create_pass_through_variables(
    df_shifted: pd.DataFrame,
    df_no_shift: pd.DataFrame,
    format: dict,
    transform_func,
    meta: MetaData,
):
    """
    Attach unshifted ("pass-through") copies of variables to `df_shifted`.

    The same transformation that is used in the import2core process is applied
    to `df_no_shift`. Afterwards columns of `df_no_shift` are copied into
    `df_shifted` under the name ``<core_name>_no_<shift>``, where ``<shift>``
    is the time-of-measurement shift that would have been required for them.
    This allows later steps to avoid interpolating values unnecessarily.

    Calculated (transformed) variables are only passed through if all
    variables they were calculated from share one and the same shift.

    Args:
        df_shifted (pd.DataFrame): The DataFrame with shifted data. It is
            modified in place and also returned.
        df_no_shift (pd.DataFrame): The DataFrame with unshifted data.
        format (dict): A dictionary specifying the format (required shifting)
            of the data. Each entry is read via its ``"core_name"`` and
            ``"time_of_meas_shift"`` keys.
        transform_func: The transformation function from the import2core data
            process. It is called with `df_no_shift` and must return the
            transformed DataFrame together with a mapping of calculated
            variable -> variables used for its calculation.
        meta (MetaData): Metadata associated with the data. Its attribute
            ``executed_transformations_no_shift`` is overwritten here.

    Returns:
        pd.DataFrame: The modified `df_shifted` DataFrame with added
        pass-through variables.
    """
    logger.debug("Apply transformation for pass through variables.")

    # perform same transformation on the unshifted data
    df_no_shift, meta.executed_transformations_no_shift = transform_func(df_no_shift)
    calculated = meta.executed_transformations_no_shift

    def get_shifts_of_used_variables(used_variables, meta):
        """Map each (transitively) used raw variable to its shift from the format."""
        collected_shifts = {}
        for var in used_variables:
            # only string entries are treated as variable names
            if not isinstance(var, str):
                continue
            format_entry = auxiliary.select_entry_by_core_name(format, var)
            if format_entry is None:
                # The used variable is not part of the format, so it has been
                # calculated itself: resolve the shifts of the variables that
                # were used to calculate it.
                collected_shifts.update(
                    get_shifts_of_used_variables(
                        meta.executed_transformations_no_shift[var], meta
                    )
                )
            elif var not in meta.executed_transformations_no_shift:
                # raw variable: take the shift straight from the format
                collected_shifts[var] = format_entry["time_of_meas_shift"]
            # NOTE(review): a variable that is in the format AND has been
            # calculated contributes no shift here - confirm this is intended.
        return collected_shifts

    ### add unshifted variables present in the format_dict to the df
    for entry in format.values():
        core_name = entry["core_name"]
        if core_name in calculated:
            # imputed variables need the shift according to their used variables
            continue
        shift = entry["time_of_meas_shift"]
        if shift is None:
            continue
        df_shifted[f"{core_name}_no_{shift}"] = df_no_shift[core_name]

    ### add unshifted variables that have been imputed from other variables
    for desired_variable, used_variables in calculated.items():
        used_variables_shifts = get_shifts_of_used_variables(used_variables, meta)

        # define vars of which the shift should be ignored for
        # validity checking, e.g. slowly changing variables
        vars_to_ignore_shift = []  # fill in the core names
        for var_to_ignore in vars_to_ignore_shift:
            used_variables_shifts.pop(var_to_ignore, None)

        # the calculation is only valid if all used variables share one shift
        if len(set(used_variables_shifts.values())) != 1:
            # dont add to df
            logger.debug(
                f"Calculation of the non-shifted {desired_variable} is "
                f"not valid due non consistent "
                f"time of measurement (shifting) of the required "
                f"variables {used_variables_shifts}. "
                f"There wont be a pass-through for this variable. "
                f"Info: If used variables have been calculated themself "
                f"the shift of the used variables "
                f"for that calculation are checked."
            )
            continue

        # add to df
        shift = next(iter(used_variables_shifts.values()))
        if shift is not None:
            df_shifted[f"{desired_variable}_no_{shift}"] = df_no_shift[
                desired_variable
            ]

    return df_shifted
def _find_pass_through_core_names(columns: list, output_format: dict) -> list: """ Identify pass-through variable names based on the output format and their column suffix. This function analyzes a list of column names and identifies those that represent pass-through core variables based on the provided output format. It takes into account suffix mappings to match the required shifting. suffix mapping key = desired shifting during import suffix mapping value = desired shifting during export Args: columns (list): A list of column names to analyze. output_format (dict): A dictionary specifying the desired format and shifting of the data. Returns: list: A list of column names representing pass-through variables that shall actually be passed through. """ selected_columns = [] suffix_mapping = {"_no_prec2ind": "ind2prec", "_no_foll2ind": "ind2foll"} for col in columns: core_name = col.split("_no_")[0] if core_name in output_format: suffix = col[len(core_name) :] if ( suffix in suffix_mapping and output_format[core_name]["time_of_meas_shift"] == suffix_mapping[suffix] ): selected_columns.append(col) return selected_columns def _find_and_apply_full_hour_shifts(df: pd.DataFrame, output_format: dict) -> tuple: """ Find variables that require a full-hour shift to avoid double interpolation. This function identifies pass-through variables in the DataFrame `df` that are specified in the `output_format` to be shifted by a full hour in total. It performs the necessary full hour shift on these variables to prevent double interpolation. Args: df (pd.DataFrame): The DataFrame containing data to be shifted. output_format (dict): A dictionary specifying the desired format and shifting of the data. Returns: tuple: List of added pass-through variables and the modified DataFrame. 
""" selected_columns = [] suffix_mapping_forward = {"_no_prec2ind": "ind2foll"} suffix_mapping_backward = {"_no_foll2ind": "ind2prec"} for col in df.columns: core_name = col.split("_no_")[0] if core_name in output_format: suffix = col[len(core_name) :] if ( suffix in suffix_mapping_forward and output_format[core_name]["time_of_meas_shift"] == suffix_mapping_forward[suffix] ): df.loc[:, col] = df[col].shift(periods=1, freq="h", axis=0) selected_columns.append(col) elif ( suffix in suffix_mapping_backward and output_format[core_name]["time_of_meas_shift"] == suffix_mapping_backward[suffix] ): df.loc[:, col] = df[col].shift(periods=-1, freq="h", axis=0) selected_columns.append(col) return selected_columns, df
def pass_through_measurements_with_back_and_forth_interpolating(
    core2output_df: pd.DataFrame, format_outputter: dict
) -> pd.DataFrame:
    """
    Insert pass-through measurements to the output dataframe to avoid
    back-and-forth or double shifting interpolation.

    It deletes the double interpolated variables and inserts the pass-through
    ones where applicable. Two kinds of pass-through columns are used:
    those whose import shift is exactly undone by the export shift, and those
    for which import and export shift add up to a full hour (these are shifted
    by a full hour first).

    Args:
        core2output_df (pd.DataFrame): DataFrame containing core data in the
            process of core2outputfile. It is modified in place.
        format_outputter (dict): Dictionary specifying the format of output data.

    Returns:
        pd.DataFrame: The modified `core2output_df` DataFrame with
        pass-through variables.
    """
    pass_trough_variables = _find_pass_through_core_names(
        core2output_df.columns, format_outputter
    )
    shift_full_hour_variables, core2output_df = _find_and_apply_full_hour_shifts(
        core2output_df, format_outputter
    )

    all_vars = pass_trough_variables + shift_full_hour_variables

    # delete interpolated variables and insert pass through or full hour shift variables
    for column_name in all_vars:
        # Split on the full "_no_" separator, exactly as the helper functions
        # that selected the column do. Splitting on "_no" would truncate core
        # names that contain "_no" themselves (e.g. "Temp_nominal").
        core_name = column_name.split("_no_")[0]

        # Drop the original (interpolated) variable
        core2output_df.drop(columns=[core_name], inplace=True)

        # Rename the pass-through variable by removing the suffix
        core2output_df.rename(columns={column_name: core_name}, inplace=True)

    return core2output_df