Coverage for aixcalibuha/calibration/calibrator.py: 64%

263 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2026-04-30 14:23 +0000

1""" 

2Module containing the basic class to calibrate 

3a dynamic model, e.g. a modelica model. 

4""" 

5 

6import os 

7import json 

8from pathlib import Path 

9import time 

10import logging 

11from typing import Dict, Union 

12from copy import copy 

13import numpy as np 

14import pandas as pd 

15from ebcpy import load_time_series_data, Optimizer 

16from ebcpy.simulationapi import SimulationAPI 

17from aixcalibuha.utils import visualizer, MaxIterationsReached, MaxTimeReached, convert_mat_to_suffix, \ 

18 empty_postprocessing 

19from aixcalibuha import CalibrationClass, Goals, TunerParas 

20 

21 

22class Calibrator(Optimizer): 

23 """ 

24 This Calibrator class can be used for single

25 time-intervals of calibration. 

26 

27 :param str,Path working_directory: 

28 Working directory 

29 :param ebcpy.simulationapi.SimulationAPI sim_api: 

30 Simulation-API for running the models 

31 :param CalibrationClass calibration_class: 

32 Class with information on Goals and tuner-parameters for calibration 

33 :keyword str result_path: 

34 If given, then the resulting parameter values will be stored in a JSON file 

35 at the given path. 

36 :keyword float timedelta: 

37 If you use this class for calibrating a single time-interval, 

38 you can set the timedelta to instantiate the simulation before 

39 actually evaluating it for the objective. 

40 The given float (default is 0) is subtracted from the start_time 

41 of your calibration_class. You can find a visualisation of said timedelta 

42 in the img folder of the project. 

43 :keyword boolean save_files: 

44 If true, all simulation files for each iteration will be saved! 

45 :keyword str suffix_files: 

46 Default 'csv'. Specifies the data format to store the simulation files in. 

47 Options are 'csv', 'parquet', or 'parquet.COMPRESSION' (e.g. 'parquet.snappy', 

48 'parquet.gzip') to save only the goals. 

49 If you want to keep the original 'mat' file specify 'mat' here 

50 (not recommended due to high disk size usage). 

51 :keyword str parquet_engine: 

52 The engine to use for the data format parquet. 

53 Supported options can be extracted 

54 from the ebcpy DataFrame accessor ``df.tsd.save()`` function. 

55 Default is 'pyarrow'. 

56 :keyword boolean verbose_logging: 

57 Default is True. If False, the standard Logger without 

58 Visualization in Form of plots is used. 

59 If you use this, the following keyword arguments below will help 

60 to further adjust the logging. 

61 :keyword boolean show_plot: 

62 Default is True. If False, all created plots are not shown during 

63 calibration but only stored at the end of the process. 

64 :keyword boolean create_tsd_plot: 

65 Default is True. If False, the plot of the time series data (goals) 

66 is not created and thus not shown during calibration. It therefore is

67 also not stored, even if you set the save_tsd_plot keyword-argument to true. 

68 :keyword boolean save_tsd_plot: 

69 Default is False. If True, at each iteration the created plot of the 

70 time-series is saved. This may make the process much slower 

71 :keyword boolean fail_on_error: 

72 Default is False. If True, the calibration will stop with an error if 

73 the simulation fails. See also: ``ret_val_on_error`` 

74 :keyword float,np.nan ret_val_on_error: 

75 Default is np.nan. If ``fail_on_error`` is false, you can specify here 

76 which value to return in the case of a failed simulation. Possible 

77 options are np.nan, np.inf or some other high numbers. Be aware that this

78 may influence the solver.

79 :keyword dict fixed_parameters: 

80 Default is an empty dict. This dict may be used to add certain parameters 

81 to the simulation which are not tuned / variable during calibration. 

82 Such parameters may be used if the default values in the model don't 

83 represent the parameter values you want to use. 

84 :keyword boolean apply_penalty: 

85 Default is true. Specifies if a penalty function should be applied or not. 

86 :keyword float penalty_factor:

87 Default is 0. Quantifies the impact of the penalty term on the objective function. 

88 The penalty factor is added to the objective function. 

89 :keyword int recalibration_count:

90 Default is 0. Works as a counter and specifies the current cycle of recalibration. 

91 :keyword boolean perform_square_deviation: 

92 Default is false. 

93 If true the penalty function will evaluate the penalty factor with a quadratic approach. 

94 :keyword int max_itercount: 

95 Default is Infinity. 

96 Maximum number of iterations of calibration. 

97 This may be useful to explicitly limit the calibration 

98 time. 

99 :keyword int max_time:

100 Default is Infinity.

101 Maximum time in seconds, after which the calibration is stopped. Useful to explicitly limit the calibration time. 

102 :keyword str plot_file_type: 

103 File ending of created plots. 

104 Any supported option in matplotlib, e.g. svg, png, pdf ... 

105 Default is png 

106 

107 """ 

108 

def __init__(self,
             working_directory: Union[Path, str],
             sim_api: SimulationAPI,
             calibration_class: CalibrationClass,
             **kwargs):
    """
    Instantiate instance attributes.

    All calibrator-specific keyword arguments are popped from ``kwargs``
    (always with a default value) so that only the remaining ones are
    forwarded to the ``__init__`` of the parent class.

    :param str,Path working_directory:
        Working directory
    :param ebcpy.simulationapi.SimulationAPI sim_api:
        Simulation-API for running the models
    :param CalibrationClass calibration_class:
        Class with information on Goals and tuner-parameters for calibration
    :keyword boolean perform_square_deviation:
        Default is False. The former spelling ``square_deviation`` is still
        accepted. If both are passed, ``perform_square_deviation`` wins.
    :keyword boolean save_current_best_iterate:
        Default is False. Stored on the instance; when True, every improved
        iterate is additionally written to ``best_iterate.json`` in the
        working directory.
    :raises TypeError:
        If ``save_files`` is not a bool or if ``calibration_class`` is not
        a ``CalibrationClass``.
    """
    # %% Kwargs
    # Pop the items so they won't be passed on to the parent class.
    self.verbose_logging = kwargs.pop("verbose_logging", True)
    self.save_files = kwargs.pop("save_files", False)
    self.suffix_files = kwargs.pop("suffix_files", "csv")
    self.parquet_engine = kwargs.pop('parquet_engine', 'pyarrow')
    self.timedelta = kwargs.pop("timedelta", 0)
    self.fail_on_error = kwargs.pop("fail_on_error", False)
    self.ret_val_on_error = kwargs.pop("ret_val_on_error", np.nan)
    self.fixed_parameters = kwargs.pop("fixed_parameters", {})
    self.apply_penalty = kwargs.pop("apply_penalty", True)
    self.penalty_factor = kwargs.pop("penalty_factor", 0)
    self.recalibration_count = kwargs.pop("recalibration_count", 0)
    # The documented keyword is ``perform_square_deviation`` while existing
    # callers pass ``square_deviation``. Pop both so that neither leaks into
    # the parent ``__init__``; the documented name takes precedence.
    legacy_square_deviation = kwargs.pop("square_deviation", False)
    self.perform_square_deviation = kwargs.pop(
        "perform_square_deviation", legacy_square_deviation
    )
    self.result_path = kwargs.pop('result_path', None)
    self.max_itercount = kwargs.pop('max_itercount', np.inf)
    self.max_time = kwargs.pop('max_time', np.inf)
    self.save_current_best_iterate = kwargs.pop('save_current_best_iterate', False)
    self.at_calibration = True  # Boolean to indicate if validating or calibrating
    # Extract kwargs for the visualizer
    visualizer_kwargs = {
        "save_tsd_plot": kwargs.pop("save_tsd_plot", False),
        "create_tsd_plot": kwargs.pop("create_tsd_plot", True),
        "show_plot": kwargs.pop("show_plot", True),
        "show_plot_pause_time": kwargs.pop("show_plot_pause_time", 1e-3),
        "file_type": kwargs.pop("plot_file_type", "png"),
    }

    # Check if types are correct:
    # Booleans:
    _bool_kwargs = ["save_files"]
    for bool_keyword in _bool_kwargs:
        keyword_value = getattr(self, bool_keyword)
        if not isinstance(keyword_value, bool):
            raise TypeError(f"Given {bool_keyword} is of type "
                            f"{type(keyword_value).__name__} but should be type bool")

    # %% Initialize all public parameters
    super().__init__(working_directory, **kwargs)
    # Set sim_api (must exist before the calibration_class setter runs,
    # because that setter writes the simulation setup to it)
    self.sim_api = sim_api

    if not isinstance(calibration_class, CalibrationClass):
        raise TypeError(f"calibration_classes is of type {type(calibration_class).__name__} "
                        f"but should be CalibrationClass")
    self.calibration_class = calibration_class
    # Scale tuner on boundaries
    self.x0 = self.tuner_paras.scale(self.tuner_paras.get_initial_values())
    if self.tuner_paras.bounds is None:
        self.bounds = None
    else:
        # As tuner-parameters are scaled between 0 and 1, the scaled bounds are always 0 and 1
        self.bounds = [(0, 1)] * len(self.x0)
    # Add the values to the simulation setup.
    self.sim_api.set_sim_setup(
        {"start_time": self.calibration_class.start_time - self.timedelta,
         "stop_time": self.calibration_class.stop_time}
    )

    # %% Setup the logger
    # Replace the logger created by the parent class with a calibration
    # specific one; the parent logger is handed over to it.
    if self.verbose_logging:
        self.logger = visualizer.CalibrationVisualizer(
            working_directory=working_directory,
            name=self.__class__.__name__,
            calibration_class=self.calibration_class,
            logger=self.logger,
            **visualizer_kwargs
        )
    else:
        self.logger = visualizer.CalibrationLogger(
            working_directory=working_directory,
            name=self.__class__.__name__,
            calibration_class=self.calibration_class,
            logger=self.logger
        )

    self.working_directory_of_class = working_directory  # Single class does not need an extra folder

    # Set the output interval according to the given Goals
    mean_freq = self.goals.get_meas_frequency()
    self.logger.log("Setting output_interval of simulation according "
                    f"to measurement target data frequency: {mean_freq}")
    self.sim_api.sim_setup.output_interval = mean_freq
    # Reference point for the ``max_time`` termination check
    self.start_time = time.perf_counter()

203 

204 def _check_for_termination(self): 

205 if self._counter >= self.max_itercount: 

206 raise MaxIterationsReached( 

207 "Terminating calibration as the maximum number " 

208 f"of iterations {self.max_itercount} has been reached." 

209 ) 

210 

211 if time.perf_counter() - self.start_time > self.max_time: 

212 raise MaxTimeReached( 

213 f"Terminating calibration as the maximum time of {self.max_time} s has been " 

214 f"reached" 

215 ) 

216 

217 

def obj(self, xk, *args, verbose: bool = False):
    """
    Default objective function.
    The usual function will be implemented here:

    1. Convert the set to modelica-units
    2. Simulate the converted-set
    3. Get data as a dataFrame
    4. Get penalty factor for the penalty function
    5. Calculate the objective based on statistical values

    :param np.array xk:
        Array with normalized values for the minimizer
    :param args:
        Additional positional arguments; they are accepted but not used.
    :param bool verbose:
        If True, returns the objective value and the unweighted objective dict (for validation).
        If False, returns only the objective value (for optimization).
    :return:
        If verbose == False (default)
            Objective value based on the used quality measurement
        If verbose == True
            Objective value and unweighted objective dict as a tuple.
        If the simulation fails and ``fail_on_error`` is False,
        ``ret_val_on_error`` takes the place of the objective value
        (together with an empty dict if verbose == True).
    :rtype: float or tuple
    """
    # Info: This function is called by the optimization framework (scipy, dlib, etc.)
    # Initialize class objects
    self._current_iterate = xk
    self._counter += 1

    # Convert set if multiple goals of different scales are used
    xk_descaled = self.tuner_paras.descale(xk)

    # Set initial values of variable and fixed parameters
    self.sim_api.result_names = self.goals.get_sim_var_names()
    initial_names = self.tuner_paras.get_names()
    parameters = self.fixed_parameters.copy()
    parameters.update(zip(initial_names, xk_descaled.values))
    # Simulate
    # pylint: disable=broad-except
    try:
        if self.save_files:
            # Choose how the raw result file is post-processed
            if self.suffix_files == "mat":
                postprocess_mat_result = empty_postprocessing
                kwargs_postprocessing = {}
            else:
                postprocess_mat_result = convert_mat_to_suffix
                kwargs_postprocessing = {
                    'variable_names': self.sim_api.result_names,
                    'suffix_files': self.suffix_files,
                    'parquet_engine': self.parquet_engine
                }
            # One folder per iteration, named after the iteration counter
            savepath_files = os.path.join(self.sim_api.working_directory,
                                          f"simulation_{self._counter}")
            if self.sim_api.__class__.__name__ == "DymolaAPI":
                self.calibration_class.input_kwargs["postprocess_mat_result"] = postprocess_mat_result
                self.calibration_class.input_kwargs["kwargs_postprocessing"] = kwargs_postprocessing
            _filepath = self.sim_api.simulate(
                parameters=parameters,
                return_option="savepath",
                savepath=savepath_files,
                inputs=self.calibration_class.inputs,
                **self.calibration_class.input_kwargs
            )
            # %% Load results and write to goals object
            sim_target_data = load_time_series_data(_filepath)
        else:
            sim_target_data = self.sim_api.simulate(
                parameters=parameters,
                inputs=self.calibration_class.inputs,
                **self.calibration_class.input_kwargs
            )
    except Exception as err:
        if self.fail_on_error:
            self.logger.error("Simulation failed. Raising the error.")
            raise
        self.logger.error(
            f"Simulation failed. Returning '{self.ret_val_on_error}' "
            f"for the optimization. Error message: {err}"
        )
        if verbose:
            # Keep the documented (value, dict) shape so that callers which
            # unpack the verbose result do not fail on a failed simulation.
            return self.ret_val_on_error, {}
        return self.ret_val_on_error

    total_res, unweighted_objective = self._kpi_and_logging_calculation(
        xk_descaled=xk_descaled,
        counter=self._counter,
        results=sim_target_data
    )
    self._check_for_termination()

    if verbose:
        return total_res, unweighted_objective

    return total_res

312 

def mp_obj(self, x, *args):
    """
    Objective function evaluating several parameter sets in one call.

    All parameter sets in ``x`` are descaled, handed to the simulation API
    in a single ``simulate`` call and then evaluated one after another.

    :param x:
        Iterable of normalized parameter sets, one entry per evaluation.
    :param args:
        Additional positional arguments; they are accepted but not used.
    :return:
        Array of shape ``(len(x), 1)`` holding the objective value of each
        parameter set. Entries whose simulation result is ``None`` are
        filled with ``ret_val_on_error``.
    :rtype: np.ndarray
    """
    # Initialize list for results
    num_evals = len(x)
    total_res_list = np.empty([num_evals, 1])
    # Set initial values of variable and fixed parameters
    self.sim_api.result_names = self.goals.get_sim_var_names()
    initial_names = self.tuner_paras.get_names()

    parameter_list = []
    xk_descaled_list = []
    for _xk_single in x:
        # Convert set if multiple goals of different scales are used
        xk_descaled = self.tuner_paras.descale(_xk_single)
        xk_descaled_list.append(xk_descaled)
        # Every evaluation gets its own copy of the fixed parameters
        parameter_copy = self.fixed_parameters.copy()
        parameter_copy.update(zip(initial_names, xk_descaled.values))
        parameter_list.append(parameter_copy)

    # Simulate
    if self.save_files:
        if self.suffix_files == "mat":
            postprocess_mat_result = empty_postprocessing
            kwargs_postprocessing = {}
        else:
            postprocess_mat_result = convert_mat_to_suffix
            kwargs_postprocessing = {
                'variable_names': self.sim_api.result_names,
                'suffix_files': self.suffix_files,
                'parquet_engine': self.parquet_engine
            }
        # The counter is incremented before each evaluation below, so the
        # files are numbered counter+1 .. counter+n to match the iterate
        # numbers (same convention as in ``obj``).
        result_file_names = [f"simulation_{self._counter + offset}"
                             for offset in range(1, len(parameter_list) + 1)]
        if self.sim_api.__class__.__name__ == "DymolaAPI":
            self.calibration_class.input_kwargs["postprocess_mat_result"] = postprocess_mat_result
            self.calibration_class.input_kwargs["kwargs_postprocessing"] = kwargs_postprocessing
        _filepaths = self.sim_api.simulate(
            parameters=parameter_list,
            return_option="savepath",
            savepath=self.sim_api.working_directory,
            result_file_name=result_file_names,
            fail_on_error=self.fail_on_error,
            inputs=self.calibration_class.inputs,
            **self.calibration_class.input_kwargs
        )
        # Load results; a missing path is passed on as None
        results = [
            None if _filepath is None else load_time_series_data(_filepath)
            for _filepath in _filepaths
        ]
    else:
        results = self.sim_api.simulate(
            parameters=parameter_list,
            inputs=self.calibration_class.inputs,
            fail_on_error=self.fail_on_error,
            **self.calibration_class.input_kwargs
        )

    for idx, result in enumerate(results):
        self._counter += 1
        # Track the parameter set (not the simulation result), as ``obj`` does
        self._current_iterate = x[idx]
        if result is None:
            total_res_list[idx] = self.ret_val_on_error
            continue
        total_res, unweighted_objective = self._kpi_and_logging_calculation(
            xk_descaled=xk_descaled_list[idx],
            counter=self._counter,
            results=result
        )
        # Add single objective to objective list of total Population
        total_res_list[idx] = total_res
        self._check_for_termination()

    return total_res_list

391 

392 def _kpi_and_logging_calculation(self, *, xk_descaled, counter, results): 

393 """ 

394 Function to calculate everything needed in the obj or mp_obj 

395 function after the simulation finished. 

396 

397 """ 

398 xk = self.tuner_paras.scale(xk_descaled) 

399 

400 self.goals.set_sim_target_data(results) 

401 # Trim results based on start and end-time of cal class 

402 self.goals.set_relevant_time_intervals(self.calibration_class.relevant_intervals) 

403 

404 # %% Evaluate the current objective 

405 # Penalty function (get penalty factor) 

406 if self.recalibration_count > 1 and self.apply_penalty: 

407 # There is no benchmark in the first iteration or 

408 # first iterations were skipped, so no penalty is applied 

409 penaltyfactor = self.get_penalty(xk_descaled, xk) 

410 # Evaluate with penalty 

411 penalty = penaltyfactor 

412 else: 

413 # Evaluate without penalty 

414 penaltyfactor = 1 

415 penalty = None 

416 total_res, unweighted_objective = self.goals.eval_difference( 

417 verbose=True, 

418 penaltyfactor=penaltyfactor 

419 ) 

420 if self.at_calibration: # Only plot if at_calibration 

421 self.logger.calibration_callback_func( 

422 xk=xk, 

423 obj=total_res, 

424 verbose_information=unweighted_objective, 

425 penalty=penalty 

426 ) 

427 # current best iteration step of current calibration class 

428 if total_res < self._current_best_iterate["Objective"]: 

429 # self.best_goals = self.goals 

430 self._current_best_iterate = { 

431 "Iterate": counter, 

432 "Objective": total_res, 

433 "Unweighted Objective": unweighted_objective, 

434 "Parameters": xk_descaled, 

435 "Goals": self.goals, 

436 # For penalty function and for saving goals as csv 

437 "better_current_result": True, 

438 # Changed to false in this script after calling function "save_calibration_results" 

439 "Penaltyfactor": penalty 

440 } 

441 if self.save_current_best_iterate: 

442 parameter_values = self._get_parameter_dict_from_current_best_iterate() 

443 

444 temp_save = { 

445 "parameters": parameter_values, 

446 "objective": total_res 

447 } 

448 with open(self.working_directory.joinpath('best_iterate.json'), 'w') as json_file: 

449 json.dump(temp_save, json_file, indent=4) 

450 

451 return total_res, unweighted_objective 

452 

def calibrate(self, framework, method=None, **kwargs) -> dict:
    """
    Run the calibration of the calibration class, then visualize and
    save the results.

    The arguments of this function are equal to the
    arguments in Optimizer.optimize(). Look at the docstring
    in ebcpy to know which options are available.

    :return:
        Dictionary with the tuner parameter values of the best iterate.
    :raises Exception:
        If no iteration was successful, i.e. no best iterate exists.
    """
    # %% Start Calibration:
    self.at_calibration = True
    model_name = self.sim_api.model_name
    cal_class = self.calibration_class
    self.logger.log(f"Start calibration of model: {model_name}"
                    f" with framework-class {self.__class__.__name__}")
    self.logger.log(f"Class: {cal_class.name}, Start and Stop-Time "
                    f"of simulation: {cal_class.start_time}"
                    f"-{cal_class.stop_time} s\n Time-Intervals used"
                    f" for objective: {cal_class.relevant_intervals}")

    # Prepare the logger / visualizer for this class
    self.logger.calibrate_new_class(cal_class,
                                    working_directory=self.working_directory_of_class,
                                    for_validation=False)
    self.logger.log_initial_names()

    # Measure the duration of the optimization run
    t_cal_start = time.time()
    try:
        self.optimize(
            framework=framework,
            method=method,
            n_cpu=self.sim_api.n_cpu,
            **kwargs)
    except (MaxIterationsReached, MaxTimeReached) as err:
        # Reaching a user-defined limit is a regular way to stop
        self.logger.log(msg=str(err), level=logging.WARNING)
    t_cal = time.time() - t_cal_start

    # Without an "Iterate" entry no evaluation ever improved the objective
    if "Iterate" not in self._current_best_iterate:
        raise Exception(
            "Some error during calibration yielded no successful iteration. "
            "Can't save or return any results."
        )

    # %% Save the relevant results.
    self.logger.save_calibration_result(self._current_best_iterate,
                                        self.sim_api.model_name,
                                        duration=t_cal,
                                        itercount=self.recalibration_count)
    # Reset
    self._current_best_iterate['better_current_result'] = False

    # Save calibrated parameter values in JSON
    parameter_values = self._get_parameter_dict_from_current_best_iterate()
    self.save_results(parameter_values=parameter_values,
                      filename=self.calibration_class.name)
    return parameter_values

511 

512 def _get_parameter_dict_from_current_best_iterate(self) -> dict: 

513 """ 

514 Get the parameter dictionary from the current best iterate. 

515 """ 

516 parameter_values = {} 

517 for p_name in self._current_best_iterate['Parameters'].index: 

518 parameter_values[p_name] = self._current_best_iterate['Parameters'][p_name] 

519 return parameter_values 

520 

@property
def calibration_class(self) -> CalibrationClass:
    """Return the calibration class which is currently in use."""
    return self._cal_class

@calibration_class.setter
def calibration_class(self, calibration_class: CalibrationClass):
    """
    Use the given calibration class from now on.

    The start time (shifted via ``_apply_start_time_method``) and the
    stop time of the class are written to the simulation setup of
    ``sim_api`` before the class is stored.
    """
    shifted_start_time = self._apply_start_time_method(
        start_time=calibration_class.start_time
    )
    new_sim_setup = {
        "start_time": shifted_start_time,
        "stop_time": calibration_class.stop_time,
    }
    self.sim_api.set_sim_setup(new_sim_setup)
    self._cal_class = calibration_class

534 

@property
def tuner_paras(self) -> TunerParas:
    """Return the tuner parameters stored in the current calibration class."""
    current_class = self.calibration_class
    return current_class.tuner_paras

@tuner_paras.setter
def tuner_paras(self, tuner_paras: TunerParas):
    """Replace the tuner parameters stored in the current calibration class."""
    current_class = self.calibration_class
    current_class.tuner_paras = tuner_paras

544 

@property
def goals(self) -> Goals:
    """Return the goals stored in the current calibration class."""
    current_class = self.calibration_class
    return current_class.goals

@goals.setter
def goals(self, goals: Goals):
    """Replace the goals stored in the current calibration class."""
    current_class = self.calibration_class
    current_class.goals = goals

554 

@property
def fixed_parameters(self) -> dict:
    """Return the parameters which are kept fixed during calibration."""
    fixed = self._fixed_pars
    return fixed

@fixed_parameters.setter
def fixed_parameters(self, fixed_parameters: dict):
    """Store the parameters which are kept fixed during calibration."""
    # The dict is stored as given (no copy is made)
    self._fixed_pars = fixed_parameters

564 

def save_results(self, parameter_values: dict, filename: str):
    """
    Write the given dict as JSON to ``<self.result_path>/<filename>.json``.

    Nothing is written if ``self.result_path`` is None. The directory
    is created if it does not exist yet.
    """
    target_dir = self.result_path
    if target_dir is None:
        return
    os.makedirs(target_dir, exist_ok=True)
    json_path = os.path.join(target_dir, f'{filename}.json')
    with open(json_path, 'w') as json_file:
        json.dump(parameter_values, json_file, indent=4)

573 

def validate(self, validation_class: CalibrationClass, calibration_result: Dict, verbose=False):
    """
    Validate the given calibration class based on the given
    values for tuner_parameters.

    The validation class replaces the current calibration class. Its tuner
    parameters are temporarily replaced by the calibration result for a
    single evaluation of ``obj`` and restored afterwards, also if the
    evaluation raises an error.

    :param CalibrationClass validation_class:
        The class to validate on
    :param dict calibration_result:
        The calibration result to apply to the validation class on.
    :param bool verbose:
        If True, a DataFrame with the weight and the objective of all
        goals combined ('all') and of every single goal is returned.
        If False (default), only the objective value is returned.
    :return:
        Objective value, or the DataFrame described above if verbose.
    """
    # Start Validation:
    self.at_calibration = False
    self.logger.log(f"Start validation of model: {self.sim_api.model_name} with "
                    f"framework-class {self.__class__.__name__}")
    # Use start-time of calibration class
    self.calibration_class = validation_class
    start_time = self._apply_start_time_method(
        start_time=self.calibration_class.start_time
    )
    old_tuner_paras = copy(self.calibration_class.tuner_paras)
    tuner_values = list(calibration_result.values())
    self.calibration_class.tuner_paras = TunerParas(
        names=list(calibration_result.keys()),
        initial_values=tuner_values,
        # Dummy bounds as they are scaled anyway
        bounds=[(val - 1, val + 1) for val in tuner_values]
    )

    try:
        # Set the start-time for the simulation
        self.sim_api.sim_setup.start_time = start_time

        self.logger.calibrate_new_class(self.calibration_class,
                                        working_directory=self.working_directory_of_class,
                                        for_validation=True)

        # Use the results parameter vector to simulate again.
        # Reset to zero; obj() increments the counter before simulating.
        self._counter = 0
        # Scale the tuner parameters
        xk = self.tuner_paras.scale(tuner_values)
        # Evaluate objective
        evaluation = self.obj(xk=xk, verbose=True)
        if isinstance(evaluation, tuple):
            obj, unweighted_objective = evaluation
        else:
            # A failed simulation may yield only the bare error value
            obj, unweighted_objective = evaluation, {}
        self.logger.validation_callback_func(
            obj=obj
        )
    finally:
        # Reset tuner_parameters to avoid unwanted behaviour,
        # even if the evaluation above raised an error
        self.calibration_class.tuner_paras = old_tuner_paras

    if verbose:
        # First row: total objective; following rows: one per goal
        weights = [1]
        objectives = [obj]
        goals = ['all']
        for goal, val in unweighted_objective.items():
            weights.append(val[0])
            objectives.append(val[1])
            goals.append(goal)
        index = pd.MultiIndex.from_product(
            [[validation_class.name], goals],
            names=['Class', 'Goal']
        )
        obj_verbos = pd.DataFrame(
            {'weight': weights, validation_class.goals.statistical_measure: objectives},
            index=index
        )
        return obj_verbos
    return obj

638 

def _handle_error(self, error):
    """
    Save the calibration result (and thereby the plots) before the error
    is passed on to the parent class.
    See ebcpy.optimization.Optimizer._handle_error for more info.

    :raises MaxIterationsReached, MaxTimeReached:
        These errors are re-raised unchanged, as calibrate() handles them.
    """
    own_termination_errors = (MaxIterationsReached, MaxTimeReached)
    if isinstance(error, own_termination_errors):
        # Raised on purpose by _check_for_termination
        raise error
    best_iterate_so_far = self._current_best_iterate
    self.logger.save_calibration_result(best_iterate=best_iterate_so_far,
                                        model_name=self.sim_api.model_name,
                                        duration=0,
                                        itercount=0)
    super()._handle_error(error)

652 

def get_penalty(self, current_tuners, current_tuners_scaled):
    """
    Get penalty factor for evaluation of current objective. The penaltyfactor
    considers deviations of the tuner parameters in the objective function.
    First the deviation between the current best values
    of the tuner parameters from the recalibration steps and
    the tuner parameters obtained in the current iteration step is determined.
    Then the penaltyfactor is being increased according to the deviation.

    Starting at 1, each tuner parameter adds ``penalty_factor`` times its
    deviation: the squared deviation of the scaled values if
    ``perform_square_deviation`` is set, otherwise the relative deviation
    of the descaled values with respect to the previous value.

    :param current_tuners:
        Descaled tuner values of the current iterate, indexable by
        tuner parameter name.
    :param current_tuners_scaled:
        Scaled tuner values of the current iterate. Must be convertible
        with ``dict()`` into a mapping of name to value.
    :return: float penalty
        Penaltyfactor for evaluation.
    """
    # TO-DO: Add possibility to consider the sensitivity of tuner parameters

    # Latest best values (with all other tuners) as stored on the sim_api
    previous = self.sim_api.all_tuners_dict
    previous_scaled = self.sim_api.all_tuners_dict_scaled
    current_scaled = dict(current_tuners_scaled)

    # Apply penalty function
    penalty = 1
    for key, value in current_scaled.items():
        # Add corresponding function for penaltyfactor here
        if self.perform_square_deviation:
            # Apply quadratic deviation
            dev_square = (value - previous_scaled[key]) ** 2
            penalty += self.penalty_factor * dev_square
            continue
        # Apply relative deviation (reference: previous).
        # Ignore tuner parameters whose current best value is 0, as no
        # relative deviation can be calculated for them.
        reference = previous[key]
        if reference == 0:
            continue
        dev = abs(current_tuners[key] - reference) / abs(reference)
        penalty += self.penalty_factor * dev

    return penalty

697 

698 def _apply_start_time_method(self, start_time): 

699 """ 

700 Method to be calculate the start_time based on the used 

701 timedelta method. 

702 

703 :param float start_time: 

704 Start time which was specified by the user in the TOML file. 

705 :return float start_time - self.timedelta: 

706 Calculated "timedelta", if specified in the TOML file. 

707 """ 

708 return start_time - self.timedelta