Coverage for aixcalibuha/calibration/multi_class_calibrator.py: 89%
98 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-01-27 10:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-01-27 10:48 +0000
1"""
2Module containing a class for
3calibrating multiple calibration classes at once.
4"""
6import os
7from typing import List
8import numpy as np
9from aixcalibuha import CalibrationClass, data_types
10from aixcalibuha.calibration import Calibrator
13class MultipleClassCalibrator(Calibrator):
14 r"""
15 Class for calibration of multiple calibration classes.
16 When passing multiple classes of the same name, all names
17 are merged into one class with so called relevant time intervals.
18 These time intervals are used for the evaluation of the objective
19 function. Please have a look at the file in docs\img\typeOfContinouusCalibration.pdf
20 for a better understanding on how this class works.
22 :param str start_time_method:
23 Default is 'fixstart'. Method you want to use to
24 specify the start time of your simulation. If 'fixstart' is selected,
25 the keyword argument fixstart is used for all classes (Default is 0).
26 If 'timedelta' is used, the keyword argument timedelta specifies the
27 time being subtracted from each start time of each calibration class.
28 Please have a look at the file in docs\img\typeOfContinouusCalibration.pdf
29 for a better visualization.
30 :param str calibration_strategy:
31 Default is 'parallel'. Strategy you want to use for multi-class calibration.
32 If 'parallel' is used, parameters will be calibrated on the respective time intervals
33 independently. If 'sequential' is used, the order of the calibration classes matters:
34 The resulting parameter values of one class will be used as starting values for calibration
35 on the next class.
36 :keyword float fix_start_time:
37 Value for the fix start time if start_time_method="fixstart". Default is zero.
38 :keyword float timedelta:
39 Value for timedelta if start_time_method="timedelta". Default is zero.
40 :keyword str merge_multiple_classes:
41 Default True. If False, the given list of calibration-classes
42 is handeled as-is. This means if you pass two CalibrationClass objects
43 with the same name (e.g. "device on"), the calibration process will run
44 for both these classes stand-alone.
45 This will automatically yield an intersection of tuner-parameters, however may
46 have advantages in some cases.
47 """
49 # Default value for the reference time is zero
50 fix_start_time = 0
51 merge_multiple_classes = True
53 def __init__(self,
54 cd: str,
55 sim_api,
56 calibration_classes: List[CalibrationClass],
57 start_time_method: str = 'fixstart',
58 calibration_strategy: str = 'parallel',
59 **kwargs):
60 # Check if input is correct
61 if not isinstance(calibration_classes, list):
62 raise TypeError("calibration_classes is of type "
63 "%s but should be list" % type(calibration_classes).__name__)
65 for cal_class in calibration_classes:
66 if not isinstance(cal_class, CalibrationClass):
67 raise TypeError(f"calibration_classes is of type {type(cal_class).__name__} "
68 f"but should be CalibrationClass")
69 # Pop kwargs of this class (pass parameters and remove from kwarg dict):
70 self.merge_multiple_classes = kwargs.pop("merge_multiple_classes", True)
71 # Apply (if given) the fix_start_time. Check for correct input as-well.
72 self.fix_start_time = kwargs.pop("fix_start_time", 0)
73 self.timedelta = kwargs.pop("timedelta", 0)
75 # Choose the time-method
76 if start_time_method.lower() not in ["fixstart", "timedelta"]:
77 raise ValueError(f"Given start_time_method {start_time_method} is not supported. "
78 "Please choose between 'fixstart' or 'timedelta'")
79 self.start_time_method = start_time_method
81 # Choose the calibration method
82 if calibration_strategy.lower() not in ['parallel', 'sequential']:
83 raise ValueError(f"Given calibration_strategy {calibration_strategy} is not supported. "
84 f"Please choose between 'parallel' or 'sequential'")
85 self.calibration_strategy = calibration_strategy.lower()
87 # Instantiate parent-class
88 super().__init__(cd, sim_api, calibration_classes[0], **kwargs)
89 # Merge the multiple calibration_classes
90 if self.merge_multiple_classes:
91 self.calibration_classes = data_types.merge_calibration_classes(calibration_classes)
92 self._cal_history = []
94 def calibrate(self, framework, method=None, **kwargs) -> dict:
95 """
96 Start the calibration process.
98 :return dict self.res_tuner:
99 Dictionary of the optimized tuner parameter names and values.
100 :return dict self._current_best_iterate:
101 Dictionary of the current best results of tuner parameter,
102 iteration step, objective value, information
103 about the goals object and the penaltyfactor.
104 """
105 # First check possible intersection of tuner-parameteres
106 # and warn the user about it
107 all_tuners = []
108 for cal_class in self.calibration_classes:
109 all_tuners.append(cal_class.tuner_paras.get_names())
110 intersection = set(all_tuners[0]).intersection(*all_tuners)
111 if intersection and len(self.calibration_classes) > 1:
112 self.logger.log("The following tuner-parameters intersect over multiple"
113 f" classes:\n{', '.join(list(intersection))}")
115 # Iterate over the different existing classes
116 for cal_class in self.calibration_classes:
117 #%% Working-Directory:
118 # Alter the working directory for saving the simulations-results
119 self.cd_of_class = os.path.join(self.cd,
120 f"{cal_class.name}_"
121 f"{cal_class.start_time}_"
122 f"{cal_class.stop_time}")
123 self.sim_api.set_cd(self.cd_of_class)
125 #%% Calibration-Setup
126 # Reset counter for new calibration
127 self._counter = 0
128 # Retrieve already calibrated parameters (i.e. calibrated in the previous classes)
129 already_calibrated_parameters = {}
130 for cal_run in self._cal_history:
131 for par_name in cal_run['res']['Parameters'].index:
132 already_calibrated_parameters[par_name] = cal_run['res']['Parameters'][par_name]
133 # Set fixed names:
134 self.fixed_parameters.update(already_calibrated_parameters)
136 # Reset best iterate for new class
137 self._current_best_iterate = {"Objective": np.inf}
138 self.calibration_class = cal_class
140 # Set initial values
141 initial_values = self.tuner_paras.get_initial_values()
142 for idx, par_name in enumerate(self.tuner_paras.get_names()):
143 if self.calibration_strategy == "sequential":
144 # Use already calibrated values as initial value for new calibration
145 # Delete it from fixed values and retreive the value
146 initial_values[idx] = self.fixed_parameters.pop(par_name,
147 initial_values[idx])
148 else:
149 try:
150 self.fixed_parameters.pop(par_name) # Just delete, don't use the value
151 except KeyError:
152 pass # Faster than checking if is in dict.
154 self.x0 = self.tuner_paras.scale(initial_values)
155 # Either bounds are present or not.
156 # If present, the obj will scale the values to 0 and 1. If not
157 # we have an unconstrained optimization.
158 if self.tuner_paras.bounds is None:
159 self.bounds = None
160 else:
161 self.bounds = [(0, 1) for i in range(len(self.x0))]
163 #%% Execution
164 # Run the single ModelicaCalibration
165 super().calibrate(framework=framework, method=method, **kwargs)
167 #%% Post-processing
168 # Append result to list for future perturbation based on older results.
169 self._cal_history.append({"res": self._current_best_iterate,
170 "cal_class": cal_class})
172 res_tuner = self.check_intersection_of_tuner_parameters()
174 # Save calibrated parameter values in JSON
175 parameter_values = {}
176 for cal_run in self._cal_history:
177 for p_name in cal_run['res']['Parameters'].index:
178 parameter_values[p_name] = cal_run['res']['Parameters'][p_name]
179 for p_name, res_intersection in res_tuner.items():
180 parameter_values[p_name] = res_intersection
181 self.save_results(parameter_values=parameter_values,
182 filename='MultiClassCalibrationResult')
184 return parameter_values
186 def _apply_start_time_method(self, start_time):
187 """
188 Method to be calculate the start_time based on the used
189 start-time-method (timedelta or fix-start).
191 :param float start_time:
192 Start time which was specified by the user in the TOML file.
193 :return float start_time - self.timedelta:
194 Calculated "timedelta", if specified in the TOML file.
195 :return float self.fix_start_time:
196 Fix start time which was specified by the user in the TOML file.
197 """
198 if self.start_time_method == "timedelta":
199 # Check if timedelta does not fall below the
200 # start_time (start_time should not be lower then zero)
201 if start_time - self.timedelta < 0:
202 # pylint: disable=import-outside-toplevel
203 import warnings
204 warnings.warn(
205 'Simulation start time current calibration class \n'
206 ' falls below 0, because of the chosen timedelta. '
207 'The start time will be set to 0 seconds.'
208 )
209 return 0
210 # Using timedelta, _ref_time is subtracted of the given start-time
211 return start_time - self.timedelta
212 else:
213 # With fixed start, the _ref_time parameter is always returned
214 return self.fix_start_time
216 def check_intersection_of_tuner_parameters(self):
217 """
218 Checks intersections between tuner parameters.
220 :return dict res_tuner:
221 Dictionary of the optimized tuner parameter names and values.
222 """
224 # merge all tuners (writes all values from all classes in one dictionary)
225 merged_tuner_parameters = {}
226 for cal_class in self._cal_history:
227 for tuner_name, best_value in cal_class["res"]["Parameters"].items():
228 if (tuner_name in merged_tuner_parameters and
229 best_value not in merged_tuner_parameters[tuner_name]):
230 merged_tuner_parameters[tuner_name].append(best_value)
231 else:
232 merged_tuner_parameters[tuner_name] = [best_value]
234 # Get tuner parameter
235 res_tuner = {}
236 for tuner_para, values in merged_tuner_parameters.items():
237 res_tuner[tuner_para] = values[0]
239 # pop single values, as of no interest
240 intersected_tuners = {}
241 for tuner_para, values in merged_tuner_parameters.items():
242 if len(values) >= 2:
243 intersected_tuners[tuner_para] = values
245 # Handle tuner intersections
246 if intersected_tuners.keys():
247 # Plot or log the information, depending on which logger you are using:
248 self.logger.log_intersection_of_tuners(intersected_tuners,
249 itercount=self.recalibration_count)
251 # Return average value of ALL tuner parameters (not only intersected).
252 # Reason: if there is an intersection of a tuner parameter, but
253 # the results of both calibration classes are exactly the same, there
254 # is no intersection and the affected parameter will not be
255 # delivered to "res_tuner" if one of the other tuners
256 # intersect and "intersected_tuners.keys()" is true.
257 average_tuner_parameter = {}
258 for tuner_para, values in merged_tuner_parameters.items():
259 average_tuner_parameter[tuner_para] = sum(values) / len(values)
261 self.logger.log("The tuner parameters used for evaluation "
262 "are averaged as follows:\n "
263 "{}".format(' ,'.join([f"{tuner}={value}"
264 for tuner, value in average_tuner_parameter.items()])))
266 # Create result-dictionary
267 res_tuner = average_tuner_parameter
269 return res_tuner