Coverage for aixcalibuha/calibration/multi_class_calibrator.py: 89%

98 statements  

coverage.py v7.4.1, created at 2024-01-27 10:48 +0000

1""" 

2Module containing a class for 

3calibrating multiple calibration classes at once. 

4""" 

5 

6import os 

7from typing import List 

8import numpy as np 

9from aixcalibuha import CalibrationClass, data_types 

10from aixcalibuha.calibration import Calibrator 

11 

12 

13class MultipleClassCalibrator(Calibrator): 

14 r""" 

15 Class for calibration of multiple calibration classes. 

16 When passing multiple classes of the same name, all names 

17 are merged into one class with so called relevant time intervals. 

18 These time intervals are used for the evaluation of the objective 

19 function. Please have a look at the file in docs\img\typeOfContinouusCalibration.pdf 

20 for a better understanding on how this class works. 

21 

22 :param str start_time_method: 

23 Default is 'fixstart'. Method you want to use to 

24 specify the start time of your simulation. If 'fixstart' is selected, 

25 the keyword argument fixstart is used for all classes (Default is 0). 

26 If 'timedelta' is used, the keyword argument timedelta specifies the 

27 time being subtracted from each start time of each calibration class. 

28 Please have a look at the file in docs\img\typeOfContinouusCalibration.pdf 

29 for a better visualization. 

30 :param str calibration_strategy: 

31 Default is 'parallel'. Strategy you want to use for multi-class calibration. 

32 If 'parallel' is used, parameters will be calibrated on the respective time intervals 

33 independently. If 'sequential' is used, the order of the calibration classes matters: 

34 The resulting parameter values of one class will be used as starting values for calibration 

35 on the next class. 

36 :keyword float fix_start_time: 

37 Value for the fix start time if start_time_method="fixstart". Default is zero. 

38 :keyword float timedelta: 

39 Value for timedelta if start_time_method="timedelta". Default is zero. 

40 :keyword str merge_multiple_classes: 

41 Default True. If False, the given list of calibration-classes 

42 is handeled as-is. This means if you pass two CalibrationClass objects 

43 with the same name (e.g. "device on"), the calibration process will run 

44 for both these classes stand-alone. 

45 This will automatically yield an intersection of tuner-parameters, however may 

46 have advantages in some cases. 

47 """ 


    # Default value for the reference time is zero
    fix_start_time = 0
    merge_multiple_classes = True

    def __init__(self,
                 cd: str,
                 sim_api,
                 calibration_classes: List[CalibrationClass],
                 start_time_method: str = 'fixstart',
                 calibration_strategy: str = 'parallel',
                 **kwargs):
        # Check if the input is correct
        if not isinstance(calibration_classes, list):
            raise TypeError("calibration_classes is of type "
                            "%s but should be list" % type(calibration_classes).__name__)

        for cal_class in calibration_classes:
            if not isinstance(cal_class, CalibrationClass):
                raise TypeError(f"calibration_classes is of type {type(cal_class).__name__} "
                                f"but should be CalibrationClass")
        # Pop kwargs of this class (pass parameters and remove from kwarg dict):
        self.merge_multiple_classes = kwargs.pop("merge_multiple_classes", True)
        # Apply (if given) the fix_start_time. Check for correct input as well.
        self.fix_start_time = kwargs.pop("fix_start_time", 0)
        self.timedelta = kwargs.pop("timedelta", 0)

        # Choose the start-time method
        if start_time_method.lower() not in ["fixstart", "timedelta"]:
            raise ValueError(f"Given start_time_method {start_time_method} is not supported. "
                             "Please choose between 'fixstart' and 'timedelta'")
        self.start_time_method = start_time_method

        # Choose the calibration strategy
        if calibration_strategy.lower() not in ['parallel', 'sequential']:
            raise ValueError(f"Given calibration_strategy {calibration_strategy} is not supported. "
                             f"Please choose between 'parallel' and 'sequential'")
        self.calibration_strategy = calibration_strategy.lower()

        # Instantiate the parent class
        super().__init__(cd, sim_api, calibration_classes[0], **kwargs)
        # Merge the multiple calibration_classes
        if self.merge_multiple_classes:
            self.calibration_classes = data_types.merge_calibration_classes(calibration_classes)
        self._cal_history = []

    def calibrate(self, framework, method=None, **kwargs) -> dict:
        """
        Start the calibration process for all calibration classes.

        :return dict parameter_values:
            Dictionary of the optimized tuner parameter names and values of
            all calibration classes. Tuner parameters that intersect over
            multiple classes are returned with their averaged value.
        """
        # First check for possible intersections of tuner parameters
        # and warn the user about them
        all_tuners = []
        for cal_class in self.calibration_classes:
            all_tuners.append(cal_class.tuner_paras.get_names())
        intersection = set(all_tuners[0]).intersection(*all_tuners)
        if intersection and len(self.calibration_classes) > 1:
            self.logger.log("The following tuner-parameters intersect over multiple"
                            f" classes:\n{', '.join(list(intersection))}")

        # Iterate over the different existing classes
        for cal_class in self.calibration_classes:
            #%% Working directory:
            # Alter the working directory for saving the simulation results
            self.cd_of_class = os.path.join(self.cd,
                                            f"{cal_class.name}_"
                                            f"{cal_class.start_time}_"
                                            f"{cal_class.stop_time}")
            self.sim_api.set_cd(self.cd_of_class)
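            # Example with hypothetical values: a class named "device on"
            # lasting from 0 s to 3600 s is simulated in a sub-directory
            # such as <cd>/device on_0_3600.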

            #%% Calibration setup
            # Reset the counter for the new calibration
            self._counter = 0
            # Retrieve already calibrated parameters (i.e. calibrated in the previous classes)
            already_calibrated_parameters = {}
            for cal_run in self._cal_history:
                for par_name in cal_run['res']['Parameters'].index:
                    already_calibrated_parameters[par_name] = cal_run['res']['Parameters'][par_name]
            # Set fixed names:
            self.fixed_parameters.update(already_calibrated_parameters)

            # Reset the best iterate for the new class
            self._current_best_iterate = {"Objective": np.inf}
            self.calibration_class = cal_class

            # Set initial values
            initial_values = self.tuner_paras.get_initial_values()
            for idx, par_name in enumerate(self.tuner_paras.get_names()):
                if self.calibration_strategy == "sequential":
                    # Use the already calibrated value as the initial value for the new
                    # calibration: delete it from the fixed values and retrieve the value
                    initial_values[idx] = self.fixed_parameters.pop(par_name,
                                                                    initial_values[idx])
                else:
                    try:
                        self.fixed_parameters.pop(par_name)  # Just delete, don't use the value
                    except KeyError:
                        pass  # Faster than checking if it is in the dict.
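            # Example with hypothetical values for the 'sequential' strategy:
            # a parameter calibrated to 0.2 in the previous class starts the
            # next class at 0.2 instead of its default initial value.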

            self.x0 = self.tuner_paras.scale(initial_values)
            # Either bounds are present or not.
            # If present, the objective will scale the values to 0 and 1.
            # If not, we have an unconstrained optimization.
            if self.tuner_paras.bounds is None:
                self.bounds = None
            else:
                self.bounds = [(0, 1) for _ in range(len(self.x0))]

            #%% Execution
            # Run the single ModelicaCalibration
            super().calibrate(framework=framework, method=method, **kwargs)

            #%% Post-processing
            # Append the result to the list for future perturbation based on older results.
            self._cal_history.append({"res": self._current_best_iterate,
                                      "cal_class": cal_class})

        res_tuner = self.check_intersection_of_tuner_parameters()

        # Save the calibrated parameter values in JSON
        parameter_values = {}
        for cal_run in self._cal_history:
            for p_name in cal_run['res']['Parameters'].index:
                parameter_values[p_name] = cal_run['res']['Parameters'][p_name]
        for p_name, res_intersection in res_tuner.items():
            parameter_values[p_name] = res_intersection
        self.save_results(parameter_values=parameter_values,
                          filename='MultiClassCalibrationResult')

        return parameter_values

    def _apply_start_time_method(self, start_time):
        """
        Calculate the start time based on the chosen
        start-time method (timedelta or fixstart).

        :param float start_time:
            Start time which was specified by the user in the TOML file.
        :return float start_time - self.timedelta:
            Calculated start time if "timedelta" was specified in the TOML file.
        :return float self.fix_start_time:
            Fixed start time which was specified by the user in the TOML file.
        """

        if self.start_time_method == "timedelta":
            # Check that subtracting the timedelta does not let the
            # start time fall below zero
            if start_time - self.timedelta < 0:
                # pylint: disable=import-outside-toplevel
                import warnings
                warnings.warn(
                    'Simulation start time of the current calibration class '
                    'falls below 0 because of the chosen timedelta. '
                    'The start time will be set to 0 seconds.'
                )
                return 0
            # With timedelta, self.timedelta is subtracted from the given start time
            return start_time - self.timedelta
        else:
            # With a fixed start, fix_start_time is always returned
            return self.fix_start_time

    def check_intersection_of_tuner_parameters(self):
        """
        Checks intersections between tuner parameters.

        :return dict res_tuner:
            Dictionary of the optimized tuner parameter names and values.
        """
        # Merge all tuners (write all values from all classes into one dictionary)
        merged_tuner_parameters = {}
        for cal_class in self._cal_history:
            for tuner_name, best_value in cal_class["res"]["Parameters"].items():
                if tuner_name not in merged_tuner_parameters:
                    merged_tuner_parameters[tuner_name] = [best_value]
                elif best_value not in merged_tuner_parameters[tuner_name]:
                    # Only collect distinct values to detect real intersections
                    merged_tuner_parameters[tuner_name].append(best_value)

        # Get the first calibrated value of each tuner parameter
        res_tuner = {}
        for tuner_para, values in merged_tuner_parameters.items():
            res_tuner[tuner_para] = values[0]

        # Keep only tuner parameters with more than one distinct value, i.e. real intersections
        intersected_tuners = {}
        for tuner_para, values in merged_tuner_parameters.items():
            if len(values) >= 2:
                intersected_tuners[tuner_para] = values

        # Handle tuner intersections
        if intersected_tuners.keys():
            # Plot or log the information, depending on which logger you are using:
            self.logger.log_intersection_of_tuners(intersected_tuners,
                                                   itercount=self.recalibration_count)

            # Return the average value of ALL tuner parameters (not only the intersected ones).
            # Reason: if a tuner parameter intersects but the results of the
            # affected calibration classes are exactly the same, it does not
            # count as an intersection and would otherwise not be delivered to
            # "res_tuner" when one of the other tuners intersects and
            # "intersected_tuners.keys()" is true.
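            # Example with hypothetical values: a tuner calibrated to 0.012 in
            # one class and 0.016 in another is averaged to 0.014 here.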

            average_tuner_parameter = {}
            for tuner_para, values in merged_tuner_parameters.items():
                average_tuner_parameter[tuner_para] = sum(values) / len(values)

            self.logger.log("The tuner parameters used for evaluation "
                            "are averaged as follows:\n "
                            "{}".format(', '.join([f"{tuner}={value}"
                                                   for tuner, value in average_tuner_parameter.items()])))

            # Create the result dictionary
            res_tuner = average_tuner_parameter

        return res_tuner
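

Usage sketch (not part of the covered module above): a minimal, hedged example of how this calibrator might be driven. The sim_api object (e.g. a simulation API instance from ebcpy), the two CalibrationClass objects, the placeholder path, and the optimizer framework/method names are assumptions for illustration only and are not defined in this module.

import pathlib

from aixcalibuha import CalibrationClass
from aixcalibuha.calibration.multi_class_calibrator import MultipleClassCalibrator


def run_multi_class_calibration(sim_api,
                                cal_class_on: CalibrationClass,
                                cal_class_off: CalibrationClass) -> dict:
    """Hypothetical driver for a sequential multi-class calibration."""
    # Working directory for all calibration results (placeholder path)
    cd = str(pathlib.Path("results").joinpath("multi_class"))
    calibrator = MultipleClassCalibrator(
        cd=cd,
        sim_api=sim_api,
        calibration_classes=[cal_class_on, cal_class_off],
        start_time_method="timedelta",
        timedelta=100,  # seconds of lead time, hypothetical value
        calibration_strategy="sequential"
    )
    # framework and method are placeholders for an optimizer supported by the
    # parent Calibrator class; adjust them to the installed backends.
    return calibrator.calibrate(framework="scipy_differential_evolution",
                                method="best1bin")

With calibration_strategy="sequential", the result of cal_class_on seeds the initial values for cal_class_off, as implemented in calibrate() above.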