Coverage for aixcalibuha/calibration/calibrator.py: 57%
226 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-01-27 10:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-01-27 10:48 +0000
1"""
2Module containing the basic class to calibrate
3a dynamic model, e.g. a modelica model.
4"""
6import os
7import json
8import time
9import logging
10from typing import Dict
11from copy import copy
12import numpy as np
13import pandas as pd
14from ebcpy import data_types, Optimizer
15from ebcpy.simulationapi import SimulationAPI
16from aixcalibuha.utils import visualizer, MaxIterationsReached
17from aixcalibuha import CalibrationClass, Goals, TunerParas
20class Calibrator(Optimizer):
21 """
22 This class can Calibrator be used for single
23 time-intervals of calibration.
25 :param str,os.path.normpath cd:
26 Working directory
27 :param ebcpy.simulationapi.SimulationAPI sim_api:
28 Simulation-API for running the models
29 :param CalibrationClass calibration_class:
30 Class with information on Goals and tuner-parameters for calibration
31 :keyword str result_path:
32 If given, then the resulting parameter values will be stored in a JSON file
33 at the given path.
34 :keyword float timedelta:
35 If you use this class for calibrating a single time-interval,
36 you can set the timedelta to instantiate the simulation before
37 actually evaluating it for the objective.
38 The given float (default is 0) is subtracted from the start_time
39 of your calibration_class. You can find a visualisation of said timedelta
40 in the img folder of the project.
41 :keyword boolean save_files:
42 If true, all simulation files for each iteration will be saved!
43 :keyword boolean verbose_logging:
44 Default is True. If False, the standard Logger without
45 Visualization in Form of plots is used.
46 If you use this, the following keyword arguments below will help
47 to further adjust the logging.
48 :keyword boolean show_plot:
49 Default is True. If False, all created plots are not shown during
50 calibration but only stored at the end of the process.
51 :keyword boolean create_tsd_plot:
52 Default is True. If False, the plot of the time series data (goals)
53 is not created and thus shown in during calibration. It therefore is
54 also not stored, even if you set the save_tsd_plot keyword-argument to true.
55 :keyword boolean save_tsd_plot:
56 Default is False. If True, at each iteration the created plot of the
57 time-series is saved. This may make the process much slower
58 :keyword boolean fail_on_error:
59 Default is False. If True, the calibration will stop with an error if
60 the simulation fails. See also: ``ret_val_on_error``
61 :keyword float,np.NAN ret_val_on_error:
62 Default is np.NAN. If ``fail_on_error`` is false, you can specify here
63 which value to return in the case of a failed simulation. Possible
64 options are np.NaN, np.inf or some other high numbers. be aware that this
65 max influence the solver.
66 :keyword dict fixed_parameters:
67 Default is an empty dict. This dict may be used to add certain parameters
68 to the simulation which are not tuned / variable during calibration.
69 Such parameters may be used if the default values in the model don't
70 represent the parameter values you want to use.
71 :keyword boolean apply_penalty:
72 Default is true. Specifies if a penalty function should be applied or not.
73 :keyword boolean penalty_factor:
74 Default is 0. Quantifies the impact of the penalty term on the objective function.
75 The penalty factor is added to the objective function.
76 :keyword boolean recalibration_count:
77 Default is 0. Works as a counter and specifies the current cycle of recalibration.
78 :keyword boolean perform_square_deviation:
79 Default is false.
80 If true the penalty function will evaluate the penalty factor with a quadratic approach.
81 :keyword int max_itercount:
82 Default is Infinity.
83 Maximum number of iterations of calibration.
84 This may be useful to explicitly limit the calibration
85 time.
86 :keyword str plot_file_type:
87 File ending of created plots.
88 Any supported option in matplotlib, e.g. svg, png, pdf ...
89 Default is png
91 """
93 def __init__(self,
94 cd: str,
95 sim_api: SimulationAPI,
96 calibration_class: CalibrationClass,
97 **kwargs):
98 """Instantiate instance attributes"""
99 # %% Kwargs
100 # Initialize supported keywords with default value
101 # Pop the items so they wont be added when calling the
102 # __init__ of the parent class. Always pop with a default value in case
103 # the keyword is not passed.
104 self.verbose_logging = kwargs.pop("verbose_logging", True)
105 self.save_files = kwargs.pop("save_files", False)
106 self.timedelta = kwargs.pop("timedelta", 0)
107 self.fail_on_error = kwargs.pop("fail_on_error", False)
108 self.ret_val_on_error = kwargs.pop("ret_val_on_error", np.NAN)
109 self.fixed_parameters = kwargs.pop("fixed_parameters", {})
110 self.apply_penalty = kwargs.pop("apply_penalty", True)
111 self.penalty_factor = kwargs.pop("penalty_factor", 0)
112 self.recalibration_count = kwargs.pop("recalibration_count", 0)
113 self.perform_square_deviation = kwargs.pop("square_deviation", False)
114 self.result_path = kwargs.pop('result_path', None)
115 self.max_itercount = kwargs.pop('max_itercount', np.inf)
116 self.at_calibration = True # Boolean to indicate if validating or calibrating
117 # Extract kwargs for the visualizer
118 visualizer_kwargs = {
119 "save_tsd_plot": kwargs.pop("save_tsd_plot", False),
120 "create_tsd_plot": kwargs.pop("create_tsd_plot", True),
121 "show_plot": kwargs.pop("show_plot", True),
122 "show_plot_pause_time": kwargs.pop("show_plot_pause_time", 1e-3),
123 "file_type": kwargs.pop("plot_file_type", "png"),
124 }
126 # Check if types are correct:
127 # Booleans:
128 _bool_kwargs = ["save_files"]
129 for bool_keyword in _bool_kwargs:
130 keyword_value = self.__getattribute__(bool_keyword)
131 if not isinstance(keyword_value, bool):
132 raise TypeError(f"Given {bool_keyword} is of type "
133 f"{type(keyword_value).__name__} but should be type bool")
135 # %% Initialize all public parameters
136 super().__init__(cd, **kwargs)
137 # Set sim_api
138 self.sim_api = sim_api
140 if not isinstance(calibration_class, CalibrationClass):
141 raise TypeError(f"calibration_classes is of type {type(calibration_class).__name__} "
142 f"but should be CalibrationClass")
143 self.calibration_class = calibration_class
144 # Scale tuner on boundaries
145 self.x0 = self.tuner_paras.scale(self.tuner_paras.get_initial_values())
146 if self.tuner_paras.bounds is None:
147 self.bounds = None
148 else:
149 # As tuner-parameters are scaled between 0 and 1, the scaled bounds are always 0 and 1
150 self.bounds = [(0, 1) for i in range(len(self.x0))]
151 # Add the values to the simulation setup.
152 self.sim_api.set_sim_setup(
153 {"start_time": self.calibration_class.start_time - self.timedelta,
154 "stop_time": self.calibration_class.stop_time}
155 )
157 # %% Setup the logger
158 # De-register the logger setup in the optimization class:
159 if self.verbose_logging:
160 self.logger = visualizer.CalibrationVisualizer(
161 cd=cd,
162 name=self.__class__.__name__,
163 calibration_class=self.calibration_class,
164 logger=self.logger,
165 **visualizer_kwargs
166 )
167 else:
168 self.logger = visualizer.CalibrationLogger(
169 cd=cd,
170 name=self.__class__.__name__,
171 calibration_class=self.calibration_class,
172 logger=self.logger
173 )
175 self.cd_of_class = cd # Single class does not need an extra folder
177 # Set the output interval according the the given Goals
178 mean_freq = self.goals.get_meas_frequency()
179 self.logger.log("Setting output_interval of simulation according "
180 f"to measurement target data frequency: {mean_freq}")
181 self.sim_api.sim_setup.output_interval = mean_freq
183 def obj(self, xk, *args):
184 """
185 Default objective function.
186 The usual function will be implemented here:
188 1. Convert the set to modelica-units
189 2. Simulate the converted-set
190 3. Get data as a dataFrame
191 4. Get penalty factor for the penalty function
192 5. Calculate the objective based on statistical values
194 :param np.array xk:
195 Array with normalized values for the minimizer
196 :param int work_id:
197 id for worker in Multiprocessing
198 :return:
199 Objective value based on the used quality measurement
200 :rtype: float
201 """
202 # Info: This function is called by the optimization framework (scipy, dlib, etc.)
203 # Initialize class objects
204 self._current_iterate = xk
205 self._counter += 1
207 # Convert set if multiple goals of different scales are used
208 xk_descaled = self.tuner_paras.descale(xk)
210 # Set initial values of variable and fixed parameters
211 self.sim_api.result_names = self.goals.get_sim_var_names()
212 initial_names = self.tuner_paras.get_names()
213 parameters = self.fixed_parameters.copy()
214 parameters.update({name: value for name, value in zip(initial_names, xk_descaled.values)})
215 # Simulate
216 # pylint: disable=broad-except
217 try:
218 # Generate the folder name for the calibration
219 if self.save_files:
220 savepath_files = os.path.join(self.sim_api.cd,
221 f"simulation_{self._counter}")
222 _filepath = self.sim_api.simulate(
223 parameters=parameters,
224 return_option="savepath",
225 savepath=savepath_files,
226 inputs=self.calibration_class.inputs,
227 **self.calibration_class.input_kwargs
228 )
229 # %% Load results and write to goals object
230 sim_target_data = data_types.TimeSeriesData(_filepath)
231 else:
232 sim_target_data = self.sim_api.simulate(
233 parameters=parameters,
234 inputs=self.calibration_class.inputs,
235 **self.calibration_class.input_kwargs
236 )
237 except Exception as err:
238 if self.fail_on_error:
239 self.logger.error("Simulation failed. Raising the error.")
240 raise err
241 self.logger.error(
242 f"Simulation failed. Returning '{self.ret_val_on_error}' "
243 f"for the optimization. Error message: {err}"
244 )
245 return self.ret_val_on_error
247 total_res, unweighted_objective = self._kpi_and_logging_calculation(
248 xk_descaled=xk_descaled,
249 counter=self._counter,
250 results=sim_target_data
251 )
253 return total_res, unweighted_objective
255 def mp_obj(self, x, *args):
256 # Initialize list for results
257 num_evals = len(x)
258 total_res_list = np.empty([num_evals, 1])
259 # Set initial values of variable and fixed parameters
260 self.sim_api.result_names = self.goals.get_sim_var_names()
261 initial_names = self.tuner_paras.get_names()
262 parameters = self.fixed_parameters.copy()
264 parameter_list = []
265 xk_descaled_list = []
266 for _xk_single in x:
267 # Convert set if multiple goals of different scales are used
268 xk_descaled = self.tuner_paras.descale(_xk_single)
269 xk_descaled_list.append(xk_descaled)
270 # Update Parameters
271 parameter_copy = parameters.copy()
272 parameter_copy.update(
273 {name: value for name, value in zip(initial_names, xk_descaled.values)})
274 parameter_list.append(parameter_copy)
276 # Simulate
277 if self.save_files:
278 result_file_names = [f"simulation_{self._counter + idx}" for idx in
279 range(len(parameter_list))]
280 _filepaths = self.sim_api.simulate(
281 parameters=parameter_list,
282 return_option="savepath",
283 savepath=self.sim_api.cd,
284 result_file_name=result_file_names,
285 fail_on_error=self.fail_on_error,
286 inputs=self.calibration_class.inputs,
287 **self.calibration_class.input_kwargs
288 )
289 # Load results
290 results = []
291 for _filepath in _filepaths:
292 if _filepath is None:
293 results.append(None)
294 else:
295 results.append(data_types.TimeSeriesData(_filepath))
296 else:
297 results = self.sim_api.simulate(
298 parameters=parameter_list,
299 inputs=self.calibration_class.inputs,
300 fail_on_error=self.fail_on_error,
301 **self.calibration_class.input_kwargs
302 )
304 for idx, result in enumerate(results):
305 self._counter += 1
306 self._current_iterate = result
307 if result is None:
308 total_res_list[idx] = self.ret_val_on_error
309 continue
310 total_res, unweighted_objective = self._kpi_and_logging_calculation(
311 xk_descaled=xk_descaled_list[idx],
312 counter=self._counter,
313 results=result
314 )
315 # Add single objective to objective list of total Population
316 total_res_list[idx] = total_res
318 return total_res_list
320 def _kpi_and_logging_calculation(self, *, xk_descaled, counter, results):
321 """
322 Function to calculate everything needed in the obj or mp_obj
323 function after the simulation finished.
325 """
326 xk = self.tuner_paras.scale(xk_descaled)
328 self.goals.set_sim_target_data(results)
329 # Trim results based on start and end-time of cal class
330 self.goals.set_relevant_time_intervals(self.calibration_class.relevant_intervals)
332 # %% Evaluate the current objective
333 # Penalty function (get penalty factor)
334 if self.recalibration_count > 1 and self.apply_penalty:
335 # There is no benchmark in the first iteration or
336 # first iterations were skipped, so no penalty is applied
337 penaltyfactor = self.get_penalty(xk_descaled, xk)
338 # Evaluate with penalty
339 penalty = penaltyfactor
340 else:
341 # Evaluate without penalty
342 penaltyfactor = 1
343 penalty = None
344 total_res, unweighted_objective = self.goals.eval_difference(
345 verbose=True,
346 penaltyfactor=penaltyfactor
347 )
348 if self.at_calibration: # Only plot if at_calibration
349 self.logger.calibration_callback_func(
350 xk=xk,
351 obj=total_res,
352 verbose_information=unweighted_objective,
353 penalty=penalty
354 )
355 # current best iteration step of current calibration class
356 if total_res < self._current_best_iterate["Objective"]:
357 # self.best_goals = self.goals
358 self._current_best_iterate = {
359 "Iterate": counter,
360 "Objective": total_res,
361 "Unweighted Objective": unweighted_objective,
362 "Parameters": xk_descaled,
363 "Goals": self.goals,
364 # For penalty function and for saving goals as csv
365 "better_current_result": True,
366 # Changed to false in this script after calling function "save_calibration_results"
367 "Penaltyfactor": penalty
368 }
370 if counter >= self.max_itercount:
371 raise MaxIterationsReached(
372 "Terminating calibration as the maximum number "
373 f"of iterations {self.max_itercount} has been reached."
374 )
376 return total_res, unweighted_objective
378 def calibrate(self, framework, method=None, **kwargs) -> dict:
379 """
380 Start the calibration process of the calibration classes, visualize and save the results.
382 The arguments of this function are equal to the
383 arguments in Optimizer.optimize(). Look at the docstring
384 in ebcpy to know which options are available.
385 """
386 # %% Start Calibration:
387 self.at_calibration = True
388 self.logger.log(f"Start calibration of model: {self.sim_api.model_name}"
389 f" with framework-class {self.__class__.__name__}")
390 self.logger.log(f"Class: {self.calibration_class.name}, Start and Stop-Time "
391 f"of simulation: {self.calibration_class.start_time}"
392 f"-{self.calibration_class.stop_time} s\n Time-Intervals used"
393 f" for objective: {self.calibration_class.relevant_intervals}")
395 # Setup the visualizer for plotting and logging:
396 self.logger.calibrate_new_class(self.calibration_class,
397 cd=self.cd_of_class,
398 for_validation=False)
399 self.logger.log_initial_names()
401 # Duration of Calibration
402 t_cal_start = time.time()
404 # Run optimization
405 try:
406 _res = self.optimize(
407 framework=framework,
408 method=method,
409 n_cpu=self.sim_api.n_cpu,
410 **kwargs)
411 except MaxIterationsReached as err:
412 self.logger.log(msg=str(err), level=logging.WARNING)
413 t_cal_stop = time.time()
414 t_cal = t_cal_stop - t_cal_start
416 # Check if optimization worked correctly
417 if "Iterate" not in self._current_best_iterate:
418 raise Exception(
419 "Some error during calibration yielded no successful iteration. "
420 "Can't save or return any results."
421 )
423 # %% Save the relevant results.
424 self.logger.save_calibration_result(self._current_best_iterate,
425 self.sim_api.model_name,
426 duration=t_cal,
427 itercount=self.recalibration_count)
428 # Reset
429 self._current_best_iterate['better_current_result'] = False
431 # Save calibrated parameter values in JSON
432 parameter_values = {}
433 for p_name in self._current_best_iterate['Parameters'].index:
434 parameter_values[p_name] = self._current_best_iterate['Parameters'][p_name]
435 self.save_results(parameter_values=parameter_values,
436 filename=self.calibration_class.name)
437 return parameter_values
439 @property
440 def calibration_class(self) -> CalibrationClass:
441 """Get the current calibration class"""
442 return self._cal_class
444 @calibration_class.setter
445 def calibration_class(self, calibration_class: CalibrationClass):
446 """Set the current calibration class"""
447 self.sim_api.set_sim_setup(
448 {"start_time": self._apply_start_time_method(start_time=calibration_class.start_time),
449 "stop_time": calibration_class.stop_time}
450 )
451 self._cal_class = calibration_class
453 @property
454 def tuner_paras(self) -> TunerParas:
455 """Get the current tuner parameters of the calibration class"""
456 return self.calibration_class.tuner_paras
458 @tuner_paras.setter
459 def tuner_paras(self, tuner_paras: TunerParas):
460 """Set the current tuner parameters of the calibration class"""
461 self.calibration_class.tuner_paras = tuner_paras
463 @property
464 def goals(self) -> Goals:
465 """Get the current goals of the calibration class"""
466 return self.calibration_class.goals
468 @goals.setter
469 def goals(self, goals: Goals):
470 """Set the current goals of the calibration class"""
471 self.calibration_class.goals = goals
473 @property
474 def fixed_parameters(self) -> dict:
475 """Get the currently fixed parameters during calibration"""
476 return self._fixed_pars
478 @fixed_parameters.setter
479 def fixed_parameters(self, fixed_parameters: dict):
480 """Set the currently fixed parameters during calibration"""
481 self._fixed_pars = fixed_parameters
483 def save_results(self, parameter_values: dict, filename: str):
484 """Saves the given dict into a file with path
485 self.result_path and name filename."""
486 if self.result_path is not None:
487 os.makedirs(self.result_path, exist_ok=True)
488 s_path = os.path.join(self.result_path, f'{filename}.json')
489 with open(s_path, 'w') as json_file:
490 json.dump(parameter_values, json_file, indent=4)
492 def validate(self, validation_class: CalibrationClass, calibration_result: Dict, verbose=False):
493 """
494 Validate the given calibration class based on the given
495 values for tuner_parameters.
497 :param CalibrationClass validation_class:
498 The class to validate on
499 :param dict calibration_result:
500 The calibration result to apply to the validation class on.
501 """
502 # Start Validation:
503 self.at_calibration = False
504 self.logger.log(f"Start validation of model: {self.sim_api.model_name} with "
505 f"framework-class {self.__class__.__name__}")
506 # Use start-time of calibration class
507 self.calibration_class = validation_class
508 start_time = self._apply_start_time_method(
509 start_time=self.calibration_class.start_time
510 )
511 old_tuner_paras = copy(self.calibration_class.tuner_paras)
512 tuner_values = list(calibration_result.values())
513 self.calibration_class.tuner_paras = TunerParas(
514 names=list(calibration_result.keys()),
515 initial_values=tuner_values,
516 # Dummy bounds as they are scaled anyway
517 bounds=[(val - 1, val + 1) for val in tuner_values]
518 )
520 # Set the start-time for the simulation
521 self.sim_api.sim_setup.start_time = start_time
523 self.logger.calibrate_new_class(self.calibration_class,
524 cd=self.cd_of_class,
525 for_validation=True)
527 # Use the results parameter vector to simulate again.
528 self._counter = 0 # Reset to one
529 # Scale the tuner parameters
530 xk = self.tuner_paras.scale(tuner_values)
531 # Evaluate objective
532 obj, unweighted_objective = self.obj(xk=xk)
533 self.logger.validation_callback_func(
534 obj=obj
535 )
536 # Reset tuner_parameters to avoid unwanted behaviour
537 self.calibration_class.tuner_paras = old_tuner_paras
538 if verbose:
539 weights = [1]
540 objectives = [obj]
541 goals = ['all']
542 for goal, val in unweighted_objective.items():
543 weights.append(val[0])
544 objectives.append(val[1])
545 goals.append(goal)
546 index = pd.MultiIndex.from_product(
547 [[validation_class.name], goals],
548 names=['Class', 'Goal']
549 )
550 obj_verbos = pd.DataFrame(
551 {'weight': weights, validation_class.goals.statistical_measure: objectives},
552 index=index
553 )
554 return obj_verbos
555 return obj
557 def _handle_error(self, error):
558 """
559 Also save the plots if an error occurs.
560 See ebcpy.optimization.Optimizer._handle_error for more info.
561 """
562 # This error is our own, we handle it in the calibrate() function
563 if isinstance(error, MaxIterationsReached):
564 raise error
565 self.logger.save_calibration_result(best_iterate=self._current_best_iterate,
566 model_name=self.sim_api.model_name,
567 duration=0,
568 itercount=0)
569 super()._handle_error(error)
571 def get_penalty(self, current_tuners, current_tuners_scaled):
572 """
573 Get penalty factor for evaluation of current objective. The penaltyfactor
574 considers deviations of the tuner parameters in the objective function.
575 First the relative deviation between the current best values
576 of the tuner parameters from the recalibration steps and
577 the tuner parameters obtained in the current iteration step is determined.
578 Then the penaltyfactor is being increased according to the relative deviation.
580 :param pd.series current_tuner_values:
581 To add
582 :return: float penalty
583 Penaltyfactor for evaluation.
584 """
585 # TO-DO: Add possibility to consider the sensitivity of tuner parameters
587 # Get lists of tuner values (latest best (with all other tuners) & current values)
588 previous = self.sim_api.all_tuners_dict
589 previous_scaled = self.sim_api.all_tuners_dict_scaled
590 # previous_scaled = list(self.sim_api.all_tuners_dict.keys())
591 current = current_tuners
592 current_scaled = dict(current_tuners_scaled)
594 # Apply penalty function
595 penalty = 1
596 for key, value in current_scaled.items():
597 # Add corresponding function for penaltyfactor here
598 if self.perform_square_deviation:
599 # Apply quadratic deviation
600 dev_square = (value - previous_scaled[key]) ** 2
601 penalty += self.penalty_factor * dev_square
602 else:
603 # Apply relative deviation
604 # Ingore tuner parameter whose current best value is 0
605 if previous[key] == 0:
606 continue
607 # Get relative deviation of tuner values (reference: previous)
608 try:
609 dev = abs(current[key] - previous[key]) / abs(previous[key])
610 penalty += self.penalty_factor * dev
611 except ZeroDivisionError:
612 pass
614 return penalty
616 def _apply_start_time_method(self, start_time):
617 """
618 Method to be calculate the start_time based on the used
619 timedelta method.
621 :param float start_time:
622 Start time which was specified by the user in the TOML file.
623 :return float start_time - self.timedelta:
624 Calculated "timedelta", if specified in the TOML file.
625 """
626 return start_time - self.timedelta