Source code for aixcalibuha.calibration.calibrator

"""
Module containing the basic class to calibrate
a dynamic model, e.g. a modelica model.
"""

import os
import json
import time
import logging
from typing import Dict
from copy import copy
import numpy as np
import pandas as pd
from ebcpy import data_types, Optimizer
from ebcpy.simulationapi import SimulationAPI
from aixcalibuha.utils import visualizer, MaxIterationsReached
from aixcalibuha import CalibrationClass, Goals, TunerParas


class Calibrator(Optimizer):
    """
    This class can be used for single
    time-intervals of calibration.

    :param str,os.path.normpath cd:
        Working directory
    :param ebcpy.simulationapi.SimulationAPI sim_api:
        Simulation-API for running the models
    :param CalibrationClass calibration_class:
        Class with information on Goals and tuner-parameters for calibration
    :keyword str result_path:
        If given, the resulting parameter values will be stored
        in a JSON file at the given path.
    :keyword float timedelta:
        If you use this class for calibrating a single time-interval,
        you can set the timedelta to instantiate the simulation before
        actually evaluating it for the objective.
        The given float (default is 0) is subtracted from the
        start_time of your calibration_class. You can find
        a visualisation of said timedelta in the img folder of the project.
    :keyword boolean save_files:
        If true, all simulation files for each iteration will be saved!
    :keyword boolean verbose_logging:
        Default is True. If False, the standard Logger without
        visualization in form of plots is used.
        If you use this, the following keyword arguments below will help
        to further adjust the logging.
    :keyword boolean show_plot:
        Default is True. If False, all created plots are not shown during
        calibration but only stored at the end of the process.
    :keyword boolean create_tsd_plot:
        Default is True. If False, the plot of the time series data (goals)
        is not created and thus not shown during calibration. It therefore is
        also not stored, even if you set the save_tsd_plot keyword-argument to true.
    :keyword boolean save_tsd_plot:
        Default is False. If True, the created plot of the time series is saved
        at each iteration. This may make the process much slower.
    :keyword boolean fail_on_error:
        Default is False. If True, the calibration will stop with an error
        if the simulation fails. See also: ``ret_val_on_error``
    :keyword float,np.NAN ret_val_on_error:
        Default is np.NAN. If ``fail_on_error`` is false, you can specify here
        which value to return in the case of a failed simulation. Possible
        options are np.NaN, np.inf or some other high number. Be aware that
        this may influence the solver.
    :keyword dict fixed_parameters:
        Default is an empty dict. This dict may be used to add certain parameters
        to the simulation which are not tuned / variable during calibration.
        Such parameters may be used if the default values in the model don't
        represent the parameter values you want to use.
    :keyword boolean apply_penalty:
        Default is true. Specifies if a penalty function should be applied or not.
    :keyword float penalty_factor:
        Default is 0. Quantifies the impact of the penalty term on the objective
        function. The penalty factor is added to the objective function.
    :keyword int recalibration_count:
        Default is 0. Works as a counter and specifies the current cycle
        of recalibration.
    :keyword boolean perform_square_deviation:
        Default is false. If true, the penalty function will evaluate the
        penalty factor with a quadratic approach.
    :keyword int max_itercount:
        Default is Infinity. Maximum number of iterations of calibration.
        This may be useful to explicitly limit the calibration time.
    :keyword str plot_file_type:
        File ending of created plots.
        Any supported option in matplotlib, e.g. svg, png, pdf ...
        Default is png
    """

    def __init__(self, cd: str, sim_api: SimulationAPI,
                 calibration_class: CalibrationClass, **kwargs):
        """Instantiate instance attributes"""
        # %% Kwargs
        # Initialize supported keywords with default values.
        # Pop the items so they won't be added when calling the
        # __init__ of the parent class. Always pop with a default value in case
        # the keyword is not passed.
        self.verbose_logging = kwargs.pop("verbose_logging", True)
        self.save_files = kwargs.pop("save_files", False)
        self.timedelta = kwargs.pop("timedelta", 0)
        self.fail_on_error = kwargs.pop("fail_on_error", False)
        self.ret_val_on_error = kwargs.pop("ret_val_on_error", np.NAN)
        self.fixed_parameters = kwargs.pop("fixed_parameters", {})
        self.apply_penalty = kwargs.pop("apply_penalty", True)
        self.penalty_factor = kwargs.pop("penalty_factor", 0)
        self.recalibration_count = kwargs.pop("recalibration_count", 0)
        self.perform_square_deviation = kwargs.pop("square_deviation", False)
        self.result_path = kwargs.pop('result_path', None)
        self.max_itercount = kwargs.pop('max_itercount', np.inf)
        self.at_calibration = True  # Boolean to indicate if validating or calibrating
        # Extract kwargs for the visualizer
        visualizer_kwargs = {
            "save_tsd_plot": kwargs.pop("save_tsd_plot", False),
            "create_tsd_plot": kwargs.pop("create_tsd_plot", True),
            "show_plot": kwargs.pop("show_plot", True),
            "show_plot_pause_time": kwargs.pop("show_plot_pause_time", 1e-3),
            "file_type": kwargs.pop("plot_file_type", "png"),
        }

        # Check if types are correct:
        # Booleans:
        _bool_kwargs = ["save_files"]
        for bool_keyword in _bool_kwargs:
            keyword_value = self.__getattribute__(bool_keyword)
            if not isinstance(keyword_value, bool):
                raise TypeError(f"Given {bool_keyword} is of type "
                                f"{type(keyword_value).__name__} "
                                f"but should be type bool")

        # %% Initialize all public parameters
        super().__init__(cd, **kwargs)
        # Set sim_api
        self.sim_api = sim_api

        if not isinstance(calibration_class, CalibrationClass):
            raise TypeError(f"calibration_class is of type "
                            f"{type(calibration_class).__name__} "
                            f"but should be CalibrationClass")
        self.calibration_class = calibration_class
        # Scale tuner on boundaries
        self.x0 = self.tuner_paras.scale(self.tuner_paras.get_initial_values())
        if self.tuner_paras.bounds is None:
            self.bounds = None
        else:
            # As tuner-parameters are scaled between 0 and 1,
            # the scaled bounds are always 0 and 1.
            self.bounds = [(0, 1) for i in range(len(self.x0))]
        # Add the values to the simulation setup.
        self.sim_api.set_sim_setup(
            {"start_time": self.calibration_class.start_time - self.timedelta,
             "stop_time": self.calibration_class.stop_time}
        )

        # %% Setup the logger
        # De-register the logger setup in the optimization class:
        if self.verbose_logging:
            self.logger = visualizer.CalibrationVisualizer(
                cd=cd,
                name=self.__class__.__name__,
                calibration_class=self.calibration_class,
                logger=self.logger,
                **visualizer_kwargs
            )
        else:
            self.logger = visualizer.CalibrationLogger(
                cd=cd,
                name=self.__class__.__name__,
                calibration_class=self.calibration_class,
                logger=self.logger
            )

        self.cd_of_class = cd  # Single class does not need an extra folder

        # Set the output interval according to the given Goals
        mean_freq = self.goals.get_meas_frequency()
        self.logger.log("Setting output_interval of simulation according "
                        f"to measurement target data frequency: {mean_freq}")
        self.sim_api.sim_setup.output_interval = mean_freq

    def obj(self, xk, *args):
        """
        Default objective function.
        The following steps are performed:

        1. Convert the set to modelica-units
        2. Simulate the converted set
        3. Get data as a DataFrame
        4. Get penalty factor for the penalty function
        5. Calculate the objective based on statistical values

        :param np.array xk:
            Array with normalized values for the minimizer
        :param int work_id:
            id for worker in Multiprocessing
        :return: Objective value based on the used quality measurement
        :rtype: float
        """
        # Info: This function is called by the optimization framework (scipy, dlib, etc.)
        # Initialize class objects
        self._current_iterate = xk
        self._counter += 1

        # Convert set if multiple goals of different scales are used
        xk_descaled = self.tuner_paras.descale(xk)

        # Set initial values of variable and fixed parameters
        self.sim_api.result_names = self.goals.get_sim_var_names()
        initial_names = self.tuner_paras.get_names()
        parameters = self.fixed_parameters.copy()
        parameters.update({name: value for name, value in
                           zip(initial_names, xk_descaled.values)})
        # Simulate
        # pylint: disable=broad-except
        try:
            # Generate the folder name for the calibration
            if self.save_files:
                savepath_files = os.path.join(self.sim_api.cd,
                                              f"simulation_{self._counter}")
                _filepath = self.sim_api.simulate(
                    parameters=parameters,
                    return_option="savepath",
                    savepath=savepath_files,
                    inputs=self.calibration_class.inputs,
                    **self.calibration_class.input_kwargs
                )
                # %% Load results and write to goals object
                sim_target_data = data_types.TimeSeriesData(_filepath)
            else:
                sim_target_data = self.sim_api.simulate(
                    parameters=parameters,
                    inputs=self.calibration_class.inputs,
                    **self.calibration_class.input_kwargs
                )
        except Exception as err:
            if self.fail_on_error:
                self.logger.error("Simulation failed. Raising the error.")
                raise err
            self.logger.error(
                f"Simulation failed. Returning '{self.ret_val_on_error}' "
                f"for the optimization. Error message: {err}"
            )
            return self.ret_val_on_error

        total_res, unweighted_objective = self._kpi_and_logging_calculation(
            xk_descaled=xk_descaled,
            counter=self._counter,
            results=sim_target_data
        )
        return total_res, unweighted_objective

    def mp_obj(self, x, *args):
        """
        Objective function for evaluating a whole population of parameter
        sets at once (multiprocessing variant of ``obj``). Returns one
        objective value per parameter set.
        """
        # Initialize array for results
        num_evals = len(x)
        total_res_list = np.empty([num_evals, 1])
        # Set initial values of variable and fixed parameters
        self.sim_api.result_names = self.goals.get_sim_var_names()
        initial_names = self.tuner_paras.get_names()
        parameters = self.fixed_parameters.copy()

        parameter_list = []
        xk_descaled_list = []
        for _xk_single in x:
            # Convert set if multiple goals of different scales are used
            xk_descaled = self.tuner_paras.descale(_xk_single)
            xk_descaled_list.append(xk_descaled)
            # Update parameters
            parameter_copy = parameters.copy()
            parameter_copy.update(
                {name: value for name, value in
                 zip(initial_names, xk_descaled.values)})
            parameter_list.append(parameter_copy)

        # Simulate
        if self.save_files:
            result_file_names = [f"simulation_{self._counter + idx}"
                                 for idx in range(len(parameter_list))]
            _filepaths = self.sim_api.simulate(
                parameters=parameter_list,
                return_option="savepath",
                savepath=self.sim_api.cd,
                result_file_name=result_file_names,
                fail_on_error=self.fail_on_error,
                inputs=self.calibration_class.inputs,
                **self.calibration_class.input_kwargs
            )
            # Load results
            results = []
            for _filepath in _filepaths:
                if _filepath is None:
                    results.append(None)
                else:
                    results.append(data_types.TimeSeriesData(_filepath))
        else:
            results = self.sim_api.simulate(
                parameters=parameter_list,
                inputs=self.calibration_class.inputs,
                fail_on_error=self.fail_on_error,
                **self.calibration_class.input_kwargs
            )

        for idx, result in enumerate(results):
            self._counter += 1
            self._current_iterate = result
            if result is None:
                total_res_list[idx] = self.ret_val_on_error
                continue
            total_res, unweighted_objective = self._kpi_and_logging_calculation(
                xk_descaled=xk_descaled_list[idx],
                counter=self._counter,
                results=result
            )
            # Add single objective to objective list of total population
            total_res_list[idx] = total_res
        return total_res_list

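    # The helper below is shared by ``obj`` and ``mp_obj``: it returns the
    # weighted (and, if enabled, penalized) objective value handed back to the
    # optimizer together with the verbose per-goal breakdown from
    # ``Goals.eval_difference(verbose=True)``, a dict mapping each goal name
    # to its (weight, unweighted objective) pair.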
    def _kpi_and_logging_calculation(self, *, xk_descaled, counter, results):
        """
        Function to calculate everything needed in the obj or mp_obj
        function after the simulation finished.
        """
        xk = self.tuner_paras.scale(xk_descaled)

        self.goals.set_sim_target_data(results)
        # Trim results based on start and end-time of cal class
        self.goals.set_relevant_time_intervals(self.calibration_class.relevant_intervals)

        # %% Evaluate the current objective
        # Penalty function (get penalty factor)
        if self.recalibration_count > 1 and self.apply_penalty:
            # There is no benchmark in the first iteration or
            # first iterations were skipped, so no penalty is applied
            penaltyfactor = self.get_penalty(xk_descaled, xk)
            # Evaluate with penalty
            penalty = penaltyfactor
        else:
            # Evaluate without penalty
            penaltyfactor = 1
            penalty = None
        total_res, unweighted_objective = self.goals.eval_difference(
            verbose=True,
            penaltyfactor=penaltyfactor
        )
        if self.at_calibration:  # Only plot if at_calibration
            self.logger.calibration_callback_func(
                xk=xk,
                obj=total_res,
                verbose_information=unweighted_objective,
                penalty=penalty
            )
        # Current best iteration step of current calibration class
        if total_res < self._current_best_iterate["Objective"]:
            # self.best_goals = self.goals
            self._current_best_iterate = {
                "Iterate": counter,
                "Objective": total_res,
                "Unweighted Objective": unweighted_objective,
                "Parameters": xk_descaled,
                "Goals": self.goals,
                # For penalty function and for saving goals as csv
                "better_current_result": True,
                # Changed to false in this script after calling function "save_calibration_results"
                "Penaltyfactor": penalty
            }
        if counter >= self.max_itercount:
            raise MaxIterationsReached(
                "Terminating calibration as the maximum number "
                f"of iterations {self.max_itercount} has been reached."
            )

        return total_res, unweighted_objective

    def calibrate(self, framework, method=None, **kwargs) -> dict:
        """
        Start the calibration process of the calibration classes,
        visualize and save the results.

        The arguments of this function are equal to the
        arguments in Optimizer.optimize(). Look at the docstring
        in ebcpy to know which options are available.
        """
        # %% Start Calibration:
        self.at_calibration = True
        self.logger.log(f"Start calibration of model: {self.sim_api.model_name}"
                        f" with framework-class {self.__class__.__name__}")
        self.logger.log(f"Class: {self.calibration_class.name}, Start and Stop-Time "
                        f"of simulation: {self.calibration_class.start_time}"
                        f"-{self.calibration_class.stop_time} s\n Time-Intervals used"
                        f" for objective: {self.calibration_class.relevant_intervals}")

        # Setup the visualizer for plotting and logging:
        self.logger.calibrate_new_class(self.calibration_class,
                                        cd=self.cd_of_class,
                                        for_validation=False)
        self.logger.log_initial_names()

        # Duration of calibration
        t_cal_start = time.time()

        # Run optimization
        try:
            _res = self.optimize(
                framework=framework,
                method=method,
                n_cpu=self.sim_api.n_cpu,
                **kwargs)
        except MaxIterationsReached as err:
            self.logger.log(msg=str(err), level=logging.WARNING)
        t_cal_stop = time.time()
        t_cal = t_cal_stop - t_cal_start

        # Check if optimization worked correctly
        if "Iterate" not in self._current_best_iterate:
            raise Exception(
                "Some error during calibration yielded no successful iteration. "
                "Can't save or return any results."
            )

        # %% Save the relevant results.
        self.logger.save_calibration_result(self._current_best_iterate,
                                            self.sim_api.model_name,
                                            duration=t_cal,
                                            itercount=self.recalibration_count)
        # Reset
        self._current_best_iterate['better_current_result'] = False

        # Save calibrated parameter values in JSON
        parameter_values = {}
        for p_name in self._current_best_iterate['Parameters'].index:
            parameter_values[p_name] = self._current_best_iterate['Parameters'][p_name]
        self.save_results(parameter_values=parameter_values,
                          filename=self.calibration_class.name)
        return parameter_values

    @property
    def calibration_class(self) -> CalibrationClass:
        """Get the current calibration class"""
        return self._cal_class

    @calibration_class.setter
    def calibration_class(self, calibration_class: CalibrationClass):
        """Set the current calibration class"""
        self.sim_api.set_sim_setup(
            {"start_time": self._apply_start_time_method(
                start_time=calibration_class.start_time),
             "stop_time": calibration_class.stop_time}
        )
        self._cal_class = calibration_class

    @property
    def tuner_paras(self) -> TunerParas:
        """Get the current tuner parameters of the calibration class"""
        return self.calibration_class.tuner_paras

    @tuner_paras.setter
    def tuner_paras(self, tuner_paras: TunerParas):
        """Set the current tuner parameters of the calibration class"""
        self.calibration_class.tuner_paras = tuner_paras

    @property
    def goals(self) -> Goals:
        """Get the current goals of the calibration class"""
        return self.calibration_class.goals

    @goals.setter
    def goals(self, goals: Goals):
        """Set the current goals of the calibration class"""
        self.calibration_class.goals = goals

    @property
    def fixed_parameters(self) -> dict:
        """Get the currently fixed parameters during calibration"""
        return self._fixed_pars

    @fixed_parameters.setter
    def fixed_parameters(self, fixed_parameters: dict):
        """Set the currently fixed parameters during calibration"""
        self._fixed_pars = fixed_parameters

    def save_results(self, parameter_values: dict, filename: str):
        """Saves the given dict into a file with path
        self.result_path and name filename."""
        if self.result_path is not None:
            os.makedirs(self.result_path, exist_ok=True)
            s_path = os.path.join(self.result_path, f'{filename}.json')
            with open(s_path, 'w') as json_file:
                json.dump(parameter_values, json_file, indent=4)

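    # ``validate`` below temporarily replaces the tuner parameters of the
    # given validation class with the calibrated values (using dummy bounds,
    # since the values are only scaled and descaled), evaluates ``obj`` once
    # on the validation interval and restores the original tuner parameters
    # afterwards.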
    def validate(self, validation_class: CalibrationClass, calibration_result: Dict,
                 verbose=False):
        """
        Validate the given calibration class based on the given
        values for tuner_parameters.

        :param CalibrationClass validation_class:
            The class to validate on
        :param dict calibration_result:
            The calibration result to apply to the validation class
        :param bool verbose:
            If True, a DataFrame with the weight and objective value of each
            goal is returned instead of only the weighted total objective.
        """
        # Start Validation:
        self.at_calibration = False
        self.logger.log(f"Start validation of model: {self.sim_api.model_name} with "
                        f"framework-class {self.__class__.__name__}")
        # Use start-time of calibration class
        self.calibration_class = validation_class
        start_time = self._apply_start_time_method(
            start_time=self.calibration_class.start_time
        )
        old_tuner_paras = copy(self.calibration_class.tuner_paras)
        tuner_values = list(calibration_result.values())
        self.calibration_class.tuner_paras = TunerParas(
            names=list(calibration_result.keys()),
            initial_values=tuner_values,
            # Dummy bounds as they are scaled anyway
            bounds=[(val - 1, val + 1) for val in tuner_values]
        )

        # Set the start-time for the simulation
        self.sim_api.sim_setup.start_time = start_time

        self.logger.calibrate_new_class(self.calibration_class,
                                        cd=self.cd_of_class,
                                        for_validation=True)

        # Use the results parameter vector to simulate again.
        self._counter = 0  # Reset the iteration counter
        # Scale the tuner parameters
        xk = self.tuner_paras.scale(tuner_values)
        # Evaluate objective
        obj, unweighted_objective = self.obj(xk=xk)
        self.logger.validation_callback_func(
            obj=obj
        )
        # Reset tuner_parameters to avoid unwanted behaviour
        self.calibration_class.tuner_paras = old_tuner_paras
        if verbose:
            weights = [1]
            objectives = [obj]
            goals = ['all']
            for goal, val in unweighted_objective.items():
                weights.append(val[0])
                objectives.append(val[1])
                goals.append(goal)
            index = pd.MultiIndex.from_product(
                [[validation_class.name], goals],
                names=['Class', 'Goal']
            )
            obj_verbos = pd.DataFrame(
                {'weight': weights,
                 validation_class.goals.statistical_measure: objectives},
                index=index
            )
            return obj_verbos
        return obj

    def _handle_error(self, error):
        """
        Also save the plots if an error occurs.
        See ebcpy.optimization.Optimizer._handle_error for more info.
        """
        # This error is our own, we handle it in the calibrate() function
        if isinstance(error, MaxIterationsReached):
            raise error
        self.logger.save_calibration_result(best_iterate=self._current_best_iterate,
                                            model_name=self.sim_api.model_name,
                                            duration=0,
                                            itercount=0)
        super()._handle_error(error)

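    # For reference, the loop in ``get_penalty`` below evaluates to
    #   penalty = 1 + penalty_factor * sum_i (x_scaled_i - x_prev_scaled_i)^2
    # when ``perform_square_deviation`` is True, and otherwise to
    #   penalty = 1 + penalty_factor * sum_i |x_i - x_prev_i| / |x_prev_i|
    # over all tuner parameters, skipping those whose previous best value is zero.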
    def get_penalty(self, current_tuners, current_tuners_scaled):
        """
        Get penalty factor for evaluation of current objective. The penalty
        factor considers deviations of the tuner parameters in the objective function.
        First the relative deviation between the current best values
        of the tuner parameters from the recalibration steps and
        the tuner parameters obtained in the current iteration step is determined.
        Then the penalty factor is increased according to the relative deviation.

        :param pd.Series current_tuners:
            Descaled tuner parameter values of the current iteration step
        :param pd.Series current_tuners_scaled:
            Scaled tuner parameter values of the current iteration step
        :return: float penalty
            Penalty factor for evaluation.
        """
        # TO-DO: Add possibility to consider the sensitivity of tuner parameters

        # Get lists of tuner values (latest best (with all other tuners) & current values)
        previous = self.sim_api.all_tuners_dict
        previous_scaled = self.sim_api.all_tuners_dict_scaled
        # previous_scaled = list(self.sim_api.all_tuners_dict.keys())
        current = current_tuners
        current_scaled = dict(current_tuners_scaled)

        # Apply penalty function
        penalty = 1
        for key, value in current_scaled.items():
            # Add corresponding function for penalty factor here
            if self.perform_square_deviation:
                # Apply quadratic deviation
                dev_square = (value - previous_scaled[key]) ** 2
                penalty += self.penalty_factor * dev_square
            else:
                # Apply relative deviation
                # Ignore tuner parameters whose current best value is 0
                if previous[key] == 0:
                    continue
                # Get relative deviation of tuner values (reference: previous)
                try:
                    dev = abs(current[key] - previous[key]) / abs(previous[key])
                    penalty += self.penalty_factor * dev
                except ZeroDivisionError:
                    pass
        return penalty

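    # Illustration (hypothetical numbers): with start_time = 3600 s and
    # timedelta = 600 s, the simulation is started at 3000 s so the model can
    # settle, while the goals are still evaluated only on the relevant
    # intervals of the calibration class beginning at 3600 s.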
    def _apply_start_time_method(self, start_time):
        """
        Calculate the start_time based on the used timedelta method.

        :param float start_time:
            Start time which was specified by the user in the TOML file.
        :return float start_time - self.timedelta:
            Start time reduced by the "timedelta", if specified in the TOML file.
        """
        return start_time - self.timedelta
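

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the shipped module): given an
# already configured ebcpy SimulationAPI and CalibrationClass objects for
# calibration and validation (their creation is assumed here and depends on
# your model and measurement data), a single calibration run could look as
# follows. The framework and method strings are assumptions and must match an
# optimizer supported by ebcpy's Optimizer.optimize().
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Placeholders, to be created for your own model and data (assumptions):
    sim_api = ...            # e.g. an FMU- or Dymola-based SimulationAPI from ebcpy
    calibration_class = ...  # CalibrationClass with Goals and TunerParas
    validation_class = ...   # CalibrationClass covering a separate time interval

    calibrator = Calibrator(
        cd="calibration_workdir",   # working directory for logs and plots
        sim_api=sim_api,
        calibration_class=calibration_class,
        timedelta=0,                # no settling time before the evaluated interval
        show_plot=False,            # only store plots at the end of the process
        max_itercount=200,          # explicitly limit the calibration time
    )
    # Run the optimization and obtain the calibrated parameter values.
    calibrated_parameters = calibrator.calibrate(
        framework="scipy_differential_evolution",
        method="best1bin",
    )
    # Evaluate the calibrated parameters on the validation interval.
    validation_objective = calibrator.validate(
        validation_class=validation_class,
        calibration_result=calibrated_parameters,
    )
    sim_api.close()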