Source code for aixcalibuha.utils.visualizer

"""
Module with classes and functions to help visualize
different processes inside the framework. Both plots
and print-/log-functions will be implemented here.
The Visualizer class inherits from the Logger class, as logging
will always be used as a default.
"""
import os
import logging
import csv
from shutil import copyfile
import matplotlib.pyplot as plt
import numpy as np
from ebcpy.utils import setup_logger
import aixcalibuha


def short_name(ini_name: str, max_len: int):
    """
    Shortens long strings to a max length from the front.
    long_string_name => ...ing_name with len(new_name) = max_len

    :param str ini_name:
        Long string to shorten.
    :param int max_len:
        Max len of the new string.
    :return: str
        The shortened string.
    """
    if len(ini_name) > max_len:
        num_dots = len(ini_name) - max_len
        if num_dots > 3:
            num_dots = 3
        formatted_name = "." * num_dots + ini_name[-(max_len - num_dots):]
    else:
        formatted_name = ini_name
    return formatted_name
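
# Usage sketch for short_name (illustrative, not part of the module itself).
# A 31-character Modelica-style name shortened to 11 characters keeps the tail:
#     short_name("heatPump.condenser.heatTransfer", max_len=11)
#     # -> '...Transfer'
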
class CalibrationLogger:
    """Base class for showing the process of functions in this
    Framework with print-statements and saving everything
    relevant as a log-file.

    :param str,os.path.normpath cd:
        Directory where to store the output of the Logger and possible
        child-classes. If the given directory can not be created, an error
        will be raised.
    :param str name:
        Name of the reason of logging, e.g. classification, processing etc.
    :param aixcalibuha.CalibrationClass calibration_class:
        Calibration class used in the calibration-process.
    :param logging.Logger logger:
        If given, this logger is used to print and or save the messages.
        Else, a new one is set up.
    """

    # Instantiate class parameters
    integer_prec = 4  # Number of integer parts
    decimal_prec = 6
    _counter_calibration = 0  # Number of function calls of calibration
    _prec = decimal_prec
    _width = integer_prec + decimal_prec + 1  # Calculate the actual width

    def __init__(self, cd, name, calibration_class, logger=None):
        """Instantiate class parameters"""
        self._tuner_paras = None
        self._goals = None
        if logger is None:
            self.logger = setup_logger(cd=cd, name=name)
        else:
            if not isinstance(logger, logging.Logger):
                raise TypeError(f"Given logger is of type {type(logger)} "
                                f"but should be type logging.Logger")
            self.logger = logger
        self.cd = cd
        self.calibration_class = calibration_class

    def log(self, msg, level=logging.INFO):
        """Wrapper function to directly log in the internal logger"""
        self.logger.log(msg=msg, level=level)

    def error(self, msg):
        """Wrapper function to directly log an error"""
        self.logger.error(msg=msg)

    def _set_prec_and_width_for_tuner_paras(self):
        if self.tuner_paras.bounds is None:
            self.integer_prec = 4  # Number of integer parts
        else:
            bounds_min, bounds_max = self.tuner_paras.get_bounds()
            maximal_value = max(max(bounds_max), max(abs(bounds_min)))
            self.integer_prec = len(str(int(maximal_value)))
        self._counter_calibration = 0  # Number of function calls of calibration
        self._width = self.integer_prec + self.decimal_prec + 1  # Calculate the actual width

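    # Worked example for the width calculation above (bounds are hypothetical):
    # with bounds (0, 300) and (0.1, 0.9), maximal_value = 300, hence
    # integer_prec = len("300") = 3 and _width = 3 + decimal_prec (6) + 1 = 10.
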
    def calibration_callback_func(self, xk, obj, verbose_information, penalty=None):
        """
        Logs the current values of the objective function.

        :param np.array xk:
            Array with the current values of the calibration
        :param float obj:
            Current objective value.
        :param dict verbose_information:
            A dict with the difference-values for all goals and
            the corresponding weightings
        :param float penalty:
            Penalty factor of the current evaluation
        """
        xk_descaled = self.tuner_paras.descale(xk)
        self._counter_calibration += 1
        if penalty is None:
            info_string = self._get_tuner_para_values_as_string(
                xk_descaled, obj, verbose_information
            )
        else:
            info_string = self._get_tuner_para_values_as_string(
                xk_descaled, obj, verbose_information, penalty
            )
        self.logger.info(info_string)

    def validation_callback_func(self, obj):
        """
        Log the validation result information

        :param float obj:
            Objective value of validation.
        """
        self.log(f"{self.goals.statistical_measure} of validation: {obj}")

    def save_calibration_result(self, best_iterate, model_name, **kwargs):
        """
        Process the result, re-run the simulation and generate
        a logFile for the minimal quality measurement

        :param dict best_iterate:
            Result object of the minimization
        :param str model_name:
            Name of the model being calibrated
        """
        if "Iterate" not in best_iterate:
            self.logger.error("No best iterate. Can't save result")
            return
        result_log = f"\nResults for calibration of model: {model_name}\n"
        result_log += f"Number of iterations: {self._counter_calibration}\n"
        result_log += "Final parameter values:\n"
        # Set the iteration counter so that the number of the best iteration is printed
        self._counter_calibration = best_iterate["Iterate"]
        result_log += f"{self._get_tuner_para_names_as_string()}\n"
        final_values = self._get_tuner_para_values_as_string(
            xk_descaled=best_iterate["Parameters"],
            obj=best_iterate["Objective"],
            unweighted_objective=best_iterate["Unweighted Objective"],
            penalty=best_iterate["Penaltyfactor"])
        result_log += f"{final_values}\n"
        self.logger.info(result_log)
        self._counter_calibration = 0

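    # Illustrative layout of the best_iterate dict consumed above; the keys
    # match this method, while the names and values are made up:
    #     best_iterate = {
    #         "Iterate": 42,                        # iteration number of the best result
    #         "Parameters": {"heatConv_a": 265.9},  # descaled tuner values
    #         "Objective": 0.0123,
    #         "Unweighted Objective": {"TRoom": (0.25, 0.0123)},  # (weighting, value) assumed
    #         "Penaltyfactor": 1.0,
    #     }
    #     logger.save_calibration_result(best_iterate, model_name="HeatPumpModel")
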
    def calibrate_new_class(self, calibration_class, cd=None, for_validation=False):
        """Function to setup the figures for a new class of calibration.
        This function is called when instantiating this Class. If you
        use continuous calibration classes, call this function before
        starting the next calibration-class.

        :param aixcalibuha.CalibrationClass calibration_class:
            Class holding information on names, tuner_paras, goals
            and time-intervals of calibration.
        :param str,os.path.normpath cd:
            Optional change in working directory to store files
        :param bool for_validation:
            If it's only for validation, only plot the goals
        """
        if cd is not None:
            self.cd = cd
        self.calibration_class = calibration_class

    @property
    def cd(self) -> str:
        """Get the current working directory for storing plots."""
        return self._cd

    @cd.setter
    def cd(self, cd: str):
        """Set the current working directory for storing plots."""
        if not os.path.exists(cd):
            os.makedirs(cd)
        self._cd = cd

    @property
    def tuner_paras(self) -> aixcalibuha.TunerParas:
        """Get the currently used TunerParas object."""
        return self._tuner_paras

    @tuner_paras.setter
    def tuner_paras(self, tuner_paras):
        """
        Set the currently used TunerParas object to use the information for logging.

        :param aixcalibuha.TunerParas tuner_paras:
            TunerParas to be set to the object
        """
        if not isinstance(tuner_paras, aixcalibuha.TunerParas):
            raise TypeError(f"Given tuner_paras is of "
                            f"type {type(tuner_paras).__name__} "
                            "but type TunerParas is needed.")
        self._tuner_paras = tuner_paras
        self._set_prec_and_width_for_tuner_paras()

    @property
    def goals(self) -> aixcalibuha.Goals:
        """Get current goals instance"""
        return self._goals

    @goals.setter
    def goals(self, goals: aixcalibuha.Goals):
        """
        Set the currently used Goals object to use the information for logging.

        :param aixcalibuha.Goals goals:
            Goals to be set to the object
        """
        if not isinstance(goals, aixcalibuha.Goals):
            raise TypeError(f"Given goals is of type {type(goals).__name__} "
                            "but type Goals is needed.")
        self._goals = goals

    @property
    def calibration_class(self) -> aixcalibuha.CalibrationClass:
        """Get current calibration class object"""
        return self._calibration_class

    @calibration_class.setter
    def calibration_class(self, calibration_class: aixcalibuha.CalibrationClass):
        """
        Set the current calibration class.

        :param aixcalibuha.CalibrationClass calibration_class:
            Class holding information on names, tuner_paras, goals
            and time-intervals of calibration.
        """
        if not isinstance(calibration_class, aixcalibuha.CalibrationClass):
            raise TypeError(f"Given calibration_class "
                            f"is of type {type(calibration_class).__name__} "
                            "but type CalibrationClass is needed.")
        self._calibration_class = calibration_class
        self.tuner_paras = calibration_class.tuner_paras
        if calibration_class.goals is not None:
            self.goals = calibration_class.goals

    def log_initial_names(self):
        """Function to log the initial names and the
        statistical measure before calibration."""
        self.logger.info(self._get_tuner_para_names_as_string())

    def log_intersection_of_tuners(self, intersected_tuner_parameters, **kwargs):
        """
        If an intersection for multiple classes occurs, information
        about the statistics of the dataset has to be provided.

        :param dict intersected_tuner_parameters:
            Dict with cols being the name of the tuner parameter and the
            value being the list with all the different "best" values for
            the tuner parameter.
        """
        _formatted_str = "\n".join([f"{tuner}: {values}"
                                    for tuner, values in intersected_tuner_parameters.items()])
        self.logger.info("Multiple 'best' values for the following tuner parameters "
                         "were identified in different classes:\n%s", _formatted_str)

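    # Example call (parameter names are hypothetical; the 'TunerParameter.'
    # record prefix is what the visualizer strips for display):
    #     logger.log_intersection_of_tuners(
    #         {"TunerParameter.heatConv_a": [250.0, 310.5],
    #          "TunerParameter.heatConv_b": [120.0, 119.8]})
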
    def _get_tuner_para_names_as_string(self):
        """
        Returns a string with the names of the current tuner parameters

        :return: str info_string
            The desired string
        """
        initial_names = list(self.tuner_paras.get_names())

        info_string = "{0:9s}".format("Iteration")

        # Names of tuner parameters
        for ini_name in initial_names:
            # Limit string length to a certain amount.
            # The full name has to be displayed somewhere else
            formatted_name = short_name(ini_name=ini_name, max_len=self._width)
            info_string += " {0:{width}s}".format(formatted_name, width=self._width)
        # Add string for qualitative measurement used (e.g. NRMSE, MAE etc.)
        info_string += " {0:{width}s}".format(self.goals.statistical_measure,
                                              width=self._width)
        info_string += "penaltyfactor"
        info_string += f" Unweighted {self.goals.statistical_measure}"
        return info_string

    def _get_tuner_para_values_as_string(self,
                                         xk_descaled,
                                         obj,
                                         unweighted_objective,
                                         penalty=None):
        """
        Returns a string with the values of current tuner parameters
        as well as the objective value.

        :param np.array xk_descaled:
            Array with the current values of the calibration, descaled to bounds
        :param float obj:
            Current objective value.
        :param dict unweighted_objective:
            Further information about the objective value of each individual goal
        :param None/float penalty:
            Penalty factor.
        :return: str
            The desired string.
        """
        # This will limit the number of iterations to 999999999 (for correct format).
        # More iterations will most likely never be used.
        info_string = '{0:9d}'.format(self._counter_calibration)

        for x_value in xk_descaled:
            info_string += " {0:{width}.{prec}f}".format(x_value,
                                                         width=self._width,
                                                         prec=self._prec)
        # Add the last return value of the objective function.
        info_string += " {0:{width}.{prec}f}".format(obj, width=self._width,
                                                     prec=self._prec)
        if penalty:
            info_string += " {0:{width}.{prec}f}".format(penalty, width=self._width,
                                                         prec=self._prec - 3)
        else:
            info_string += " {}".format("-")
        _verbose_info = "= " + " + ".join(["{0:.{prec}}*{1:.{prec}}".format(val[0], val[1],
                                                                            prec=4)
                                           for goal, val in unweighted_objective.items()])
        info_string += f" {_verbose_info}"
        return info_string

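
# With two tuner parameters and _width = 10, the header and a value line built
# by the two helpers above look roughly as follows (illustrative only; the
# exact spacing depends on the tuner bounds):
#     Iteration ...onv_a   ...onv_b   NRMSE     penaltyfactor Unweighted NRMSE
#             1 265.900000 119.800000   0.012300 - = 0.25*0.0123
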
class CalibrationVisualizer(CalibrationLogger):
    """More advanced class to not only log ongoing function
    evaluations but also show the process of the functions
    by plotting interesting causalities and saving these plots.

    :keyword boolean show_plot:
        If False, all created plots are not shown during calibration but
        only stored at the end of the process.
    :keyword boolean create_tsd_plot:
        If False, the plot of the time series data (goals) is not created
        and thus not shown during calibration. It therefore is also not
        stored, even if you set the save_tsd_plot keyword-argument to true.
    :keyword boolean save_tsd_plot:
        If True, at each iteration the created plot of the
        time-series is saved. This may make the process much slower.
    :keyword float show_plot_pause_time:
        Set the time (in seconds) the plt.draw() pauses. May be altered if show_plot
        yields plots which disappear too fast. Default is 1e-3 s.
    """

    def __init__(self, cd, name, calibration_class, logger=None, **kwargs):
        """Instantiate class parameters"""

        # Instantiate the logger:
        super().__init__(cd=cd, name=name,
                         calibration_class=calibration_class,
                         logger=logger)

        # Setup dummy parameters so class-functions
        # know the type of those later created objects:
        self._n_cols_goals, self._n_rows_goals, self._n_cols_tuner, self._n_rows_tuner = 1, 1, 1, 1
        self.fig_tuner, self.ax_tuner = None, None
        self.fig_goal, self.ax_goal = None, None
        self.fig_obj, self.ax_obj = None, None
        self._num_goals = 0
        self.goals_dir = "TimeSeriesPlot"
        # Set supported kwargs:
        plt.ioff()  # Turn off interactive mode.
        self.save_tsd_plot = kwargs.get("save_tsd_plot", False)
        self.create_tsd_plot = kwargs.get("create_tsd_plot", True)
        self.show_plot = kwargs.get("show_plot", True)
        self.file_type = kwargs.get("file_type", "svg")
        self.show_plot_pause_time = kwargs.get("show_plot_pause_time", 1e-3)
        if not isinstance(self.show_plot_pause_time, (float, int)):
            raise TypeError(
                f"Given 'show_plot_pause_time' needs to "
                f"be float or int but is {type(self.show_plot_pause_time)}."
            )

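    # Minimal setup sketch (assumes an existing calibration_class; the
    # directory and keyword values are examples of the options documented above):
    #     vis = CalibrationVisualizer(
    #         cd="results/HeatPump",
    #         name="calibration",
    #         calibration_class=calibration_class,
    #         show_plot=True,
    #         save_tsd_plot=False,
    #         file_type="png",
    #         show_plot_pause_time=1e-3,
    #     )
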
    def calibrate_new_class(self, calibration_class, cd=None, for_validation=False):
        """Function to setup the figures for a new class of calibration.
        This function is called when instantiating this Class. If you
        use continuous calibration classes, call this function before
        starting the next calibration-class.

        :param aixcalibuha.CalibrationClass calibration_class:
            Class holding information on names, tuner_paras, goals
            and time-intervals of calibration.
        :param str,os.path.normpath cd:
            Optional change in working directory to store files
        :param bool for_validation:
            If it's only for validation, only plot the goals
        """
        super().calibrate_new_class(calibration_class, cd)

        name = calibration_class.name

        # Close all old figures to create new ones.
        plt.close("all")

        if not for_validation:
            # %% Set-up figure for objective-plotting
            self.fig_obj, self.ax_obj = plt.subplots(1, 1)
            self.fig_obj.suptitle(name + ": Objective")
            self.ax_obj.set_ylabel(self.goals.statistical_measure)
            self.ax_obj.set_xlabel("Number iterations")
            # If the changes are small, it seems like the plot does
            # not fit the printed values. This boolean assures that no offset is used.
            self.ax_obj.ticklabel_format(useOffset=False)

            # %% Setup Tuner-Paras figure
            # Make an almost quadratic layout based on the number of tuner-parameters involved.
            num_tuners = len(self.tuner_paras.get_names())
            self._n_cols_tuner = int(np.floor(np.sqrt(num_tuners)))
            self._n_rows_tuner = int(np.ceil(num_tuners / self._n_cols_tuner))
            self.fig_tuner, self.ax_tuner = plt.subplots(self._n_rows_tuner,
                                                         self._n_cols_tuner,
                                                         squeeze=False,
                                                         sharex=True)
            self.fig_tuner.suptitle(name + ": Tuner Parameters")
            self._plot_tuner_parameters(for_setup=True)

        # %% Setup Goals figure
        # Only a dummy, as the figure is recreated at every iteration
        if self.goals is not None:
            self._num_goals = len(self.goals.get_goals_list())
            self._n_cols_goals = int(np.floor(np.sqrt(self._num_goals)))
            self._n_rows_goals = int(np.ceil(self._num_goals / self._n_cols_goals))
            self.fig_goal, self.ax_goal = plt.subplots(self._n_rows_goals,
                                                       self._n_cols_goals,
                                                       squeeze=False, sharex=True)
            self.fig_goal.suptitle(name + ": Goals")

    def calibration_callback_func(self, xk, obj, verbose_information, penalty=None):
        """
        Logs the current values of the objective function.

        :param np.array xk:
            Array with the current values of the calibration
        :param float obj:
            Current objective value.
        :param dict verbose_information:
            A dict with the difference-values for all goals and
            the corresponding weightings
        :param float penalty:
            Penalty factor of the current evaluation
        """
        # Call the logger function to print and log
        super().calibration_callback_func(xk, obj, verbose_information, penalty)
        # Plot the current objective value
        self.ax_obj.plot(self._counter_calibration, obj, "ro")

        # Plot the tuner parameters
        self._plot_tuner_parameters(xk=xk)

        # Plot the measured and simulated data
        if self.goals is not None and self.create_tsd_plot:
            self._plot_goals()

        self._show_plot()

    def validation_callback_func(self, obj):
        """
        Log the validation result information.
        Also plot if selected.

        :param float obj:
            Objective value of validation.
        """
        super().validation_callback_func(obj=obj)
        # Plot the measured and simulated data
        if self.goals is not None and self.create_tsd_plot:
            self._plot_goals(at_validation=True)

        self._show_plot(for_validation=True)

    def _show_plot(self, for_validation=False):
        """Show plot if activated"""
        if not self.show_plot:
            return
        plt.draw()
        if self.create_tsd_plot:
            self.fig_goal.canvas.draw_idle()
        if not for_validation:
            self.fig_obj.canvas.draw_idle()
            self.fig_tuner.canvas.draw_idle()
            plt.pause(self.show_plot_pause_time)
        else:
            plt.show()

    def save_calibration_result(self, best_iterate, model_name, **kwargs):
        """
        Process the result, re-run the simulation and generate
        a logFile for the minimal quality measurement.
        Requires the keyword arguments 'itercount' and 'duration'.

        :param dict best_iterate:
            Result object of the minimization
        :param str model_name:
            Name of the model being calibrated
        """
        if "Iterate" not in best_iterate:
            self.logger.error("No best iterate. Can't save result")
            return
        super().save_calibration_result(best_iterate, model_name, **kwargs)
        itercount = kwargs["itercount"]
        duration = kwargs["duration"]

        # Extract filepaths
        iterpath = os.path.join(self.cd, f'Iteration_{itercount}')
        if not os.path.exists(iterpath):
            os.mkdir(iterpath)

        filepath_tuner = os.path.join(iterpath, f"tuner_parameter_plot.{self.file_type}")
        filepath_obj = os.path.join(iterpath, f"objective_plot.{self.file_type}")
        if self.save_tsd_plot:
            bestgoal = os.path.join(self.cd,
                                    self.goals_dir,
                                    str(best_iterate["Iterate"]) + f"_goals.{self.file_type}")
            # Copy best goals figure
            copyfile(bestgoal, os.path.join(iterpath, f"best_goals.{self.file_type}"))

        # Save calibration results as csv
        res_dict = dict(best_iterate['Parameters'])
        res_dict['Objective'] = best_iterate["Objective"]
        res_dict['Duration'] = duration
        res_csv = os.path.join(
            iterpath,
            f"RESULTS_{self.calibration_class.name}_iteration{itercount}.csv")
        with open(res_csv, 'w', newline='') as rescsv:
            writer = csv.DictWriter(rescsv, res_dict.keys())
            writer.writeheader()
            writer.writerow(res_dict)

        # Save figures & close plots
        self.fig_tuner.savefig(filepath_tuner)
        self.fig_obj.savefig(filepath_obj)
        plt.close("all")

        if best_iterate['better_current_result'] and self.save_tsd_plot:
            # Save improvement of recalibration ("best goals df" as csv)
            best_iterate['Goals'].get_goals_data().to_csv(
                os.path.join(iterpath, 'goals_df.csv'),
                sep=",",
                decimal="."
            )

    def log_intersection_of_tuners(self, intersected_tuner_parameters, **kwargs):
        """
        If an intersection for multiple classes occurs, information
        about the statistics of the dataset has to be provided.

        :param dict intersected_tuner_parameters:
            Dict with cols being the name of the tuner parameter and the
            value being the list with all the different "best" values for
            the tuner parameter.
        """
        super().log_intersection_of_tuners(intersected_tuner_parameters, **kwargs)

        x_labels = intersected_tuner_parameters.keys()
        data = list(intersected_tuner_parameters.values())

        fig_intersection, ax_intersection = plt.subplots(1, len(x_labels), squeeze=False)
        for i, x_label in enumerate(x_labels):
            # Remove name of record (modelica) for visualization
            x_label_vis = x_label.replace('TunerParameter.', '')
            cur_ax = ax_intersection[0][i]
            cur_ax.violinplot(data[i], showmeans=True, showmedians=False,
                              showextrema=True)
            cur_ax.plot([1] * len(data[i]), data[i], "ro", label="Results")

            cur_ax.get_xaxis().set_tick_params(direction='out')
            cur_ax.xaxis.set_ticks_position('bottom')
            cur_ax.set_xticks(np.arange(1, 2))
            cur_ax.set_xlim(0.25, 1.75)
            cur_ax.set_xticklabels([x_label_vis])
            cur_ax.legend(loc="upper right")

        # Always store in the parent directory as this info is relevant for all classes
        fig_intersection.suptitle("Intersection of Tuner Parameters")
        path_intersections = os.path.join(os.path.dirname(self.cd), "tunerintersections")
        if not os.path.exists(path_intersections):
            os.makedirs(path_intersections)
        if "itercount" in kwargs:
            fig_intersection.savefig(
                os.path.join(
                    path_intersections,
                    f'tuner_parameter_intersection_plot_it{kwargs["itercount"]}.{self.file_type}')
            )
        else:
            fig_intersection.savefig(
                os.path.join(path_intersections,
                             f'tuner_parameter_intersection_plot.{self.file_type}')
            )
        if self.show_plot:
            plt.draw()
            plt.pause(15)

    def _plot_tuner_parameters(self, xk=None, for_setup=False):
        """
        Plot the tuner parameter values history for better user interaction

        :param np.array xk:
            current iterate, scaled.
        :param bool for_setup:
            True if the function is called to initialize the calibration
        """
        tuner_counter = 0
        for row in range(self._n_rows_tuner):
            for col in range(self._n_cols_tuner):
                cur_ax = self.ax_tuner[row][col]
                tuner_names_vis = self.tuner_paras.get_names()
                # Remove name of record (modelica)
                for i, name in enumerate(tuner_names_vis):
                    tuner_names_vis[i] = name.replace('TunerParameter.', '')
                if tuner_counter >= len(self.tuner_paras.get_names()):
                    cur_ax.axis("off")
                else:
                    tuner_para_name = self.tuner_paras.get_names()[tuner_counter]
                    if for_setup:
                        cur_ax.set_ylabel(tuner_names_vis[tuner_counter])
                        max_value = self.tuner_paras.get_value(tuner_para_name, "max")
                        min_value = self.tuner_paras.get_value(tuner_para_name, "min")
                        ini_val = self.tuner_paras.get_value(tuner_para_name, "initial_value")
                        cur_ax.axhline(max_value, color="r")
                        cur_ax.axhline(min_value, color="r")
                        cur_ax.plot(self._counter_calibration, ini_val, "bo")
                    if xk is not None:
                        cur_val = self.tuner_paras.descale(xk)[tuner_counter]
                        cur_ax.plot(self._counter_calibration, cur_val, "bo")
                    tuner_counter += 1

    def _plot_goals(self, at_validation=False):
        """Plot the measured and simulated data for the current iterate"""

        # Get information on the relevant-intervals of the calibration:
        rel_intervals = self.calibration_class.relevant_intervals

        _goals_df = self.goals.get_goals_data()
        _goals_names = self.goals.get_goals_list()
        goal_counter = 0
        for row in range(self._n_rows_goals):
            for col in range(self._n_cols_goals):
                cur_ax = self.ax_goal[row][col]
                cur_ax.cla()
                if goal_counter >= self._num_goals:
                    cur_ax.axis("off")
                else:
                    cur_goal = _goals_names[goal_counter]
                    cur_ax.plot(_goals_df[cur_goal, self.goals.sim_tag_str],
                                label=cur_goal + f"_{self.goals.sim_tag_str}",
                                linestyle="--", color="r")
                    cur_ax.plot(_goals_df[cur_goal, self.goals.meas_tag_str],
                                label=cur_goal + f"_{self.goals.meas_tag_str}",
                                color="b")
                    # Mark the disregarded intervals in grey
                    _start = self.calibration_class.start_time
                    _first = True  # Only create one label
                    for interval in rel_intervals:
                        _end = interval[0]
                        if _first:
                            cur_ax.axvspan(_start, _end,
                                           facecolor="grey",
                                           alpha=0.7,
                                           label="Disregarded Interval")
                            _first = False
                        else:
                            cur_ax.axvspan(_start, _end,
                                           facecolor="grey", alpha=0.7)
                        _start = interval[1]
                    # Final plot of grey
                    cur_ax.axvspan(rel_intervals[-1][-1],
                                   self.calibration_class.stop_time,
                                   facecolor="grey",
                                   alpha=0.5)
                    cur_ax.legend(loc="lower right")
                    cur_ax.set_xlabel("Time / s")
                    goal_counter += 1

        if at_validation:
            name_id = "Validation"
        else:
            name_id = self._counter_calibration

        if self.save_tsd_plot:
            _savedir = os.path.join(self.cd, self.goals_dir)
            if not os.path.exists(_savedir):
                os.makedirs(_savedir)
            self.fig_goal.savefig(
                os.path.join(_savedir,
                             f"{name_id}_goals.{self.file_type}"))