Coverage for aixcalibuha/calibration/multi_class_calibrator.py: 89%
100 statements
« prev ^ index » next coverage.py v7.4.4, created at 2026-04-30 14:23 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2026-04-30 14:23 +0000
1"""
2Module containing a class for
3calibrating multiple calibration classes at once.
4"""
6import os
7from pathlib import Path
8from typing import List, Union
9import numpy as np
10from ebcpy.simulationapi import SimulationAPI
12from aixcalibuha import CalibrationClass, data_types
13from aixcalibuha.calibration import Calibrator
16class MultipleClassCalibrator(Calibrator):
17 r"""
18 Class for calibration of multiple calibration classes.
19 When passing multiple classes of the same name, all classes
20 with that name are merged into one class with so-called relevant time intervals.
21 These time intervals are used for the evaluation of the objective
22 function. Please have a look at the file in docs\img\typeOfContinouusCalibration.pdf
23 for a better understanding on how this class works.
25 :param str start_time_method:
26 Default is 'fixstart'. Method you want to use to
27 specify the start time of your simulation. If 'fixstart' is selected,
28 the keyword argument fix_start_time is used for all classes (Default is 0).
29 If 'timedelta' is used, the keyword argument timedelta specifies the
30 time being subtracted from each start time of each calibration class.
31 Please have a look at the file in docs\img\typeOfContinouusCalibration.pdf
32 for a better visualization.
33 :param str calibration_strategy:
34 Default is 'parallel'. Strategy you want to use for multi-class calibration.
35 If 'parallel' is used, parameters will be calibrated on the respective time intervals
36 independently. If 'sequential' is used, the order of the calibration classes matters:
37 The resulting parameter values of one class will be used as starting values for calibration
38 on the next class.
39 :keyword float fix_start_time:
40 Value for the fix start time if start_time_method="fixstart". Default is zero.
41 :keyword float timedelta:
42 Value for timedelta if start_time_method="timedelta". Default is zero.
43 :keyword bool merge_multiple_classes:
44 Default True. If False, the given list of calibration-classes
45 is handled as-is. This means if you pass two CalibrationClass objects
46 with the same name (e.g. "device on"), the calibration process will run
47 for both these classes stand-alone.
48 This will automatically yield an intersection of tuner-parameters, however may
49 have advantages in some cases.
50 """
52 # Default value for the reference time is zero
53 fix_start_time = 0
54 merge_multiple_classes = True
def __init__(self,
             working_directory: Union[Path, str],
             sim_api: SimulationAPI,
             calibration_classes: List[CalibrationClass],
             start_time_method: str = 'fixstart',
             calibration_strategy: str = 'parallel',
             **kwargs):
    """
    Validate the inputs, store the multi-class settings and
    instantiate the parent class with the first calibration class.

    :param str,Path working_directory:
        Forwarded unchanged to the parent class.
    :param SimulationAPI sim_api:
        Forwarded unchanged to the parent class.
    :param list calibration_classes:
        Non-empty list of CalibrationClass objects.
    :param str start_time_method:
        Either 'fixstart' or 'timedelta' (case-insensitive).
        Stored in lower case.
    :param str calibration_strategy:
        Either 'parallel' or 'sequential' (case-insensitive).
        Stored in lower case.
    :keyword bool merge_multiple_classes:
        Default True. If True, the classes are merged via
        data_types.merge_calibration_classes. If False, the given
        list is used as-is.
    :keyword float fix_start_time:
        Default 0.
    :keyword float timedelta:
        Default 0.

    All remaining keyword arguments are forwarded to the parent class.

    :raises TypeError:
        If calibration_classes is not a list or contains an
        element which is not a CalibrationClass.
    :raises ValueError:
        If calibration_classes is empty, or if start_time_method
        or calibration_strategy is not supported.
    """
    # Check if input is correct
    if not isinstance(calibration_classes, list):
        raise TypeError(f"calibration_classes is of type "
                        f"{type(calibration_classes).__name__} but should be list")
    # The parent class is instantiated with the first entry, so an
    # empty list can not be handled. Fail early with a clear message
    # instead of an IndexError further below.
    if not calibration_classes:
        raise ValueError("calibration_classes is empty but should "
                         "contain at least one CalibrationClass")
    for cal_class in calibration_classes:
        if not isinstance(cal_class, CalibrationClass):
            raise TypeError(f"calibration_classes is of type {type(cal_class).__name__} "
                            f"but should be CalibrationClass")

    # Pop kwargs of this class so that they are not passed to the parent class:
    self.merge_multiple_classes = kwargs.pop("merge_multiple_classes", True)
    self.fix_start_time = kwargs.pop("fix_start_time", 0)
    self.timedelta = kwargs.pop("timedelta", 0)

    # Choose the time-method. The normalized (lower case) name is stored,
    # as later comparisons are made against the lower case names.
    normalized_start_time_method = start_time_method.lower()
    if normalized_start_time_method not in ["fixstart", "timedelta"]:
        raise ValueError(f"Given start_time_method {start_time_method} is not supported. "
                         "Please choose between 'fixstart' or 'timedelta'")
    self.start_time_method = normalized_start_time_method

    # Choose the calibration method
    normalized_strategy = calibration_strategy.lower()
    if normalized_strategy not in ['parallel', 'sequential']:
        raise ValueError(f"Given calibration_strategy {calibration_strategy} is not supported. "
                         f"Please choose between 'parallel' or 'sequential'")
    self.calibration_strategy = normalized_strategy

    # Instantiate parent-class
    super().__init__(working_directory, sim_api, calibration_classes[0], **kwargs)

    if self.merge_multiple_classes:
        # Merge the multiple calibration_classes
        self.calibration_classes = data_types.merge_calibration_classes(calibration_classes)
    else:
        # Use the given classes as-is. Without this assignment the
        # attribute would not be set by this constructor, although
        # calibrate() iterates over it.
        self.calibration_classes = list(calibration_classes)
    self._cal_history = []
def calibrate(self, framework, method=None, **kwargs) -> dict:
    """
    Start the calibration process for all calibration classes.

    The classes are calibrated one after another. For each class,
    the working directory of the simulation API is set to a
    class-specific sub-folder, parameters calibrated in previous
    classes are added to the fixed parameters and the calibration
    of the parent class is executed. Tuner parameters of the current
    class are removed from the fixed parameters again; with the
    'sequential' strategy their previously calibrated value is used
    as initial value. Afterwards, intersections of tuner parameters
    are evaluated and the final parameter values are saved.

    :param framework:
        Forwarded unchanged to the calibrate method of the parent class.
    :param method:
        Forwarded unchanged to the calibrate method of the parent class.
        Default is None.

    All remaining keyword arguments are forwarded to the
    calibrate method of the parent class.

    :return dict parameter_values:
        Dictionary with the names and values of all parameters
        calibrated in any class. For parameters calibrated in more than
        one class, the value from check_intersection_of_tuner_parameters
        is used.
    """
    # First check for tuner-parameters which are used in more than
    # one class and warn the user about it. A parameter shared by only
    # some of the classes is reported as well.
    seen_tuners = set()
    shared_tuners = set()
    for cal_class in self.calibration_classes:
        tuner_names = set(cal_class.tuner_paras.get_names())
        shared_tuners |= seen_tuners & tuner_names
        seen_tuners |= tuner_names
    if shared_tuners:
        self.logger.log("The following tuner-parameters intersect over multiple"
                        f" classes:\n{', '.join(sorted(shared_tuners))}")

    use_previous_results = self.calibration_strategy == "sequential"

    # Iterate over the different existing classes
    for cal_class in self.calibration_classes:
        # %% Working-Directory:
        # Alter the working directory for saving the simulations-results
        self.working_directory_of_class = os.path.join(self.working_directory,
                                                       f"{cal_class.name}_"
                                                       f"{cal_class.start_time}_"
                                                       f"{cal_class.stop_time}")
        self.sim_api.set_working_directory(self.working_directory_of_class)

        # %% Calibration-Setup
        # Reset counter for new calibration
        self._counter = 0
        # Retrieve already calibrated parameters (i.e. calibrated in the previous classes).
        # Later runs overwrite the values of earlier runs.
        already_calibrated_parameters = {}
        for cal_run in self._cal_history:
            best_parameters = cal_run['res']['Parameters']
            for par_name in best_parameters.index:
                already_calibrated_parameters[par_name] = best_parameters[par_name]
        # Set fixed names:
        self.fixed_parameters.update(already_calibrated_parameters)

        # Reset best iterate for new class
        self._current_best_iterate = {"Objective": np.inf}
        self.calibration_class = cal_class

        # Set initial values
        initial_values = self.tuner_paras.get_initial_values()
        for idx, par_name in enumerate(self.tuner_paras.get_names()):
            # A tuner of this class must not stay fixed, thus it is
            # always removed from the fixed parameters.
            previous_value = self.fixed_parameters.pop(par_name, None)
            if use_previous_results and previous_value is not None:
                # Use already calibrated value as initial value for new calibration
                initial_values[idx] = previous_value

        self.x0 = self.tuner_paras.scale(initial_values)
        # Either bounds are present or not.
        # If present, the obj will scale the values to 0 and 1. If not
        # we have an unconstrained optimization.
        if self.tuner_paras.bounds is None:
            self.bounds = None
        else:
            self.bounds = [(0, 1) for _ in range(len(self.x0))]

        # %% Execution
        # Run the single calibration of the parent class
        super().calibrate(framework=framework, method=method, **kwargs)

        # %% Post-processing
        # Append result to list for future perturbation based on older results.
        self._cal_history.append({"res": self._current_best_iterate,
                                  "cal_class": cal_class})

    res_tuner = self.check_intersection_of_tuner_parameters()

    # Collect the calibrated parameter values of all runs ...
    parameter_values = {}
    for cal_run in self._cal_history:
        best_parameters = cal_run['res']['Parameters']
        for p_name in best_parameters.index:
            parameter_values[p_name] = best_parameters[p_name]
    # ... and let the result of the intersection check take precedence.
    for p_name, res_intersection in res_tuner.items():
        parameter_values[p_name] = res_intersection
    # Save calibrated parameter values
    self.save_results(parameter_values=parameter_values,
                      filename='MultiClassCalibrationResult')

    return parameter_values
def _apply_start_time_method(self, start_time):
    """
    Calculate the simulation start time based on the used
    start-time-method (timedelta or fixstart).

    :param float start_time:
        Start time to which the start-time-method is applied.
    :return float:
        For 'timedelta': ``start_time - self.timedelta``, but never
        less than 0 (a warning is issued if the value is clipped).
        Otherwise: ``self.fix_start_time``.
    """
    # The constructor accepts the method name in any case,
    # hence the comparison is made case-insensitive as well.
    if self.start_time_method.lower() != "timedelta":
        # With fixed start, the fix_start_time is always returned
        return self.fix_start_time

    # Using timedelta, the timedelta is subtracted of the given start-time
    shifted_start_time = start_time - self.timedelta
    # The start time should not be lower than zero
    if shifted_start_time < 0:
        # pylint: disable=import-outside-toplevel
        import warnings
        warnings.warn(
            'Simulation start time current calibration class \n'
            ' falls below 0, because of the chosen timedelta. '
            'The start time will be set to 0 seconds.'
        )
        return 0
    return shifted_start_time
def check_intersection_of_tuner_parameters(self):
    """
    Check for tuner parameters which were calibrated to different
    values in different calibration runs.

    All distinct best values per tuner parameter are collected from
    ``self._cal_history``. If no parameter has more than one distinct
    value, the single value of each parameter is returned. Otherwise
    the intersection is passed to the logger and, for ALL parameters,
    the average of the distinct values is returned.

    :return dict res_tuner:
        Dictionary of the optimized tuner parameter names and values.
    """
    # Merge all tuners: collect the distinct values of all runs per name.
    # Equal values are stored only once, as equal results of two runs
    # are not regarded as an intersection. A repeated value must not
    # discard the values collected so far.
    merged_tuner_parameters = {}
    for cal_run in self._cal_history:
        for tuner_name, best_value in cal_run["res"]["Parameters"].items():
            distinct_values = merged_tuner_parameters.setdefault(tuner_name, [])
            if best_value not in distinct_values:
                distinct_values.append(best_value)

    # Only tuners with at least two distinct values are of interest
    intersected_tuners = {
        tuner_para: values
        for tuner_para, values in merged_tuner_parameters.items()
        if len(values) >= 2
    }

    if not intersected_tuners:
        # No intersection: every tuner has exactly one value
        return {tuner_para: values[0]
                for tuner_para, values in merged_tuner_parameters.items()}

    # Plot or log the information, depending on which logger you are using:
    self.logger.log_intersection_of_tuners(intersected_tuners,
                                           itercount=self.recalibration_count)

    # Return the average value of ALL tuner parameters (not only the
    # intersected ones), so that parameters without intersection are
    # part of the result as well.
    average_tuner_parameter = {
        tuner_para: sum(values) / len(values)
        for tuner_para, values in merged_tuner_parameters.items()
    }

    self.logger.log("The tuner parameters used for evaluation "
                    "are averaged as follows:\n "
                    "{}".format(' ,'.join([f"{tuner}={value}"
                                           for tuner, value in
                                           average_tuner_parameter.items()])))

    return average_tuner_parameter