Coverage for aixcalibuha/calibration/multi_class_calibrator.py: 89%

100 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2026-04-30 14:23 +0000

1""" 

2Module containing a class for 

3calibrating multiple calibration classes at once. 

4""" 

5 

6import os 

7from pathlib import Path 

8from typing import List, Union 

9import numpy as np 

10from ebcpy.simulationapi import SimulationAPI 

11 

12from aixcalibuha import CalibrationClass, data_types 

13from aixcalibuha.calibration import Calibrator 

14 

15 

class MultipleClassCalibrator(Calibrator):
    r"""
    Class for calibration of multiple calibration classes.
    When passing multiple classes of the same name, all classes with
    that name are merged into one class with so called relevant time
    intervals. These time intervals are used for the evaluation of the
    objective function. Please have a look at the file in
    docs\img\typeOfContinouusCalibration.pdf
    for a better understanding on how this class works.

    :param str,Path working_directory:
        Working directory, passed on to the parent class.
    :param SimulationAPI sim_api:
        Simulation API, passed on to the parent class.
    :param list calibration_classes:
        List of CalibrationClass objects to calibrate.
    :param str start_time_method:
        Default is 'fixstart'. Method you want to use to
        specify the start time of your simulation. If 'fixstart' is selected,
        the keyword argument fix_start_time is used for all classes
        (Default is 0). If 'timedelta' is used, the keyword argument
        timedelta specifies the time being subtracted from each start
        time of each calibration class. The value is matched
        case-insensitively and stored in lower case.
        Please have a look at the file in
        docs\img\typeOfContinouusCalibration.pdf
        for a better visualization.
    :param str calibration_strategy:
        Default is 'parallel'. Strategy you want to use for multi-class
        calibration. If 'parallel' is used, parameters will be calibrated
        on the respective time intervals independently. If 'sequential'
        is used, the order of the calibration classes matters:
        The resulting parameter values of one class will be used as
        starting values for calibration on the next class.
    :keyword float fix_start_time:
        Value for the fix start time if start_time_method="fixstart".
        Default is zero.
    :keyword float timedelta:
        Value for timedelta if start_time_method="timedelta".
        Default is zero.
    :keyword bool merge_multiple_classes:
        Default True. If False, the given list of calibration-classes
        is handled as-is. This means if you pass two CalibrationClass
        objects with the same name (e.g. "device on"), the calibration
        process will run for both these classes stand-alone.
        This will automatically yield an intersection of tuner-parameters,
        however may have advantages in some cases.

    :raises TypeError:
        If calibration_classes is not a list of CalibrationClass objects.
    :raises ValueError:
        If start_time_method or calibration_strategy is not supported.
    """

    # Default value for the reference time is zero
    fix_start_time = 0
    merge_multiple_classes = True

    def __init__(self,
                 working_directory: Union[Path, str],
                 sim_api: SimulationAPI,
                 calibration_classes: List[CalibrationClass],
                 start_time_method: str = 'fixstart',
                 calibration_strategy: str = 'parallel',
                 **kwargs):
        # Check if input is correct
        if not isinstance(calibration_classes, list):
            raise TypeError("calibration_classes is of type "
                            "%s but should be list" % type(calibration_classes).__name__)

        for cal_class in calibration_classes:
            if not isinstance(cal_class, CalibrationClass):
                raise TypeError(f"calibration_classes is of type {type(cal_class).__name__} "
                                f"but should be CalibrationClass")
        # Pop kwargs of this class (pass parameters and remove from kwarg dict)
        # so that only the remaining kwargs reach the parent class:
        self.merge_multiple_classes = kwargs.pop("merge_multiple_classes", True)
        self.fix_start_time = kwargs.pop("fix_start_time", 0)
        self.timedelta = kwargs.pop("timedelta", 0)

        # Choose the time-method
        if start_time_method.lower() not in ["fixstart", "timedelta"]:
            raise ValueError(f"Given start_time_method {start_time_method} is not supported. "
                             "Please choose between 'fixstart' or 'timedelta'")
        # Store the normalized value: _apply_start_time_method compares
        # against the lower-case literal, so e.g. "TimeDelta" must not
        # silently fall back to the fixstart behaviour.
        self.start_time_method = start_time_method.lower()

        # Choose the calibration method
        if calibration_strategy.lower() not in ['parallel', 'sequential']:
            raise ValueError(f"Given calibration_strategy {calibration_strategy} is not supported. "
                             f"Please choose between 'parallel' or 'sequential'")
        self.calibration_strategy = calibration_strategy.lower()

        # Instantiate parent-class
        super().__init__(working_directory, sim_api, calibration_classes[0], **kwargs)
        # Always provide the classes to iterate over in calibrate();
        # without merging, the given list is used as-is.
        self.calibration_classes = calibration_classes
        if self.merge_multiple_classes:
            self.calibration_classes = data_types.merge_calibration_classes(calibration_classes)
        self._cal_history = []

    def _collect_calibrated_parameters(self) -> dict:
        """
        Collect the best parameter values of all finished calibration runs.

        :return dict calibrated:
            Parameter names and values taken from the calibration history.
            If a parameter occurs in several runs, the latest run wins.
        """
        calibrated = {}
        for cal_run in self._cal_history:
            best_parameters = cal_run['res']['Parameters']
            for par_name in best_parameters.index:
                calibrated[par_name] = best_parameters[par_name]
        return calibrated

    def calibrate(self, framework, method=None, **kwargs) -> dict:
        """
        Start the calibration process.

        Every calibration class is calibrated one after another using
        the calibrate method of the parent class. Parameters calibrated
        in previous classes are set as fixed parameters, unless they are
        tuner parameters of the current class.

        :param framework:
            Passed on to the calibrate method of the parent class.
        :param method:
            Passed on to the calibrate method of the parent class.
        :param kwargs:
            Passed on to the calibrate method of the parent class.
        :return dict parameter_values:
            Dictionary of the calibrated parameter names and values.
            Values returned by check_intersection_of_tuner_parameters
            overwrite the values of the single calibration runs.
        """
        # First check possible intersection of tuner-parameters
        # and warn the user about it
        all_tuners = [cal_class.tuner_paras.get_names()
                      for cal_class in self.calibration_classes]
        intersection = set(all_tuners[0]).intersection(*all_tuners)
        if intersection and len(self.calibration_classes) > 1:
            # Sorted to get a deterministic log message
            self.logger.log("The following tuner-parameters intersect over multiple"
                            f" classes:\n{', '.join(sorted(intersection))}")

        # Iterate over the different existing classes
        for cal_class in self.calibration_classes:
            # %% Working-Directory:
            # Alter the working directory for saving the simulations-results
            self.working_directory_of_class = os.path.join(self.working_directory,
                                                           f"{cal_class.name}_"
                                                           f"{cal_class.start_time}_"
                                                           f"{cal_class.stop_time}")
            self.sim_api.set_working_directory(self.working_directory_of_class)

            # %% Calibration-Setup
            # Reset counter for new calibration
            self._counter = 0
            # Retrieve already calibrated parameters
            # (i.e. calibrated in the previous classes) and fix them:
            self.fixed_parameters.update(self._collect_calibrated_parameters())

            # Reset best iterate for new class
            self._current_best_iterate = {"Objective": np.inf}
            self.calibration_class = cal_class

            # Set initial values
            initial_values = self.tuner_paras.get_initial_values()
            for idx, par_name in enumerate(self.tuner_paras.get_names()):
                if self.calibration_strategy == "sequential":
                    # Use already calibrated values as initial value for
                    # new calibration. Delete it from fixed values and
                    # retrieve the value
                    initial_values[idx] = self.fixed_parameters.pop(par_name,
                                                                    initial_values[idx])
                else:
                    # Just delete, don't use the value
                    self.fixed_parameters.pop(par_name, None)

            self.x0 = self.tuner_paras.scale(initial_values)
            # Either bounds are present or not.
            # If present, the obj will scale the values to 0 and 1. If not
            # we have an unconstrained optimization.
            if self.tuner_paras.bounds is None:
                self.bounds = None
            else:
                self.bounds = [(0, 1)] * len(self.x0)

            # %% Execution
            # Run the single ModelicaCalibration
            super().calibrate(framework=framework, method=method, **kwargs)

            # %% Post-processing
            # Append result to list for future perturbation based on older results.
            self._cal_history.append({"res": self._current_best_iterate,
                                      "cal_class": cal_class})

        res_tuner = self.check_intersection_of_tuner_parameters()

        # Save calibrated parameter values in JSON
        parameter_values = self._collect_calibrated_parameters()
        parameter_values.update(res_tuner)
        self.save_results(parameter_values=parameter_values,
                          filename='MultiClassCalibrationResult')

        return parameter_values

    def _apply_start_time_method(self, start_time):
        """
        Calculate the start time based on the used
        start-time-method (timedelta or fixstart).

        :param float start_time:
            Start time to which the start-time-method is applied.
        :return float:
            For 'timedelta': start_time - self.timedelta, but at least 0.
            Otherwise: self.fix_start_time.
        """
        if self.start_time_method == "timedelta":
            # Check if timedelta does not fall below the
            # start_time (start_time should not be lower than zero)
            if start_time - self.timedelta < 0:
                # pylint: disable=import-outside-toplevel
                import warnings
                warnings.warn(
                    'Simulation start time current calibration class \n'
                    ' falls below 0, because of the chosen timedelta. '
                    'The start time will be set to 0 seconds.'
                )
                return 0
            # Using timedelta, the timedelta is subtracted of the given start-time
            return start_time - self.timedelta
        # With fixed start, the fix_start_time is always returned
        return self.fix_start_time

    def check_intersection_of_tuner_parameters(self):
        """
        Checks intersections between tuner parameters.

        All distinct values found for a tuner parameter in the
        calibration history are collected. If at least one parameter has
        two or more distinct values, the intersection is logged and the
        average of the distinct values is returned for ALL parameters.
        Otherwise, the single value of each parameter is returned.

        :return dict res_tuner:
            Dictionary of the optimized tuner parameter names and values.
        """

        # merge all tuners (writes all distinct values from all classes
        # in one dictionary)
        merged_tuner_parameters = {}
        for cal_run in self._cal_history:
            for tuner_name, best_value in cal_run["res"]["Parameters"].items():
                if tuner_name not in merged_tuner_parameters:
                    merged_tuner_parameters[tuner_name] = [best_value]
                elif best_value not in merged_tuner_parameters[tuner_name]:
                    merged_tuner_parameters[tuner_name].append(best_value)
                # A value seen before is skipped. The values collected
                # so far must be kept, otherwise an intersection
                # like (1, 2, 1) would be lost.

        # Get tuner parameter
        res_tuner = {tuner_para: values[0]
                     for tuner_para, values in merged_tuner_parameters.items()}

        # pop single values, as of no interest
        intersected_tuners = {tuner_para: values
                              for tuner_para, values in merged_tuner_parameters.items()
                              if len(values) >= 2}

        # Handle tuner intersections
        if intersected_tuners:
            # Plot or log the information, depending on which logger you are using:
            self.logger.log_intersection_of_tuners(intersected_tuners,
                                                   itercount=self.recalibration_count)

            # Return average value of ALL tuner parameters (not only intersected).
            # Reason: if there is an intersection of a tuner parameter, but
            # the results of both calibration classes are exactly the same, there
            # is no intersection and the affected parameter will not be
            # delivered to "res_tuner" if one of the other tuners
            # intersect and "intersected_tuners" is not empty.
            average_tuner_parameter = {
                tuner_para: sum(values) / len(values)
                for tuner_para, values in merged_tuner_parameters.items()
            }

            self.logger.log("The tuner parameters used for evaluation "
                            "are averaged as follows:\n "
                            "{}".format(' ,'.join([f"{tuner}={value}"
                                                   for tuner, value in
                                                   average_tuner_parameter.items()])))

            # Create result-dictionary
            res_tuner = average_tuner_parameter

        return res_tuner

271 res_tuner = average_tuner_parameter 

272 

273 return res_tuner