# Source code for aixcalibuha.data_types

"""
Module containing data types to enable an automatic usage of
different other modules in the Python package.
"""
import warnings
import logging
from typing import Union, Callable
from copy import deepcopy
import pandas as pd
import numpy as np
from ebcpy import TimeSeriesData
from ebcpy.utils.statistics_analyzer import StatisticsAnalyzer
from ebcpy.preprocessing import convert_datetime_index_to_float_index

# pylint: disable=I1101

logger = logging.getLogger(__name__)


class Goals:
    """
    Class for one or multiple goals. Used to evaluate the
    difference between current simulation and measured data.

    :param (ebcpy.data_types.TimeSeriesData, pd.DataFrame) meas_target_data:
        The dataset of the measurement. It acts as a point of reference
        for the simulation output. If the dimensions of the given DataFrame
        and later added simulation-data are not equal, an error is raised.
        Has to hold all variables listed under the MEASUREMENT_NAME
        variable in the variable_names dict.
    :param dict variable_names:
        A dictionary to construct the goals-DataFrame using pandas
        MultiIndex-Functionality. The dict has to follow the structure
        ``variable_names = {VARIABLE_NAME: [MEASUREMENT_NAME, SIMULATION_NAME]}``

        - VARIABLE_NAME: A string which holds the actual name
          of the variable you use as a goal.
          E.g.: ``VARIABLE_NAME="Temperature_Condenser_Outflow"``
        - MEASUREMENT_NAME: Is either a string or a tuple. Holds the name
          the variable has inside the given meas_target_data. If you want
          to specify a tag you have to pass a tuple, like:
          ``(MEASUREMENT_NAME, TAG_NAME)``. Else just pass a string.
          E.g.: ``MEASUREMENT_NAME="HydraulicBench[4].T_Out"`` or
          ``MEASUREMENT_NAME=("HydraulicBench[4].T_Out", "preprocessed")``
        - SIMULATION_NAME is either a string or a tuple, just like
          MEASUREMENT_NAME.
          E.g. (for Modelica): ``SIMULATION_NAME="HeatPump.Condenser.Vol.T"``

        You may use a tuple instead of a list OR a dict
        with key "meas" for measurement and key "sim" for simulation.
        These options may be relevant for your own code readability.
        E.g. ``variable_names =
        {VARIABLE_NAME: {"meas": MEASUREMENT_NAME, "sim": SIMULATION_NAME}}``
    :param str statistical_measure:
        Measure to calculate the scalar of the objective.
        One of the supported methods in
        ebcpy.utils.statistics_analyzer.StatisticsAnalyzer,
        e.g. RMSE, MAE, NRMSE
    :param list weightings:
        Values between 0 and 1 to account for multiple Goals to be evaluated.
        If multiple goals are selected, and weightings is None, each
        weighting will be equal to 1/(Number of goals).
        The weightings are scaled so that their sum equals 1.
    """

    # Default tags used on the second level of the goals-MultiIndex
    # to distinguish measured from simulated columns.
    meas_tag_str = "meas"
    sim_tag_str = "sim"

    def __init__(self,
                 meas_target_data: Union[TimeSeriesData, pd.DataFrame],
                 variable_names: dict,
                 statistical_measure: str,
                 weightings: list = None):
        """Initialize class-objects and check correct input."""
        # Open the meas target data:
        if not isinstance(meas_target_data, (TimeSeriesData, pd.DataFrame)):
            raise TypeError(f"Given meas_target_data is of type {type(meas_target_data).__name__} "
                            "but TimeSeriesData is required.")
        if not isinstance(variable_names, dict):
            raise TypeError(f"Given variable_names is of type {type(variable_names).__name__} "
                            f"but a dict is required.")
        # Extract the measurement-information out of the dict.
        self.variable_names = variable_names
        # Used to speed up the frequently used set_sim_target_data function
        self._sim_var_matcher = {}
        _columns = []  # Used to extract relevant part of df
        _rename_cols_dict = {}
        for var_name, meas_sim_info in self.variable_names.items():
            # First extract the information about the measurement out of the dict
            if isinstance(meas_sim_info, dict):
                meas_info = meas_sim_info[self.meas_tag_str]
                self._sim_var_matcher[var_name] = meas_sim_info[self.sim_tag_str]
            elif isinstance(meas_sim_info, (list, tuple)):
                meas_info = meas_sim_info[0]
                self._sim_var_matcher[var_name] = meas_sim_info[1]
            else:
                raise TypeError(f"Variable {var_name} of variable_names has a value"
                                "neither being a dict, list or tuple.")
            # Now get the info to extract the values out of the given tsd.
            # A plain string is resolved to a (name, tag) tuple. If multiple
            # tags exist and the default tag (self.meas_tag_str) is not
            # present, an error is raised.
            if isinstance(meas_info, str):
                if isinstance(meas_target_data[meas_info], pd.Series):
                    raise TypeError("Given meas_target_data contains columns without a tag."
                                    "Please only pass MultiIndex-DataFrame objects.")
                tags = meas_target_data[meas_info].columns
                _rename_cols_dict[meas_info] = var_name
                if len(tags) != 1 and self.meas_tag_str not in tags:
                    raise TypeError("Not able to automatically select variables and tags. "
                                    f"Variable {meas_info} has multiple tags, none of which "
                                    f"is specified as {self.meas_tag_str}.")
                if self.meas_tag_str in tags:
                    _columns.append((meas_info, self.meas_tag_str))
                else:
                    _columns.append((meas_info, tags[0]))
            elif isinstance(meas_info, tuple):
                _rename_cols_dict[meas_info[0]] = var_name
                _columns.append(meas_info)
            else:
                raise TypeError(f"Measurement Info on variable {var_name} is "
                                "neither of type string or tuple.")
        # Take the subset of the given tsd based on var_names and tags.
        self._tsd = meas_target_data[_columns].copy()
        # Rename all variables to the given var_name (key of self.variable_names)
        self._tsd = self._tsd.rename(columns=_rename_cols_dict, level=0)
        # Rename all tags to the default measurement name for consistency.
        tags = dict(zip(self._tsd.columns.levels[1],
                        [self.meas_tag_str for _ in range(len(_columns))]))
        self._tsd = self._tsd.rename(columns=tags, level=1)
        # Save the tsd to a tsd_ref object.
        # Used to never lose the original dataframe.
        # _tsd may be altered by relevant intervals, this object never!
        self._tsd_ref = self._tsd.copy()
        # Set the statistical analyzer:
        self.statistical_measure = statistical_measure
        # Set the weightings, if not specified.
        self._num_goals = len(_columns)
        if weightings is None:
            # Equal weighting for every goal.
            self.weightings = np.full(self._num_goals, 1 / self._num_goals)
        else:
            if not isinstance(weightings, (list, np.ndarray)):
                raise TypeError(f"weightings is of type {type(weightings).__name__} "
                                f"but should be of type list.")
            if len(weightings) != self._num_goals:
                raise IndexError(f"The given number of weightings ({len(weightings)}) "
                                 f"does not match the number of "
                                 f"goals ({self._num_goals})")
            # Normalize so the weightings sum to 1.
            self.weightings = np.array(weightings) / sum(weightings)

    def __str__(self):
        """Overwrite string method to present the Goals-Object more nicely."""
        return str(self._tsd)

    @property
    def statistical_measure(self):
        """The statistical measure of this Goal instance"""
        return self._stat_meas

    @statistical_measure.setter
    def statistical_measure(self, statistical_measure: Union[str, Callable]):
        """
        Set the new statistical measure. The value must be
        supported by the method argument in the
        ``StatisticsAnalyzer`` class of ``ebcpy``.
        """
        self._stat_analyzer = StatisticsAnalyzer(method=statistical_measure)
        if callable(statistical_measure):
            self._stat_meas = statistical_measure.__name__
        else:
            self._stat_meas = statistical_measure

    def eval_difference(self, verbose=False, penaltyfactor=1):
        """
        Evaluate the difference of the measurement and simulated data based on
        the chosen statistical_measure.

        :param boolean verbose:
            If True, a dict with difference-values of all goals and the
            corresponding weightings is returned together with the total
            difference. This can be useful to better understand which goal
            is performing well in an optimization and which goal is not.
        :param float penaltyfactor:
            Multiply the result with this factor to account
            for penalties of some sort.
        :return: float total_difference
            weighted output for all goals.
        """
        # NaNs indicate that simulated and measured indexes did not match.
        # The data does not change between goals, so check once up front
        # instead of once per loop iteration.
        if self._tsd.isnull().values.any():
            raise ValueError("There are not valid values in the "
                             "simulated target data. Probably the time "
                             "interval of measured and simulated data "
                             "are not equal. \nPlease check the frequencies "
                             "in the toml file (output_interval & frequency).")
        total_difference = 0
        _verbose_calculation = {}
        for i, goal_name in enumerate(self.variable_names.keys()):
            _diff = self._stat_analyzer.calc(
                meas=self._tsd[(goal_name, self.meas_tag_str)],
                sim=self._tsd[(goal_name, self.sim_tag_str)]
            )
            # Apply penalty function
            _diff = _diff * penaltyfactor
            _verbose_calculation[goal_name] = (self.weightings[i], _diff)
            total_difference += self.weightings[i] * _diff
        if verbose:
            return total_difference, _verbose_calculation
        return total_difference

    def set_sim_target_data(self, sim_target_data):
        """Alter the object with new simulation data
        self._sim_target_data based on the given dataframe
        sim_target_data.

        :param TimeSeriesData sim_target_data:
            Object with simulation target data. This data should be
            the output of a simulation, hence "sim"-target-data.
        """
        # Start with the base
        self._tsd = self._tsd_ref.copy()
        # Check index type
        if not isinstance(sim_target_data.index, type(self._tsd.index)):
            raise IndexError(
                f"Given sim_target_data is using {type(sim_target_data.index).__name__}"
                f" as an index, but the reference results (measured-data) was declared"
                f" using the {type(self._tsd_ref.index).__name__}. Convert your"
                f" measured-data index to solve this error."
            )
        # Three critical cases may occur:
        # 1. sim_target_data is bigger (in len) than _tsd
        #    --> Only the first part is accepted
        # 2. sim_target_data is smaller than _tsd
        #    --> Missing values become NaN, which is fine. If no other function
        #        eliminates the NaNs, an error is raised when doing eval_difference().
        # 3. The index differs:
        #    --> All new values are NaN. However, this should raise an error, as an
        #        error in eval_difference would not lead back to this function.
        # Check if index matches in relevant intersection:
        sta = max(self._tsd.index[0], sim_target_data.index[0])
        sto = min(self._tsd.index[-1], sim_target_data.index[-1])
        # Slice both indexes once instead of re-slicing for every comparison.
        meas_index = self._tsd.loc[sta:sto].index
        sim_index = sim_target_data.loc[sta:sto].index
        if len(meas_index) != len(sim_index):
            raise ValueError(f"Given indexes have different lengths "
                             f"({len(meas_index)} vs "
                             f"{len(sim_index)}). "
                             f"Can't compare them. ")
        mask = meas_index != sim_index
        if np.any(mask):
            diff = meas_index - sim_index
            raise IndexError(f"Measured and simulated data differ on {np.count_nonzero(mask)}"
                             f" index points. Affected index part: {diff[mask]}. "
                             f"This will lead to errors in evaluation, "
                             f"hence we raise the error already here. "
                             f"Check output_interval, equidistant_output and "
                             f"frequency of measured data to find the reason for "
                             f"this error. They have to match.")
        # Resize simulation data to match to meas data
        for goal_name in self.variable_names.keys():
            _tsd_sim = sim_target_data.loc[sta:sto, self._sim_var_matcher[goal_name]]
            if len(_tsd_sim.columns) > 1:
                raise ValueError("Given sim_target_data contains multiple tags for variable "
                                 f"{self._sim_var_matcher[goal_name]}. "
                                 "Can't select one automatically.")
            self._tsd.loc[sta:sto, (goal_name, self.sim_tag_str)] = _tsd_sim.values
        # Sort the index for better visualisation
        self._tsd = self._tsd.sort_index(axis=1)

    def set_relevant_time_intervals(self, intervals):
        """
        For many calibration use-cases, different time-intervals of the measured
        and simulated data are relevant. Set the interval to be used with this
        function. This will change both measured and simulated data. Therefore,
        the eval_difference function can be called at every moment.

        :param list intervals:
            List with time-intervals. Each list element has to be a tuple
            with the first element being the start_time as float or int and
            the second item being the end_time of the interval as float or int.
            E.g: [(0, 100), (150, 200), (500, 600)]
        """
        _df_ref = self._tsd.copy()
        # Create initial False mask
        _mask = np.full(_df_ref.index.shape, False)
        # Dynamically make mask for multiple possible time-intervals
        for _start_time, _end_time in intervals:
            _mask = _mask | ((_df_ref.index >= _start_time) & (_df_ref.index <= _end_time))
        self._tsd = _df_ref.loc[_mask]

    def get_goals_list(self):
        """Get the internal list containing all goals."""
        return list(self.variable_names.keys())

    def get_goals_data(self):
        """Get the current time-series-data object."""
        return self._tsd.copy()

    def get_sim_var_names(self):
        """Get the names of the simulation variables.

        :returns list sim_var_names:
            Names of the simulation variables as a list
        """
        return list(self._sim_var_matcher.values())

    def get_meas_frequency(self):
        """
        Get the frequency of the measurement data.

        :returns:
            float: Mean frequency of the index
        """
        mean, std = self._tsd_ref.frequency
        if std >= 1e-8:
            # Bug fix: log the standard deviation itself. The original code
            # passed mean.std(), which logs the wrong value and raises
            # AttributeError when mean is a plain float.
            logger.critical("The index of your measurement data is not "
                            "equally sampled. The standard deviation is %s."
                            "This may lead to errors when mapping measurements "
                            "to simulation results.", std)
        return mean
class TunerParas:
    """
    Class for tuner parameters.
    Tuner parameters are parameters of a model which are
    constant during simulation but are varied during calibration
    or other analysis.

    :param list names:
        List of names of the tuner parameters
    :param float,int initial_values:
        Initial values for optimization.
        Even though some optimization methods don't require an
        initial guess, specifying an initial guess based on
        expected values or experience is helpful to better
        check the results of the calibration
    :param list,tuple bounds:
        Tuple or list of float or ints for lower and upper bound
        to the tuner parameter. The bounds object is optional, however highly
        recommend for calibration or optimization in general. As soon as you
        tune parameters with different units, such as Capacity and
        heat conductivity, the solver will fail to find good solutions.

    Example:

    >>> tuner_paras = TunerParas(names=["C", "m_flow_2", "heatConv_a"],
    >>>                          initial_values=[5000, 0.02, 200],
    >>>                          bounds=[(4000, 6000), (0.01, 0.1), (10, 300)])
    >>> print(tuner_paras)
                initial_value      min    max    scale
    names
    C                 5000.00  4000.00  6000.0  2000.00
    m_flow_2             0.02     0.01    0.1     0.09
    heatConv_a         200.00    10.00  300.0   290.00
    """

    def __init__(self, names, initial_values, bounds=None):
        """Initialize class-objects and check correct input."""
        # Check if the given input-parameters are of correct format.
        # If not, raise an error.
        for name in names:
            if not isinstance(name, str):
                raise TypeError(f"Given name is of type {type(name).__name__} "
                                "and not of type str.")
        # Check if all names are unique:
        if len(names) != len(set(names)):
            raise ValueError("Given names contain duplicates. "
                             "This will yield errors in later stages "
                             "such as calibration or sensitivity analysis.")
        try:
            # Calculate the sum, as this will fail if the elements are not float or int.
            sum(initial_values)
        except TypeError as err:
            raise TypeError("initial_values contains other "
                            "instances than float or int.") from err
        if len(names) != len(initial_values):
            raise ValueError(f"shape mismatch: names has length {len(names)}"
                             f" and initial_values {len(initial_values)}.")
        self._bounds = bounds
        if bounds is None:
            # Without bounds the parameters are unconstrained.
            _bound_min = -np.inf
            _bound_max = np.inf
        else:
            if len(bounds) != len(names):
                raise ValueError(f"shape mismatch: bounds has length {len(bounds)} "
                                 f"and names {len(names)}.")
            _bound_min, _bound_max = [], []
            for bound in bounds:
                _bound_min.append(bound[0])
                _bound_max.append(bound[1])
        self._df = pd.DataFrame({"names": names,
                                 "initial_value": initial_values,
                                 "min": _bound_min,
                                 "max": _bound_max})
        self._df = self._df.set_index("names")
        self._set_scale()

    def __str__(self):
        """Overwrite string method to present the TunerParas-Object more nicely."""
        return str(self._df)

    def scale(self, descaled):
        """
        Scales the given value to the bounds of the tuner parameter
        between 0 and 1

        :param np.array,list descaled:
            Value to be scaled
        :return: np.array scaled:
            Scaled value between 0 and 1
        """
        # If no bounds are given, scaling is not possible--> descaled = scaled
        if self._bounds is None:
            return descaled
        _scaled = (descaled - self._df["min"]) / self._df["scale"]
        if not all((_scaled >= 0) & (_scaled <= 1)):
            warnings.warn("Given descaled values are outside "
                          "of bounds. Automatically limiting "
                          "the values with respect to the bounds.")
        return np.clip(_scaled, a_min=0, a_max=1)

    def descale(self, scaled):
        """
        Converts the given scaled value to a descaled one.

        :param np.array,list scaled:
            Scaled input value between 0 and 1
        :return: np.array descaled:
            descaled value based on bounds.
        """
        # If no bounds are given, scaling is not possible--> descaled = scaled
        if not self._bounds:
            return scaled
        _scaled = np.array(scaled)
        # Bug fix: the tolerance was 1e4 (i.e. 10000), which effectively
        # disabled both the out-of-bounds warning and the clipping.
        # A small numerical tolerance of 1e-4 is intended here.
        if not all((_scaled >= 0 - 1e-4) & (_scaled <= 1 + 1e-4)):
            warnings.warn("Given scaled values are outside of bounds. "
                          "Automatically limiting the values with "
                          "respect to the bounds.")
            _scaled = np.clip(_scaled, a_min=0, a_max=1)
        return _scaled * self._df["scale"] + self._df["min"]

    @property
    def bounds(self):
        """Get property bounds"""
        return self._bounds

    def get_names(self):
        """Return the names of the tuner parameters"""
        return list(self._df.index)

    def get_initial_values(self):
        """Return the initial values of the tuner parameters"""
        return self._df["initial_value"].values

    def get_bounds(self):
        """Return the bound-values of the tuner parameters"""
        return self._df["min"].values, self._df["max"].values

    def get_value(self, name, col):
        """Function to get a value of a specific tuner parameter"""
        return self._df.loc[name, col]

    def set_value(self, name, col, value):
        """Function to set a value of a specific tuner parameter"""
        if not isinstance(value, (float, int)):
            raise ValueError(f"Given value is of type {type(value).__name__} "
                             "but float or int is required")
        if col not in ["max", "min", "initial_value"]:
            raise KeyError("Can only alter max, min and initial_value")
        self._df.loc[name, col] = value
        # Keep the scale column consistent with the new bounds.
        self._set_scale()

    def remove_names(self, names):
        """
        Remove given list of names from the Tuner-parameters

        :param list names:
            List with names inside of the TunerParas-dataframe
        """
        self._df = self._df.drop(names)

    def _set_scale(self):
        # scale = max - min; used to map parameter values to [0, 1].
        self._df["scale"] = self._df["max"] - self._df["min"]
        if not self._df[self._df["scale"] <= 0].empty:
            raise ValueError(
                "The given lower bounds are greater equal "
                "than the upper bounds, resulting in a "
                f"negative scale: \n{str(self._df['scale'])}"
            )
class CalibrationClass:
    """
    Class used for calibration of time-series data.

    :param str name:
        Name of the class, e.g. 'device on'
    :param float,int start_time:
        Time at which the class starts
    :param float,int stop_time:
        Time at which the class ends
    :param Goals goals:
        Goals parameters which are relevant in this class.
        As this class may be used in the classifier, a Goals-Class
        may not be available at all times and can be added later.
    :param TunerParas tuner_paras:
        As this class may be used in the classifier, a TunerParas-Class
        may not be available at all times and can be added later.
    :param list relevant_intervals:
        List with time-intervals relevant for the calibration.
        Each list element has to be a tuple with the first element being
        the start-time as float/int and the second item being the end-time
        of the interval as float/int.
        E.g: For a class with start_time=0 and stop_time=1000, given following
        intervals [(0, 100), (150, 200), (500, 600)]
        will only evaluate the data between 0-100, 150-200 and 500-600.
        The given intervals may overlap. Furthermore the intervals do not need
        to be in an ascending order or be limited to
        the start_time and end_time parameters.
    :keyword (pd.DataFrame, ebcpy.data_types.TimeSeriesData) inputs:
        TimeSeriesData or DataFrame that holds
        input data for the simulation to run.
        The time-index should be float index and match the overall
        ranges set by start- and stop-time.
    :keyword dict input_kwargs:
        If inputs are provided, additional input keyword-args passed to the
        simulation API can be specified.
        Using FMUs, you don't need to specify anything.
        Using DymolaAPI, you have to specify 'table_name' and 'file_name'
    """

    def __init__(self, name, start_time, stop_time, goals=None,
                 tuner_paras=None, relevant_intervals=None, **kwargs):
        """Initialize class-objects and check correct input."""
        self.name = name
        # _start_time is set directly (bypassing the property setter) because
        # the start_time setter validates against self.stop_time, which does
        # not exist yet at this point. The stop_time setter below then
        # validates against the already-set start time.
        self._start_time = start_time
        self.stop_time = stop_time
        self._goals = None
        self._tuner_paras = None
        if goals is not None:
            self.goals = goals
        if tuner_paras is not None:
            self.tuner_paras = tuner_paras
        if relevant_intervals is not None:
            self.relevant_intervals = relevant_intervals
        else:
            # Then all is relevant
            self.relevant_intervals = [(start_time, stop_time)]
        self._inputs = None
        inputs = kwargs.get('inputs', None)
        if inputs is not None:
            # Trigger the property setter (validates type and index).
            self.inputs = inputs
        self.input_kwargs = kwargs.get('input_kwargs', {})

    @property
    def name(self):
        """Get name of calibration class"""
        return self._name

    @name.setter
    def name(self, name: str):
        """Set name of calibration class"""
        if not isinstance(name, str):
            raise TypeError(f"Name of CalibrationClass is {type(name)} "
                            f"but has to be of type str")
        self._name = name

    @property
    def start_time(self) -> Union[float, int]:
        """Get start time of calibration class"""
        return self._start_time

    @start_time.setter
    def start_time(self, start_time: Union[float, int]):
        """Set start time of calibration class"""
        # A class may not start after it stops.
        if not start_time <= self.stop_time:
            raise ValueError("The given start-time is "
                             "higher than the stop-time.")
        self._start_time = start_time

    @property
    def stop_time(self) -> Union[float, int]:
        """Get stop time of calibration class"""
        return self._stop_time

    @stop_time.setter
    def stop_time(self, stop_time: Union[float, int]):
        """Set stop time of calibration class"""
        # A class may not stop before it starts.
        if not self.start_time <= stop_time:
            raise ValueError("The given stop-time is "
                             "lower than the start-time.")
        self._stop_time = stop_time

    @property
    def tuner_paras(self) -> TunerParas:
        """Get the tuner parameters of the calibration-class"""
        return self._tuner_paras

    @tuner_paras.setter
    def tuner_paras(self, tuner_paras):
        """
        Set the tuner parameters for the calibration-class.

        :param tuner_paras: TunerParas
        """
        if not isinstance(tuner_paras, TunerParas):
            raise TypeError(f"Given tuner_paras is of type {type(tuner_paras).__name__} "
                            "but should be type TunerParas")
        # deepcopy decouples this class from the caller's object.
        self._tuner_paras = deepcopy(tuner_paras)

    @property
    def goals(self) -> Goals:
        """Get current goals instance"""
        return self._goals

    @goals.setter
    def goals(self, goals: Goals):
        """
        Set the goals object for the calibration-class.

        :param Goals goals:
            Goals-data-type
        """
        if not isinstance(goals, Goals):
            raise TypeError(f"Given goals parameter is of type {type(goals).__name__} "
                            "but should be type Goals")
        # deepcopy decouples this class from the caller's object.
        self._goals = deepcopy(goals)

    @property
    def relevant_intervals(self) -> list:
        """Get current relevant_intervals"""
        return self._relevant_intervals

    @relevant_intervals.setter
    def relevant_intervals(self, relevant_intervals: list):
        """Set current relevant_intervals"""
        self._relevant_intervals = relevant_intervals

    @property
    def inputs(self) -> Union[TimeSeriesData, pd.DataFrame]:
        """Get the inputs for this calibration class"""
        return self._inputs

    @inputs.setter
    def inputs(self, inputs: Union[TimeSeriesData, pd.DataFrame]):
        """Set the inputs for this calibration class"""
        # Check correct index:
        if not isinstance(inputs, (TimeSeriesData, pd.DataFrame)):
            raise TypeError(f"Inputs need to be either TimeSeriesData "
                            f"or pd.DataFrame, but you passed {type(inputs)}")
        # Datetime indexes are converted to float to match start/stop times.
        if isinstance(inputs.index, pd.DatetimeIndex):
            inputs = convert_datetime_index_to_float_index(inputs)
        self._inputs = inputs
def merge_calibration_classes(calibration_classes):
    """
    Merge a list of calibration classes by their "name" attribute.

    All classes sharing a name are collapsed into a single
    CalibrationClass whose relevant_intervals collect the intervals
    of every merged member, so no start/stop-time information is lost.
    Goals, tuner parameters, inputs and input_kwargs are taken from the
    first occurrence of each name.

    :param list calibration_classes:
        List containing multiple CalibrationClass-Objects
    :return: list cal_classes_merged:
        A list containing one CalibrationClass-Object for each different
        "name" of class.

    Example:

    >>> cal_classes = [CalibrationClass("on", 0, 100),
    >>>                CalibrationClass("off", 100, 200),
    >>>                CalibrationClass("on", 200, 300)]
    >>> merged_classes = merge_calibration_classes(cal_classes)

    Is equal to:

    >>> merged_classes = [CalibrationClass("on", 0, 300,
    >>>                   relevant_intervals=[(0,100), (200,300)]),
    >>>                   CalibrationClass("off", 100, 200)]
    """
    # Group classes by name; the first occurrence donates everything
    # except the intervals, which are accumulated across occurrences.
    grouped = {}
    for current in calibration_classes:
        key = current.name
        if key not in grouped:
            grouped[key] = {
                "goals": current.goals,
                "tuner_paras": current.tuner_paras,
                "intervals": deepcopy(current.relevant_intervals),
                "inputs": deepcopy(current.inputs),
                "input_kwargs": deepcopy(current.input_kwargs),
            }
        else:
            grouped[key]["intervals"] += current.relevant_intervals
    # Build one merged CalibrationClass per distinct name. The overall
    # start/stop times are the extrema over all interval boundary points.
    merged = []
    for key, info in grouped.items():
        boundary_points = [t for interval in info["intervals"] for t in interval]
        merged.append(CalibrationClass(
            key,
            min(boundary_points),
            max(boundary_points),
            goals=info["goals"],
            tuner_paras=info["tuner_paras"],
            relevant_intervals=info["intervals"],
            inputs=info["inputs"],
            input_kwargs=info["input_kwargs"])
        )
    return merged