Coverage for aixcalibuha/calibration/calibrator.py: 64%
263 statements
« prev ^ index » next coverage.py v7.4.4, created at 2026-04-30 14:23 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2026-04-30 14:23 +0000
1"""
2Module containing the basic class to calibrate
3a dynamic model, e.g. a modelica model.
4"""
6import os
7import json
8from pathlib import Path
9import time
10import logging
11from typing import Dict, Union
12from copy import copy
13import numpy as np
14import pandas as pd
15from ebcpy import load_time_series_data, Optimizer
16from ebcpy.simulationapi import SimulationAPI
17from aixcalibuha.utils import visualizer, MaxIterationsReached, MaxTimeReached, convert_mat_to_suffix, \
18 empty_postprocessing
19from aixcalibuha import CalibrationClass, Goals, TunerParas
22class Calibrator(Optimizer):
23 """
24 This class can Calibrator be used for single
25 time-intervals of calibration.
27 :param str,Path working_directory:
28 Working directory
29 :param ebcpy.simulationapi.SimulationAPI sim_api:
30 Simulation-API for running the models
31 :param CalibrationClass calibration_class:
32 Class with information on Goals and tuner-parameters for calibration
33 :keyword str result_path:
34 If given, then the resulting parameter values will be stored in a JSON file
35 at the given path.
36 :keyword float timedelta:
37 If you use this class for calibrating a single time-interval,
38 you can set the timedelta to instantiate the simulation before
39 actually evaluating it for the objective.
40 The given float (default is 0) is subtracted from the start_time
41 of your calibration_class. You can find a visualisation of said timedelta
42 in the img folder of the project.
43 :keyword boolean save_files:
44 If true, all simulation files for each iteration will be saved!
45 :keyword str suffix_files:
46 Default 'csv'. Specifies the data format to store the simulation files in.
47 Options are 'csv', 'parquet', or 'parquet.COMPRESSION' (e.g. 'parquet.snappy',
48 'parquet.gzip') to save only the goals.
49 If you want to keep the original 'mat' file specify 'mat' here
50 (not recommended due to high disk size usage).
51 :keyword str parquet_engine:
52 The engine to use for the data format parquet.
53 Supported options can be extracted
54 from the ebcpy DataFrame accessor ``df.tsd.save()`` function.
55 Default is 'pyarrow'.
56 :keyword boolean verbose_logging:
57 Default is True. If False, the standard Logger without
58 Visualization in Form of plots is used.
59 If you use this, the following keyword arguments below will help
60 to further adjust the logging.
61 :keyword boolean show_plot:
62 Default is True. If False, all created plots are not shown during
63 calibration but only stored at the end of the process.
64 :keyword boolean create_tsd_plot:
65 Default is True. If False, the plot of the time series data (goals)
66 is not created and thus shown in during calibration. It therefore is
67 also not stored, even if you set the save_tsd_plot keyword-argument to true.
68 :keyword boolean save_tsd_plot:
69 Default is False. If True, at each iteration the created plot of the
70 time-series is saved. This may make the process much slower
71 :keyword boolean fail_on_error:
72 Default is False. If True, the calibration will stop with an error if
73 the simulation fails. See also: ``ret_val_on_error``
74 :keyword float,np.nan ret_val_on_error:
75 Default is np.nan. If ``fail_on_error`` is false, you can specify here
76 which value to return in the case of a failed simulation. Possible
77 options are np.nan, np.inf or some other high numbers. be aware that this
78 max influence the solver.
79 :keyword dict fixed_parameters:
80 Default is an empty dict. This dict may be used to add certain parameters
81 to the simulation which are not tuned / variable during calibration.
82 Such parameters may be used if the default values in the model don't
83 represent the parameter values you want to use.
84 :keyword boolean apply_penalty:
85 Default is true. Specifies if a penalty function should be applied or not.
86 :keyword boolean penalty_factor:
87 Default is 0. Quantifies the impact of the penalty term on the objective function.
88 The penalty factor is added to the objective function.
89 :keyword boolean recalibration_count:
90 Default is 0. Works as a counter and specifies the current cycle of recalibration.
91 :keyword boolean perform_square_deviation:
92 Default is false.
93 If true the penalty function will evaluate the penalty factor with a quadratic approach.
94 :keyword int max_itercount:
95 Default is Infinity.
96 Maximum number of iterations of calibration.
97 This may be useful to explicitly limit the calibration
98 time.
99 :keyword int max_time":
100 Deault is Infinity.
101 Maximum time in seconds, after which the calibration is stopped. Useful to explicitly limit the calibration time.
102 :keyword str plot_file_type:
103 File ending of created plots.
104 Any supported option in matplotlib, e.g. svg, png, pdf ...
105 Default is png
107 """
109 def __init__(self,
110 working_directory: Union[Path, str],
111 sim_api: SimulationAPI,
112 calibration_class: CalibrationClass,
113 **kwargs):
114 """Instantiate instance attributes"""
115 # %% Kwargs
116 # Initialize supported keywords with default value
117 # Pop the items so they wont be added when calling the
118 # __init__ of the parent class. Always pop with a default value in case
119 # the keyword is not passed.
120 self.verbose_logging = kwargs.pop("verbose_logging", True)
121 self.save_files = kwargs.pop("save_files", False)
122 self.suffix_files = kwargs.pop("suffix_files", "csv")
123 self.parquet_engine = kwargs.pop('parquet_engine', 'pyarrow')
124 self.timedelta = kwargs.pop("timedelta", 0)
125 self.fail_on_error = kwargs.pop("fail_on_error", False)
126 self.ret_val_on_error = kwargs.pop("ret_val_on_error", np.nan)
127 self.fixed_parameters = kwargs.pop("fixed_parameters", {})
128 self.apply_penalty = kwargs.pop("apply_penalty", True)
129 self.penalty_factor = kwargs.pop("penalty_factor", 0)
130 self.recalibration_count = kwargs.pop("recalibration_count", 0)
131 self.perform_square_deviation = kwargs.pop("square_deviation", False)
132 self.result_path = kwargs.pop('result_path', None)
133 self.max_itercount = kwargs.pop('max_itercount', np.inf)
134 self.max_time = kwargs.pop('max_time', np.inf)
135 self.save_current_best_iterate = kwargs.pop('save_current_best_iterate', False)
136 self.at_calibration = True # Boolean to indicate if validating or calibrating
137 # Extract kwargs for the visualizer
138 visualizer_kwargs = {
139 "save_tsd_plot": kwargs.pop("save_tsd_plot", False),
140 "create_tsd_plot": kwargs.pop("create_tsd_plot", True),
141 "show_plot": kwargs.pop("show_plot", True),
142 "show_plot_pause_time": kwargs.pop("show_plot_pause_time", 1e-3),
143 "file_type": kwargs.pop("plot_file_type", "png"),
144 }
146 # Check if types are correct:
147 # Booleans:
148 _bool_kwargs = ["save_files"]
149 for bool_keyword in _bool_kwargs:
150 keyword_value = self.__getattribute__(bool_keyword)
151 if not isinstance(keyword_value, bool):
152 raise TypeError(f"Given {bool_keyword} is of type "
153 f"{type(keyword_value).__name__} but should be type bool")
155 # %% Initialize all public parameters
156 super().__init__(working_directory, **kwargs)
157 # Set sim_api
158 self.sim_api = sim_api
160 if not isinstance(calibration_class, CalibrationClass):
161 raise TypeError(f"calibration_classes is of type {type(calibration_class).__name__} "
162 f"but should be CalibrationClass")
163 self.calibration_class = calibration_class
164 # Scale tuner on boundaries
165 self.x0 = self.tuner_paras.scale(self.tuner_paras.get_initial_values())
166 if self.tuner_paras.bounds is None:
167 self.bounds = None
168 else:
169 # As tuner-parameters are scaled between 0 and 1, the scaled bounds are always 0 and 1
170 self.bounds = [(0, 1) for i in range(len(self.x0))]
171 # Add the values to the simulation setup.
172 self.sim_api.set_sim_setup(
173 {"start_time": self.calibration_class.start_time - self.timedelta,
174 "stop_time": self.calibration_class.stop_time}
175 )
177 # %% Setup the logger
178 # De-register the logger setup in the optimization class:
179 if self.verbose_logging:
180 self.logger = visualizer.CalibrationVisualizer(
181 working_directory=working_directory,
182 name=self.__class__.__name__,
183 calibration_class=self.calibration_class,
184 logger=self.logger,
185 **visualizer_kwargs
186 )
187 else:
188 self.logger = visualizer.CalibrationLogger(
189 working_directory=working_directory,
190 name=self.__class__.__name__,
191 calibration_class=self.calibration_class,
192 logger=self.logger
193 )
195 self.working_directory_of_class = working_directory # Single class does not need an extra folder
197 # Set the output interval according the the given Goals
198 mean_freq = self.goals.get_meas_frequency()
199 self.logger.log("Setting output_interval of simulation according "
200 f"to measurement target data frequency: {mean_freq}")
201 self.sim_api.sim_setup.output_interval = mean_freq
202 self.start_time = time.perf_counter()
204 def _check_for_termination(self):
205 if self._counter >= self.max_itercount:
206 raise MaxIterationsReached(
207 "Terminating calibration as the maximum number "
208 f"of iterations {self.max_itercount} has been reached."
209 )
211 if time.perf_counter() - self.start_time > self.max_time:
212 raise MaxTimeReached(
213 f"Terminating calibration as the maximum time of {self.max_time} s has been "
214 f"reached"
215 )
218 def obj(self, xk, *args, verbose: bool = False):
219 """
220 Default objective function.
221 The usual function will be implemented here:
223 1. Convert the set to modelica-units
224 2. Simulate the converted-set
225 3. Get data as a dataFrame
226 4. Get penalty factor for the penalty function
227 5. Calculate the objective based on statistical values
229 :param np.array xk:
230 Array with normalized values for the minimizer
231 :param int work_id:
232 id for worker in Multiprocessing
233 :param bool verbose:
234 If True, returns the objective value and the unweighted objective dict (for validation).
235 If False, returns only the objective value (for optimization).
236 :return:
237 If verbose == False (default)
238 Objective value based on the used quality measurement
239 If verbose == True
240 Objective value and unweighted objective dict as a tuple
241 :rtype: float or tuple
242 """
243 # Info: This function is called by the optimization framework (scipy, dlib, etc.)
244 # Initialize class objects
245 self._current_iterate = xk
246 self._counter += 1
248 # Convert set if multiple goals of different scales are used
249 xk_descaled = self.tuner_paras.descale(xk)
251 # Set initial values of variable and fixed parameters
252 self.sim_api.result_names = self.goals.get_sim_var_names()
253 initial_names = self.tuner_paras.get_names()
254 parameters = self.fixed_parameters.copy()
255 parameters.update({name: value for name, value in zip(initial_names, xk_descaled.values)})
256 # Simulate
257 # pylint: disable=broad-except
258 try:
259 # Generate the folder name for the calibration
260 if self.save_files:
261 if self.suffix_files == "mat":
262 postprocess_mat_result = empty_postprocessing
263 kwargs_postprocessing = {}
264 else:
265 postprocess_mat_result = convert_mat_to_suffix
266 kwargs_postprocessing = {
267 'variable_names': self.sim_api.result_names,
268 'suffix_files': self.suffix_files,
269 'parquet_engine': self.parquet_engine
270 }
271 savepath_files = os.path.join(self.sim_api.working_directory,
272 f"simulation_{self._counter}")
273 if self.sim_api.__class__.__name__ == "DymolaAPI":
274 self.calibration_class.input_kwargs["postprocess_mat_result"] = postprocess_mat_result
275 self.calibration_class.input_kwargs["kwargs_postprocessing"] = kwargs_postprocessing
276 _filepath = self.sim_api.simulate(
277 parameters=parameters,
278 return_option="savepath",
279 savepath=savepath_files,
280 inputs=self.calibration_class.inputs,
281 **self.calibration_class.input_kwargs
282 )
283 # %% Load results and write to goals object
284 sim_target_data = load_time_series_data(_filepath)
285 else:
286 sim_target_data = self.sim_api.simulate(
287 parameters=parameters,
288 inputs=self.calibration_class.inputs,
289 **self.calibration_class.input_kwargs
290 )
291 except Exception as err:
292 if self.fail_on_error:
293 self.logger.error("Simulation failed. Raising the error.")
294 raise err
295 self.logger.error(
296 f"Simulation failed. Returning '{self.ret_val_on_error}' "
297 f"for the optimization. Error message: {err}"
298 )
299 return self.ret_val_on_error
301 total_res, unweighted_objective = self._kpi_and_logging_calculation(
302 xk_descaled=xk_descaled,
303 counter=self._counter,
304 results=sim_target_data
305 )
306 self._check_for_termination()
308 if verbose:
309 return total_res, unweighted_objective
311 return total_res
313 def mp_obj(self, x, *args):
314 # Initialize list for results
315 num_evals = len(x)
316 total_res_list = np.empty([num_evals, 1])
317 # Set initial values of variable and fixed parameters
318 self.sim_api.result_names = self.goals.get_sim_var_names()
319 initial_names = self.tuner_paras.get_names()
320 parameters = self.fixed_parameters.copy()
322 parameter_list = []
323 xk_descaled_list = []
324 for _xk_single in x:
325 # Convert set if multiple goals of different scales are used
326 xk_descaled = self.tuner_paras.descale(_xk_single)
327 xk_descaled_list.append(xk_descaled)
328 # Update Parameters
329 parameter_copy = parameters.copy()
330 parameter_copy.update(
331 {name: value for name, value in zip(initial_names, xk_descaled.values)})
332 parameter_list.append(parameter_copy)
334 # Simulate
335 if self.save_files:
336 if self.suffix_files == "mat":
337 postprocess_mat_result = empty_postprocessing
338 kwargs_postprocessing = {}
339 else:
340 postprocess_mat_result = convert_mat_to_suffix
341 kwargs_postprocessing = {
342 'variable_names': self.sim_api.result_names,
343 'suffix_files': self.suffix_files,
344 'parquet_engine': self.parquet_engine
345 }
346 result_file_names = [f"simulation_{self._counter + idx}" for idx in
347 range(len(parameter_list))]
348 if self.sim_api.__class__.__name__ == "DymolaAPI":
349 self.calibration_class.input_kwargs["postprocess_mat_result"] = postprocess_mat_result
350 self.calibration_class.input_kwargs["kwargs_postprocessing"] = kwargs_postprocessing
351 _filepaths = self.sim_api.simulate(
352 parameters=parameter_list,
353 return_option="savepath",
354 savepath=self.sim_api.working_directory,
355 result_file_name=result_file_names,
356 fail_on_error=self.fail_on_error,
357 inputs=self.calibration_class.inputs,
358 **self.calibration_class.input_kwargs
359 )
360 # Load results
361 results = []
362 for _filepath in _filepaths:
363 if _filepath is None:
364 results.append(None)
365 else:
366 results.append(load_time_series_data(_filepath))
367 else:
368 results = self.sim_api.simulate(
369 parameters=parameter_list,
370 inputs=self.calibration_class.inputs,
371 fail_on_error=self.fail_on_error,
372 **self.calibration_class.input_kwargs
373 )
375 for idx, result in enumerate(results):
376 self._counter += 1
377 self._current_iterate = result
378 if result is None:
379 total_res_list[idx] = self.ret_val_on_error
380 continue
381 total_res, unweighted_objective = self._kpi_and_logging_calculation(
382 xk_descaled=xk_descaled_list[idx],
383 counter=self._counter,
384 results=result
385 )
386 # Add single objective to objective list of total Population
387 total_res_list[idx] = total_res
388 self._check_for_termination()
390 return total_res_list
392 def _kpi_and_logging_calculation(self, *, xk_descaled, counter, results):
393 """
394 Function to calculate everything needed in the obj or mp_obj
395 function after the simulation finished.
397 """
398 xk = self.tuner_paras.scale(xk_descaled)
400 self.goals.set_sim_target_data(results)
401 # Trim results based on start and end-time of cal class
402 self.goals.set_relevant_time_intervals(self.calibration_class.relevant_intervals)
404 # %% Evaluate the current objective
405 # Penalty function (get penalty factor)
406 if self.recalibration_count > 1 and self.apply_penalty:
407 # There is no benchmark in the first iteration or
408 # first iterations were skipped, so no penalty is applied
409 penaltyfactor = self.get_penalty(xk_descaled, xk)
410 # Evaluate with penalty
411 penalty = penaltyfactor
412 else:
413 # Evaluate without penalty
414 penaltyfactor = 1
415 penalty = None
416 total_res, unweighted_objective = self.goals.eval_difference(
417 verbose=True,
418 penaltyfactor=penaltyfactor
419 )
420 if self.at_calibration: # Only plot if at_calibration
421 self.logger.calibration_callback_func(
422 xk=xk,
423 obj=total_res,
424 verbose_information=unweighted_objective,
425 penalty=penalty
426 )
427 # current best iteration step of current calibration class
428 if total_res < self._current_best_iterate["Objective"]:
429 # self.best_goals = self.goals
430 self._current_best_iterate = {
431 "Iterate": counter,
432 "Objective": total_res,
433 "Unweighted Objective": unweighted_objective,
434 "Parameters": xk_descaled,
435 "Goals": self.goals,
436 # For penalty function and for saving goals as csv
437 "better_current_result": True,
438 # Changed to false in this script after calling function "save_calibration_results"
439 "Penaltyfactor": penalty
440 }
441 if self.save_current_best_iterate:
442 parameter_values = self._get_parameter_dict_from_current_best_iterate()
444 temp_save = {
445 "parameters": parameter_values,
446 "objective": total_res
447 }
448 with open(self.working_directory.joinpath('best_iterate.json'), 'w') as json_file:
449 json.dump(temp_save, json_file, indent=4)
451 return total_res, unweighted_objective
453 def calibrate(self, framework, method=None, **kwargs) -> dict:
454 """
455 Start the calibration process of the calibration classes, visualize and save the results.
457 The arguments of this function are equal to the
458 arguments in Optimizer.optimize(). Look at the docstring
459 in ebcpy to know which options are available.
460 """
461 # %% Start Calibration:
462 self.at_calibration = True
463 self.logger.log(f"Start calibration of model: {self.sim_api.model_name}"
464 f" with framework-class {self.__class__.__name__}")
465 self.logger.log(f"Class: {self.calibration_class.name}, Start and Stop-Time "
466 f"of simulation: {self.calibration_class.start_time}"
467 f"-{self.calibration_class.stop_time} s\n Time-Intervals used"
468 f" for objective: {self.calibration_class.relevant_intervals}")
470 # Setup the visualizer for plotting and logging:
471 self.logger.calibrate_new_class(self.calibration_class,
472 working_directory=self.working_directory_of_class,
473 for_validation=False)
474 self.logger.log_initial_names()
476 # Duration of Calibration
477 t_cal_start = time.time()
479 # Run optimization
480 try:
481 _res = self.optimize(
482 framework=framework,
483 method=method,
484 n_cpu=self.sim_api.n_cpu,
485 **kwargs)
486 except (MaxIterationsReached, MaxTimeReached) as err:
487 self.logger.log(msg=str(err), level=logging.WARNING)
488 t_cal_stop = time.time()
489 t_cal = t_cal_stop - t_cal_start
491 # Check if optimization worked correctly
492 if "Iterate" not in self._current_best_iterate:
493 raise Exception(
494 "Some error during calibration yielded no successful iteration. "
495 "Can't save or return any results."
496 )
498 # %% Save the relevant results.
499 self.logger.save_calibration_result(self._current_best_iterate,
500 self.sim_api.model_name,
501 duration=t_cal,
502 itercount=self.recalibration_count)
503 # Reset
504 self._current_best_iterate['better_current_result'] = False
506 # Save calibrated parameter values in JSON
507 parameter_values = self._get_parameter_dict_from_current_best_iterate()
508 self.save_results(parameter_values=parameter_values,
509 filename=self.calibration_class.name)
510 return parameter_values
512 def _get_parameter_dict_from_current_best_iterate(self) -> dict:
513 """
514 Get the parameter dictionary from the current best iterate.
515 """
516 parameter_values = {}
517 for p_name in self._current_best_iterate['Parameters'].index:
518 parameter_values[p_name] = self._current_best_iterate['Parameters'][p_name]
519 return parameter_values
521 @property
522 def calibration_class(self) -> CalibrationClass:
523 """Get the current calibration class"""
524 return self._cal_class
526 @calibration_class.setter
527 def calibration_class(self, calibration_class: CalibrationClass):
528 """Set the current calibration class"""
529 self.sim_api.set_sim_setup(
530 {"start_time": self._apply_start_time_method(start_time=calibration_class.start_time),
531 "stop_time": calibration_class.stop_time}
532 )
533 self._cal_class = calibration_class
535 @property
536 def tuner_paras(self) -> TunerParas:
537 """Get the current tuner parameters of the calibration class"""
538 return self.calibration_class.tuner_paras
540 @tuner_paras.setter
541 def tuner_paras(self, tuner_paras: TunerParas):
542 """Set the current tuner parameters of the calibration class"""
543 self.calibration_class.tuner_paras = tuner_paras
545 @property
546 def goals(self) -> Goals:
547 """Get the current goals of the calibration class"""
548 return self.calibration_class.goals
550 @goals.setter
551 def goals(self, goals: Goals):
552 """Set the current goals of the calibration class"""
553 self.calibration_class.goals = goals
555 @property
556 def fixed_parameters(self) -> dict:
557 """Get the currently fixed parameters during calibration"""
558 return self._fixed_pars
560 @fixed_parameters.setter
561 def fixed_parameters(self, fixed_parameters: dict):
562 """Set the currently fixed parameters during calibration"""
563 self._fixed_pars = fixed_parameters
565 def save_results(self, parameter_values: dict, filename: str):
566 """Saves the given dict into a file with path
567 self.result_path and name filename."""
568 if self.result_path is not None:
569 os.makedirs(self.result_path, exist_ok=True)
570 s_path = os.path.join(self.result_path, f'{filename}.json')
571 with open(s_path, 'w') as json_file:
572 json.dump(parameter_values, json_file, indent=4)
574 def validate(self, validation_class: CalibrationClass, calibration_result: Dict, verbose=False):
575 """
576 Validate the given calibration class based on the given
577 values for tuner_parameters.
579 :param CalibrationClass validation_class:
580 The class to validate on
581 :param dict calibration_result:
582 The calibration result to apply to the validation class on.
583 """
584 # Start Validation:
585 self.at_calibration = False
586 self.logger.log(f"Start validation of model: {self.sim_api.model_name} with "
587 f"framework-class {self.__class__.__name__}")
588 # Use start-time of calibration class
589 self.calibration_class = validation_class
590 start_time = self._apply_start_time_method(
591 start_time=self.calibration_class.start_time
592 )
593 old_tuner_paras = copy(self.calibration_class.tuner_paras)
594 tuner_values = list(calibration_result.values())
595 self.calibration_class.tuner_paras = TunerParas(
596 names=list(calibration_result.keys()),
597 initial_values=tuner_values,
598 # Dummy bounds as they are scaled anyway
599 bounds=[(val - 1, val + 1) for val in tuner_values]
600 )
602 # Set the start-time for the simulation
603 self.sim_api.sim_setup.start_time = start_time
605 self.logger.calibrate_new_class(self.calibration_class,
606 working_directory=self.working_directory_of_class,
607 for_validation=True)
609 # Use the results parameter vector to simulate again.
610 self._counter = 0 # Reset to one
611 # Scale the tuner parameters
612 xk = self.tuner_paras.scale(tuner_values)
613 # Evaluate objective
614 obj, unweighted_objective = self.obj(xk=xk, verbose=True)
615 self.logger.validation_callback_func(
616 obj=obj
617 )
618 # Reset tuner_parameters to avoid unwanted behaviour
619 self.calibration_class.tuner_paras = old_tuner_paras
620 if verbose:
621 weights = [1]
622 objectives = [obj]
623 goals = ['all']
624 for goal, val in unweighted_objective.items():
625 weights.append(val[0])
626 objectives.append(val[1])
627 goals.append(goal)
628 index = pd.MultiIndex.from_product(
629 [[validation_class.name], goals],
630 names=['Class', 'Goal']
631 )
632 obj_verbos = pd.DataFrame(
633 {'weight': weights, validation_class.goals.statistical_measure: objectives},
634 index=index
635 )
636 return obj_verbos
637 return obj
639 def _handle_error(self, error):
640 """
641 Also save the plots if an error occurs.
642 See ebcpy.optimization.Optimizer._handle_error for more info.
643 """
644 # This error is our own, we handle it in the calibrate() function
645 if isinstance(error, (MaxIterationsReached, MaxTimeReached)):
646 raise error
647 self.logger.save_calibration_result(best_iterate=self._current_best_iterate,
648 model_name=self.sim_api.model_name,
649 duration=0,
650 itercount=0)
651 super()._handle_error(error)
653 def get_penalty(self, current_tuners, current_tuners_scaled):
654 """
655 Get penalty factor for evaluation of current objective. The penaltyfactor
656 considers deviations of the tuner parameters in the objective function.
657 First the relative deviation between the current best values
658 of the tuner parameters from the recalibration steps and
659 the tuner parameters obtained in the current iteration step is determined.
660 Then the penaltyfactor is being increased according to the relative deviation.
662 :param pd.series current_tuner_values:
663 To add
664 :return: float penalty
665 Penaltyfactor for evaluation.
666 """
667 # TO-DO: Add possibility to consider the sensitivity of tuner parameters
669 # Get lists of tuner values (latest best (with all other tuners) & current values)
670 previous = self.sim_api.all_tuners_dict
671 previous_scaled = self.sim_api.all_tuners_dict_scaled
672 # previous_scaled = list(self.sim_api.all_tuners_dict.keys())
673 current = current_tuners
674 current_scaled = dict(current_tuners_scaled)
676 # Apply penalty function
677 penalty = 1
678 for key, value in current_scaled.items():
679 # Add corresponding function for penaltyfactor here
680 if self.perform_square_deviation:
681 # Apply quadratic deviation
682 dev_square = (value - previous_scaled[key]) ** 2
683 penalty += self.penalty_factor * dev_square
684 else:
685 # Apply relative deviation
686 # Ingore tuner parameter whose current best value is 0
687 if previous[key] == 0:
688 continue
689 # Get relative deviation of tuner values (reference: previous)
690 try:
691 dev = abs(current[key] - previous[key]) / abs(previous[key])
692 penalty += self.penalty_factor * dev
693 except ZeroDivisionError:
694 pass
696 return penalty
698 def _apply_start_time_method(self, start_time):
699 """
700 Method to be calculate the start_time based on the used
701 timedelta method.
703 :param float start_time:
704 Start time which was specified by the user in the TOML file.
705 :return float start_time - self.timedelta:
706 Calculated "timedelta", if specified in the TOML file.
707 """
708 return start_time - self.timedelta