Coverage for aixcalibuha/calibration/calibrator.py: 19%

263 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2026-04-20 14:06 +0000

1""" 

2Module containing the basic class to calibrate 

3a dynamic model, e.g. a modelica model. 

4""" 

5 

6import os 

7import json 

8from pathlib import Path 

9import time 

10import logging 

11from typing import Dict, Union 

12from copy import copy 

13import numpy as np 

14import pandas as pd 

15from ebcpy import data_types, Optimizer 

16from ebcpy.simulationapi import SimulationAPI 

17from aixcalibuha.utils import visualizer, MaxIterationsReached, MaxTimeReached, convert_mat_to_suffix, \ 

18 empty_postprocessing 

19from aixcalibuha import CalibrationClass, Goals, TunerParas 

20 

21 

22class Calibrator(Optimizer): 

23 """ 

24 This Calibrator class can be used for single

25 time-intervals of calibration. 

26 

27 :param str,Path working_directory: 

28 Working directory 

29 :param ebcpy.simulationapi.SimulationAPI sim_api: 

30 Simulation-API for running the models 

31 :param CalibrationClass calibration_class: 

32 Class with information on Goals and tuner-parameters for calibration 

33 :keyword str result_path: 

34 If given, then the resulting parameter values will be stored in a JSON file 

35 at the given path. 

36 :keyword float timedelta: 

37 If you use this class for calibrating a single time-interval, 

38 you can set the timedelta to instantiate the simulation before 

39 actually evaluating it for the objective. 

40 The given float (default is 0) is subtracted from the start_time 

41 of your calibration_class. You can find a visualisation of said timedelta 

42 in the img folder of the project. 

43 :keyword boolean save_files: 

44 If true, all simulation files for each iteration will be saved! 

45 :keyword str suffix_files:

46 Default 'csv'. Specifies the data format to store the simulation files in. 

47 Options are 'csv' and 'parquet' to save only the goals. 

48 If you want to keep the original 'mat' file specify 'mat' here (not recommended due to high disk size usage). 

49 :keyword str parquet_engine: 

50 The engine to use for the data format parquet. 

51 Supported options can be extracted 

52 from the ebcpy.TimeSeriesData.save() function. 

53 Default is 'pyarrow'. 

54 :keyword boolean verbose_logging: 

55 Default is True. If False, the standard Logger without 

56 Visualization in Form of plots is used. 

57 If you use this, the following keyword arguments below will help 

58 to further adjust the logging. 

59 :keyword boolean show_plot: 

60 Default is True. If False, all created plots are not shown during 

61 calibration but only stored at the end of the process. 

62 :keyword boolean create_tsd_plot: 

63 Default is True. If False, the plot of the time series data (goals) 

64 is not created and thus not shown during calibration. It therefore is

65 also not stored, even if you set the save_tsd_plot keyword-argument to true. 

66 :keyword boolean save_tsd_plot: 

67 Default is False. If True, at each iteration the created plot of the 

68 time-series is saved. This may make the process much slower 

69 :keyword boolean fail_on_error: 

70 Default is False. If True, the calibration will stop with an error if 

71 the simulation fails. See also: ``ret_val_on_error`` 

72 :keyword float,np.nan ret_val_on_error:

73 Default is np.nan. If ``fail_on_error`` is false, you can specify here

74 which value to return in the case of a failed simulation. Possible

75 options are np.nan, np.inf or some other high numbers. Be aware that this

76 may influence the solver.

77 :keyword dict fixed_parameters: 

78 Default is an empty dict. This dict may be used to add certain parameters 

79 to the simulation which are not tuned / variable during calibration. 

80 Such parameters may be used if the default values in the model don't 

81 represent the parameter values you want to use. 

82 :keyword boolean apply_penalty: 

83 Default is true. Specifies if a penalty function should be applied or not. 

84 :keyword float penalty_factor:

85 Default is 0. Quantifies the impact of the penalty term on the objective function. 

86 The penalty factor is added to the objective function. 

87 :keyword int recalibration_count:

88 Default is 0. Works as a counter and specifies the current cycle of recalibration. 

89 :keyword boolean perform_square_deviation: 

90 Default is false. 

91 If true the penalty function will evaluate the penalty factor with a quadratic approach. 

92 :keyword int max_itercount: 

93 Default is Infinity. 

94 Maximum number of iterations of calibration. 

95 This may be useful to explicitly limit the calibration 

96 time. 

97 :keyword int max_time:

98 Default is Infinity.

99 Maximum time in seconds, after which the calibration is stopped. Useful to explicitly limit the calibration time. 

100 :keyword str plot_file_type: 

101 File ending of created plots. 

102 Any supported option in matplotlib, e.g. svg, png, pdf ... 

103 Default is png 

104 

105 """ 

106 

def __init__(self,
             working_directory: Union[Path, str],
             sim_api: SimulationAPI,
             calibration_class: CalibrationClass,
             **kwargs):
    """
    Instantiate instance attributes.

    All calibrator-specific keywords are popped from ``kwargs`` (always
    with a default); whatever remains is forwarded to the ``__init__``
    of the parent class.

    :param str,Path working_directory:
        Working directory, forwarded to the parent class and the logger.
    :param SimulationAPI sim_api:
        Simulation-API used to run the model.
    :param CalibrationClass calibration_class:
        Class holding goals and tuner parameters for the calibration.
    :raises TypeError:
        If ``save_files`` is not a bool or ``calibration_class`` is
        not a ``CalibrationClass``.
    """
    # %% Kwargs
    # Pop the items so they won't be added when calling the
    # __init__ of the parent class.
    self.verbose_logging = kwargs.pop("verbose_logging", True)
    self.save_files = kwargs.pop("save_files", False)
    self.suffix_files = kwargs.pop("suffix_files", "csv")
    self.parquet_engine = kwargs.pop('parquet_engine', 'pyarrow')
    self.timedelta = kwargs.pop("timedelta", 0)
    self.fail_on_error = kwargs.pop("fail_on_error", False)
    # np.nan is the canonical spelling; the np.NAN alias was removed in NumPy 2.0
    self.ret_val_on_error = kwargs.pop("ret_val_on_error", np.nan)
    self.fixed_parameters = kwargs.pop("fixed_parameters", {})
    self.apply_penalty = kwargs.pop("apply_penalty", True)
    self.penalty_factor = kwargs.pop("penalty_factor", 0)
    self.recalibration_count = kwargs.pop("recalibration_count", 0)
    # Accept the documented keyword ``perform_square_deviation`` as well as
    # the historic ``square_deviation``. Both are popped so that neither
    # leaks into the parent constructor; the documented name wins.
    legacy_square_deviation = kwargs.pop("square_deviation", False)
    self.perform_square_deviation = kwargs.pop(
        "perform_square_deviation", legacy_square_deviation
    )
    self.result_path = kwargs.pop('result_path', None)
    self.max_itercount = kwargs.pop('max_itercount', np.inf)
    self.max_time = kwargs.pop('max_time', np.inf)
    self.save_current_best_iterate = kwargs.pop('save_current_best_iterate', False)
    self.at_calibration = True  # Boolean to indicate if validating or calibrating
    # Extract kwargs for the visualizer
    visualizer_kwargs = {
        "save_tsd_plot": kwargs.pop("save_tsd_plot", False),
        "create_tsd_plot": kwargs.pop("create_tsd_plot", True),
        "show_plot": kwargs.pop("show_plot", True),
        "show_plot_pause_time": kwargs.pop("show_plot_pause_time", 1e-3),
        "file_type": kwargs.pop("plot_file_type", "png"),
    }

    # Check if types are correct:
    # Booleans:
    _bool_kwargs = ["save_files"]
    for bool_keyword in _bool_kwargs:
        keyword_value = getattr(self, bool_keyword)
        if not isinstance(keyword_value, bool):
            raise TypeError(f"Given {bool_keyword} is of type "
                            f"{type(keyword_value).__name__} but should be type bool")

    # %% Initialize all public parameters
    super().__init__(working_directory, **kwargs)
    # Set sim_api (must happen before calibration_class is assigned,
    # because that assignment updates the simulation setup)
    self.sim_api = sim_api

    if not isinstance(calibration_class, CalibrationClass):
        raise TypeError(f"calibration_classes is of type {type(calibration_class).__name__} "
                        f"but should be CalibrationClass")
    self.calibration_class = calibration_class
    # Scale tuner on boundaries
    self.x0 = self.tuner_paras.scale(self.tuner_paras.get_initial_values())
    if self.tuner_paras.bounds is None:
        self.bounds = None
    else:
        # As tuner-parameters are scaled between 0 and 1, the scaled bounds are always 0 and 1
        self.bounds = [(0, 1) for _ in range(len(self.x0))]
    # Add the values to the simulation setup.
    self.sim_api.set_sim_setup(
        {"start_time": self.calibration_class.start_time - self.timedelta,
         "stop_time": self.calibration_class.stop_time}
    )

    # %% Setup the logger
    # Replace the logger set up by the parent class; the parent's logger
    # is handed over to the new calibration logger.
    if self.verbose_logging:
        self.logger = visualizer.CalibrationVisualizer(
            working_directory=working_directory,
            name=self.__class__.__name__,
            calibration_class=self.calibration_class,
            logger=self.logger,
            **visualizer_kwargs
        )
    else:
        self.logger = visualizer.CalibrationLogger(
            working_directory=working_directory,
            name=self.__class__.__name__,
            calibration_class=self.calibration_class,
            logger=self.logger
        )

    # Single class does not need an extra folder
    self.working_directory_of_class = working_directory

    # Set the output interval according to the given Goals
    mean_freq = self.goals.get_meas_frequency()
    self.logger.log("Setting output_interval of simulation according "
                    f"to measurement target data frequency: {mean_freq}")
    self.sim_api.sim_setup.output_interval = mean_freq
    # Reference point for the ``max_time`` check in _check_for_termination
    self.start_time = time.perf_counter()

201 

202 def _check_for_termination(self): 

203 if self._counter >= self.max_itercount: 

204 raise MaxIterationsReached( 

205 "Terminating calibration as the maximum number " 

206 f"of iterations {self.max_itercount} has been reached." 

207 ) 

208 

209 if time.perf_counter() - self.start_time > self.max_time: 

210 raise MaxTimeReached( 

211 f"Terminating calibration as the maximum time of {self.max_time} s has been " 

212 f"reached" 

213 ) 

214 

215 

def obj(self, xk, *args, verbose: bool = False):
    """
    Default objective function.
    The usual function will be implemented here:

    1. Convert the set to modelica-units
    2. Simulate the converted-set
    3. Get data as a dataFrame
    4. Get penalty factor for the penalty function
    5. Calculate the objective based on statistical values

    :param np.array xk:
        Array with normalized values for the minimizer
    :param args:
        Additional positional arguments; accepted but not used here.
    :param bool verbose:
        If True, returns the objective value and the unweighted objective dict (for validation).
        If False, returns only the objective value (for optimization).
    :return:
        If verbose == False (default)
            Objective value based on the used quality measurement
        If verbose == True
            Objective value and unweighted objective dict as a tuple.
        If the simulation fails and ``fail_on_error`` is False, the
        objective value is ``ret_val_on_error`` (paired with an empty
        dict when verbose == True).
    :rtype: float or tuple
    """
    # Info: This function is called by the optimization framework (scipy, dlib, etc.)
    # Initialize class objects
    self._current_iterate = xk
    self._counter += 1

    # Convert the scaled iterate back to the original parameter values
    xk_descaled = self.tuner_paras.descale(xk)

    # Set initial values of variable and fixed parameters
    self.sim_api.result_names = self.goals.get_sim_var_names()
    initial_names = self.tuner_paras.get_names()
    parameters = self.fixed_parameters.copy()
    parameters.update(zip(initial_names, xk_descaled.values))
    # Simulate
    # pylint: disable=broad-except
    try:
        if self.save_files:
            if self.suffix_files == "mat":
                postprocess_mat_result = empty_postprocessing
                kwargs_postprocessing = {}
            else:
                postprocess_mat_result = convert_mat_to_suffix
                kwargs_postprocessing = {
                    'variable_names': self.sim_api.result_names,
                    'suffix_files': self.suffix_files,
                    'parquet_engine': self.parquet_engine
                }
            # Generate the folder name for this iteration
            savepath_files = os.path.join(self.sim_api.working_directory,
                                          f"simulation_{self._counter}")
            if self.sim_api.__class__.__name__ == "DymolaAPI":
                # Note: this persistently adds entries to the class' input_kwargs
                self.calibration_class.input_kwargs["postprocess_mat_result"] = \
                    postprocess_mat_result
                self.calibration_class.input_kwargs["kwargs_postprocessing"] = \
                    kwargs_postprocessing
            _filepath = self.sim_api.simulate(
                parameters=parameters,
                return_option="savepath",
                savepath=savepath_files,
                inputs=self.calibration_class.inputs,
                **self.calibration_class.input_kwargs
            )
            # %% Load results and write to goals object
            sim_target_data = data_types.TimeSeriesData(_filepath)
        else:
            sim_target_data = self.sim_api.simulate(
                parameters=parameters,
                inputs=self.calibration_class.inputs,
                **self.calibration_class.input_kwargs
            )
    except Exception as err:
        if self.fail_on_error:
            self.logger.error("Simulation failed. Raising the error.")
            raise
        self.logger.error(
            f"Simulation failed. Returning '{self.ret_val_on_error}' "
            f"for the optimization. Error message: {err}"
        )
        if verbose:
            # Keep the tuple contract for verbose callers (e.g. validate),
            # which unpack the result into two values.
            return self.ret_val_on_error, {}
        return self.ret_val_on_error

    total_res, unweighted_objective = self._kpi_and_logging_calculation(
        xk_descaled=xk_descaled,
        counter=self._counter,
        results=sim_target_data
    )
    self._check_for_termination()

    if verbose:
        return total_res, unweighted_objective

    return total_res

310 

def mp_obj(self, x, *args):
    """
    Objective function evaluating several parameter sets in one call.

    All parameter sets in ``x`` are descaled, simulated with a single
    call to ``sim_api.simulate`` and then evaluated one after another.

    :param x:
        Sized iterable of scaled parameter vectors, one per evaluation.
    :param args:
        Additional positional arguments; accepted but not used here.
    :return:
        Array of shape ``(len(x), 1)`` with one objective value per
        parameter set. Entries whose simulation yielded ``None`` are
        set to ``ret_val_on_error``.
    :rtype: np.ndarray
    """
    # Initialize list for results
    num_evals = len(x)
    total_res_list = np.empty([num_evals, 1])
    # Set initial values of variable and fixed parameters
    self.sim_api.result_names = self.goals.get_sim_var_names()
    initial_names = self.tuner_paras.get_names()
    parameters = self.fixed_parameters.copy()

    iterates = []
    parameter_list = []
    xk_descaled_list = []
    for _xk_single in x:
        iterates.append(_xk_single)
        # Convert the scaled iterate back to the original parameter values
        xk_descaled = self.tuner_paras.descale(_xk_single)
        xk_descaled_list.append(xk_descaled)
        # Update Parameters
        parameter_copy = parameters.copy()
        parameter_copy.update(zip(initial_names, xk_descaled.values))
        parameter_list.append(parameter_copy)

    # Simulate
    if self.save_files:
        if self.suffix_files == "mat":
            postprocess_mat_result = empty_postprocessing
            kwargs_postprocessing = {}
        else:
            postprocess_mat_result = convert_mat_to_suffix
            kwargs_postprocessing = {
                'variable_names': self.sim_api.result_names,
                'suffix_files': self.suffix_files,
                'parquet_engine': self.parquet_engine
            }
        # NOTE(review): the counter is incremented only after the simulation,
        # so these names are offset by one compared to the iteration numbers
        # used in obj(). Kept as is to not change existing file names.
        result_file_names = [f"simulation_{self._counter + idx}" for idx in
                             range(len(parameter_list))]
        if self.sim_api.__class__.__name__ == "DymolaAPI":
            # Note: this persistently adds entries to the class' input_kwargs
            self.calibration_class.input_kwargs["postprocess_mat_result"] = \
                postprocess_mat_result
            self.calibration_class.input_kwargs["kwargs_postprocessing"] = \
                kwargs_postprocessing
        _filepaths = self.sim_api.simulate(
            parameters=parameter_list,
            return_option="savepath",
            savepath=self.sim_api.working_directory,
            result_file_name=result_file_names,
            fail_on_error=self.fail_on_error,
            inputs=self.calibration_class.inputs,
            **self.calibration_class.input_kwargs
        )
        # Load results; a missing path is kept as None
        results = []
        for _filepath in _filepaths:
            if _filepath is None:
                results.append(None)
            else:
                results.append(data_types.TimeSeriesData(_filepath))
    else:
        results = self.sim_api.simulate(
            parameters=parameter_list,
            inputs=self.calibration_class.inputs,
            fail_on_error=self.fail_on_error,
            **self.calibration_class.input_kwargs
        )

    for idx, result in enumerate(results):
        self._counter += 1
        # Track the parameter vector (as obj() does), not the simulation result
        self._current_iterate = iterates[idx]
        if result is None:
            total_res_list[idx] = self.ret_val_on_error
            continue
        total_res, _ = self._kpi_and_logging_calculation(
            xk_descaled=xk_descaled_list[idx],
            counter=self._counter,
            results=result
        )
        # Add single objective to objective list of total Population
        total_res_list[idx] = total_res
    self._check_for_termination()

    return total_res_list

389 

390 def _kpi_and_logging_calculation(self, *, xk_descaled, counter, results): 

391 """ 

392 Function to calculate everything needed in the obj or mp_obj 

393 function after the simulation finished. 

394 

395 """ 

396 xk = self.tuner_paras.scale(xk_descaled) 

397 

398 self.goals.set_sim_target_data(results) 

399 # Trim results based on start and end-time of cal class 

400 self.goals.set_relevant_time_intervals(self.calibration_class.relevant_intervals) 

401 

402 # %% Evaluate the current objective 

403 # Penalty function (get penalty factor) 

404 if self.recalibration_count > 1 and self.apply_penalty: 

405 # There is no benchmark in the first iteration or 

406 # first iterations were skipped, so no penalty is applied 

407 penaltyfactor = self.get_penalty(xk_descaled, xk) 

408 # Evaluate with penalty 

409 penalty = penaltyfactor 

410 else: 

411 # Evaluate without penalty 

412 penaltyfactor = 1 

413 penalty = None 

414 total_res, unweighted_objective = self.goals.eval_difference( 

415 verbose=True, 

416 penaltyfactor=penaltyfactor 

417 ) 

418 if self.at_calibration: # Only plot if at_calibration 

419 self.logger.calibration_callback_func( 

420 xk=xk, 

421 obj=total_res, 

422 verbose_information=unweighted_objective, 

423 penalty=penalty 

424 ) 

425 # current best iteration step of current calibration class 

426 if total_res < self._current_best_iterate["Objective"]: 

427 # self.best_goals = self.goals 

428 self._current_best_iterate = { 

429 "Iterate": counter, 

430 "Objective": total_res, 

431 "Unweighted Objective": unweighted_objective, 

432 "Parameters": xk_descaled, 

433 "Goals": self.goals, 

434 # For penalty function and for saving goals as csv 

435 "better_current_result": True, 

436 # Changed to false in this script after calling function "save_calibration_results" 

437 "Penaltyfactor": penalty 

438 } 

439 if self.save_current_best_iterate: 

440 parameter_values = self._get_parameter_dict_from_current_best_iterate() 

441 

442 temp_save = { 

443 "parameters": parameter_values, 

444 "objective": total_res 

445 } 

446 with open(self.working_directory.joinpath('best_iterate.json'), 'w') as json_file: 

447 json.dump(temp_save, json_file, indent=4) 

448 

449 return total_res, unweighted_objective 

450 

def calibrate(self, framework, method=None, **kwargs) -> dict:
    """
    Run the calibration of the calibration class, then visualize and
    save the results.

    The arguments of this function are equal to the
    arguments in Optimizer.optimize(). Look at the docstring
    in ebcpy to know which options are available.

    :return:
        Dictionary mapping tuner parameter names to the values of the
        best iterate.
    :raises Exception:
        If no successful iteration was recorded.
    """
    # %% Start Calibration:
    self.at_calibration = True
    cal_class = self.calibration_class
    self.logger.log(f"Start calibration of model: {self.sim_api.model_name}"
                    f" with framework-class {self.__class__.__name__}")
    self.logger.log(f"Class: {cal_class.name}, Start and Stop-Time "
                    f"of simulation: {cal_class.start_time}"
                    f"-{cal_class.stop_time} s\n Time-Intervals used"
                    f" for objective: {cal_class.relevant_intervals}")

    # Setup the visualizer for plotting and logging:
    self.logger.calibrate_new_class(cal_class,
                                    working_directory=self.working_directory_of_class,
                                    for_validation=False)
    self.logger.log_initial_names()

    # Run optimization and measure its duration. Reaching the iteration
    # or time limit is an expected way to stop and only logged.
    began = time.time()
    try:
        self.optimize(
            framework=framework,
            method=method,
            n_cpu=self.sim_api.n_cpu,
            **kwargs)
    except (MaxIterationsReached, MaxTimeReached) as err:
        self.logger.log(msg=str(err), level=logging.WARNING)
    ended = time.time()
    t_cal = ended - began

    # Check if optimization worked correctly
    if "Iterate" not in self._current_best_iterate:
        raise Exception(
            "Some error during calibration yielded no successful iteration. "
            "Can't save or return any results."
        )

    # %% Save the relevant results.
    self.logger.save_calibration_result(self._current_best_iterate,
                                        self.sim_api.model_name,
                                        duration=t_cal,
                                        itercount=self.recalibration_count)
    # Reset the flag now that the result has been saved
    self._current_best_iterate['better_current_result'] = False

    # Save calibrated parameter values in JSON
    best_parameters = self._get_parameter_dict_from_current_best_iterate()
    self.save_results(parameter_values=best_parameters,
                      filename=self.calibration_class.name)
    return best_parameters

509 

510 def _get_parameter_dict_from_current_best_iterate(self) -> dict: 

511 """ 

512 Get the parameter dictionary from the current best iterate. 

513 """ 

514 parameter_values = {} 

515 for p_name in self._current_best_iterate['Parameters'].index: 

516 parameter_values[p_name] = self._current_best_iterate['Parameters'][p_name] 

517 return parameter_values 

518 

@property
def calibration_class(self) -> CalibrationClass:
    """Return the calibration class currently in use."""
    return self._cal_class


@calibration_class.setter
def calibration_class(self, calibration_class: CalibrationClass):
    """
    Store the calibration class and update the start and stop time
    of the simulation setup accordingly.
    """
    # The start time is shifted by the configured timedelta
    shifted_start = self._apply_start_time_method(
        start_time=calibration_class.start_time
    )
    new_setup = {
        "start_time": shifted_start,
        "stop_time": calibration_class.stop_time,
    }
    self.sim_api.set_sim_setup(new_setup)
    self._cal_class = calibration_class

532 

@property
def tuner_paras(self) -> TunerParas:
    """Return the tuner parameters held by the current calibration class."""
    current_class = self.calibration_class
    return current_class.tuner_paras


@tuner_paras.setter
def tuner_paras(self, tuner_paras: TunerParas):
    """Replace the tuner parameters of the current calibration class."""
    current_class = self.calibration_class
    current_class.tuner_paras = tuner_paras

542 

@property
def goals(self) -> Goals:
    """Return the goals held by the current calibration class."""
    current_class = self.calibration_class
    return current_class.goals


@goals.setter
def goals(self, goals: Goals):
    """Replace the goals of the current calibration class."""
    current_class = self.calibration_class
    current_class.goals = goals

552 

@property
def fixed_parameters(self) -> dict:
    """Return the parameters that are kept fixed during calibration."""
    fixed = self._fixed_pars
    return fixed


@fixed_parameters.setter
def fixed_parameters(self, fixed_parameters: dict):
    """Store the parameters that are kept fixed during calibration."""
    # The given dict is stored as is (no copy is made)
    self._fixed_pars = fixed_parameters

562 

def save_results(self, parameter_values: dict, filename: str):
    """
    Write the given dict as JSON to ``<result_path>/<filename>.json``.

    Nothing is written if ``self.result_path`` is None. The directory
    is created if it does not exist yet.

    :param dict parameter_values:
        Values to store.
    :param str filename:
        File name without the ``.json`` suffix.
    """
    target_dir = self.result_path
    if target_dir is None:
        return
    os.makedirs(target_dir, exist_ok=True)
    json_path = os.path.join(target_dir, f'{filename}.json')
    with open(json_path, 'w') as json_file:
        json.dump(parameter_values, json_file, indent=4)

571 

def validate(self, validation_class: CalibrationClass, calibration_result: Dict, verbose=False):
    """
    Validate the given calibration class based on the given
    values for tuner_parameters.

    :param CalibrationClass validation_class:
        The class to validate on
    :param dict calibration_result:
        The calibration result to apply to the validation class on.
        Keys are the tuner parameter names, values the parameter values.
    :param bool verbose:
        If True, a DataFrame with the weights and objective values of
        all goals is returned instead of the total objective only.
    :return:
        If verbose == False (default)
            Objective value of the validation
        If verbose == True
            DataFrame indexed by (Class, Goal) with the columns
            ``weight`` and the statistical measure of the goals. The
            first row ('all') holds the total objective.
    """
    # Start Validation:
    self.at_calibration = False
    self.logger.log(f"Start validation of model: {self.sim_api.model_name} with "
                    f"framework-class {self.__class__.__name__}")
    # Use start-time of calibration class
    self.calibration_class = validation_class
    start_time = self._apply_start_time_method(
        start_time=self.calibration_class.start_time
    )
    old_tuner_paras = copy(self.calibration_class.tuner_paras)
    tuner_values = list(calibration_result.values())
    self.calibration_class.tuner_paras = TunerParas(
        names=list(calibration_result.keys()),
        initial_values=tuner_values,
        # Dummy bounds as they are scaled anyway
        bounds=[(val - 1, val + 1) for val in tuner_values]
    )

    try:
        # Set the start-time for the simulation
        self.sim_api.sim_setup.start_time = start_time

        self.logger.calibrate_new_class(self.calibration_class,
                                        working_directory=self.working_directory_of_class,
                                        for_validation=True)

        # Use the results parameter vector to simulate again.
        # Reset the iteration counter; obj() increments it to 1.
        self._counter = 0
        # Scale the tuner parameters
        xk = self.tuner_paras.scale(tuner_values)
        # Evaluate objective
        obj, unweighted_objective = self.obj(xk=xk, verbose=True)
        self.logger.validation_callback_func(
            obj=obj
        )
    finally:
        # Always restore the tuner parameters, also if the simulation
        # raised, so the validation class is not left with the dummy ones.
        self.calibration_class.tuner_paras = old_tuner_paras

    if not verbose:
        return obj

    # First row is the total objective, followed by one row per goal
    weights = [1]
    objectives = [obj]
    goals = ['all']
    for goal, val in unweighted_objective.items():
        weights.append(val[0])
        objectives.append(val[1])
        goals.append(goal)
    index = pd.MultiIndex.from_product(
        [[validation_class.name], goals],
        names=['Class', 'Goal']
    )
    obj_verbos = pd.DataFrame(
        {'weight': weights, validation_class.goals.statistical_measure: objectives},
        index=index
    )
    return obj_verbos

636 

def _handle_error(self, error):
    """
    Save the calibration result (and thus the plots) before passing the
    error on to the parent class.
    See ebcpy.optimization.Optimizer._handle_error for more info.

    :param error:
        The error raised during the optimization.
    """
    # These errors are our own; they are handled in the calibrate() function
    own_terminations = (MaxIterationsReached, MaxTimeReached)
    if isinstance(error, own_terminations):
        raise error
    self.logger.save_calibration_result(
        best_iterate=self._current_best_iterate,
        model_name=self.sim_api.model_name,
        duration=0,
        itercount=0,
    )
    super()._handle_error(error)

650 

def get_penalty(self, current_tuners, current_tuners_scaled):
    """
    Get penalty factor for evaluation of current objective. The penaltyfactor
    considers deviations of the tuner parameters in the objective function.
    First the deviation between the current best values
    of the tuner parameters from the recalibration steps and
    the tuner parameters obtained in the current iteration step is determined.
    Then the penaltyfactor is being increased according to that deviation.

    :param current_tuners:
        Current (descaled) tuner values, indexable by tuner name.
    :param current_tuners_scaled:
        Current scaled tuner values; must be convertible with ``dict()``
        into a mapping of tuner name to scaled value.
    :return: float penalty
        Penaltyfactor for evaluation. Starts at 1 and grows by
        ``penalty_factor`` times the deviation of each tuner.
    """
    # TO-DO: Add possibility to consider the sensitivity of tuner parameters

    # Latest best tuner values (with all other tuners), descaled and scaled
    previous = self.sim_api.all_tuners_dict
    previous_scaled = self.sim_api.all_tuners_dict_scaled
    current = current_tuners
    current_scaled = dict(current_tuners_scaled)

    # Apply penalty function
    penalty = 1
    for key, value in current_scaled.items():
        # Add corresponding function for penaltyfactor here
        if self.perform_square_deviation:
            # Quadratic deviation of the scaled values
            dev_square = (value - previous_scaled[key]) ** 2
            penalty += self.penalty_factor * dev_square
            continue
        # Relative deviation (reference: previous). Tuner parameters
        # whose current best value is 0 are ignored, which also rules
        # out a division by zero below.
        if previous[key] == 0:
            continue
        dev = abs(current[key] - previous[key]) / abs(previous[key])
        penalty += self.penalty_factor * dev

    return penalty

695 

696 def _apply_start_time_method(self, start_time): 

697 """ 

698 Method to be calculate the start_time based on the used 

699 timedelta method. 

700 

701 :param float start_time: 

702 Start time which was specified by the user in the TOML file. 

703 :return float start_time - self.timedelta: 

704 Calculated "timedelta", if specified in the TOML file. 

705 """ 

706 return start_time - self.timedelta