Coverage for aixcalibuha/sensitivity_analysis/sensitivity_analyzer.py: 24%

422 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2026-04-20 14:06 +0000

1"""Package containing modules for sensitivity analysis. 

2The module contains the relevant base-classes.""" 

3import abc 

4import copy 

5import os 

6from pathlib import Path 

7import multiprocessing as mp 

8import warnings 

9from typing import List 

10from collections import Counter 

11import numpy as np 

12import pandas as pd 

13from ebcpy.utils import setup_logger 

14from ebcpy.utils.reproduction import CopyFile 

15from ebcpy.simulationapi import SimulationAPI 

16from ebcpy.data_types import TimeSeriesData 

17from aixcalibuha import CalibrationClass, data_types 

18from aixcalibuha.utils import validate_cal_class_input, convert_mat_to_suffix, empty_postprocessing 

19from aixcalibuha.sensitivity_analysis.plotting import plot_single, plot_time_dependent 

20 

21 

22def _load_single_file(_filepath, parquet_engine='pyarrow'): 

23 """Helper function""" 

24 if _filepath is None: 

25 return None 

26 return data_types.TimeSeriesData(_filepath, default_tag='sim', key='simulation', 

27 engine=parquet_engine) 

28 

29 

30def _load_files(_filepaths, parquet_engine='pyarrow'): 

31 """Helper function""" 

32 results = [] 

33 for _filepath in _filepaths: 

34 results.append(_load_single_file(_filepath, parquet_engine=parquet_engine)) 

35 return results 

36 

37 

38def _restruct_verbose(list_output_verbose): 

39 """Helper function""" 

40 output_verbose = {} 

41 for key, val in list_output_verbose[0].items(): 

42 output_verbose[key] = np.array([]) 

43 for i in list_output_verbose: 

44 for key, val in i.items(): 

45 output_verbose[key] = np.append(output_verbose[key], np.array([val[1]])) 

46 return output_verbose 

47 

48 

49def _concat_all_sims(sim_results_list): 

50 """Helper function that concat all results in a list to one DataFrame.""" 

51 sim_results_list = [r.to_df() for r in sim_results_list] 

52 sim_results_list = pd.concat(sim_results_list, keys=range(len(sim_results_list)), 

53 axis='columns') 

54 sim_results_list = sim_results_list.swaplevel(axis=1).sort_index(axis=1) 

55 return sim_results_list 

56 

57 

def _restruct_time_dependent(sen_time_dependent_list, time_index):
    """
    Helper function that restructures the time dependent sensitivity results.

    Takes one sensitivity result per time step (optionally a tuple of
    first/second-order results, as produced for Sobol with calc_second_order)
    and concatenates them into a single DataFrame indexed by
    (Goal, Analysis variable[, Interaction], time).
    """

    def _restruct_single(sen_time_dependent_list_s, second_order=False):
        # One result per time step -> stack along the index with the time as key.
        sen_time_dependent_df = pd.concat(sen_time_dependent_list_s, keys=time_index, axis=0)
        # Each per-step result carries a 'Class' level which is constant here.
        sen_time_dependent_df = sen_time_dependent_df.droplevel('Class', axis='index')
        # Move the time level from the front to the back of the index.
        sen_time_dependent_df = sen_time_dependent_df.swaplevel(0, 1)
        sen_time_dependent_df = sen_time_dependent_df.swaplevel(1, 2).sort_index(axis=0)
        if second_order:
            # Second-order results have the extra 'Interaction' level.
            sen_time_dependent_df = sen_time_dependent_df.swaplevel(2, 3).sort_index(axis=0)
            sen_time_dependent_df.index.set_names(
                ['Goal', 'Analysis variable', 'Interaction', 'time'], inplace=True)
        else:
            sen_time_dependent_df.index.set_names(['Goal', 'Analysis variable', 'time'],
                                                  inplace=True)
        return sen_time_dependent_df

    # A tuple per time step means (first-order, second-order) results.
    if isinstance(sen_time_dependent_list[0], tuple):
        sen_time_dependent_list1, sen_time_dependent_list2 = zip(*sen_time_dependent_list)
        return _restruct_single(sen_time_dependent_list1), _restruct_single(
            sen_time_dependent_list2, True)
    return _restruct_single(sen_time_dependent_list)

80 

81 

82def _divide_chunks(long_list, chunk_length): 

83 """Helper function that divides all list into multiple list with a specific chunk length.""" 

84 for i in range(0, len(long_list), chunk_length): 

85 yield long_list[i:i + chunk_length] 

86 

87 

88class SenAnalyzer(abc.ABC): 

89 """ 

90 Class to perform a Sensitivity Analysis. 

91 

92 :param SimulationAPI sim_api: 

93 Simulation-API used to simulate the samples 

94 :param int num_samples: 

95 The parameter `N` to the sampler methods of sobol and morris. NOTE: This is not the 

96 number of samples produced, but relates to the total number of samples produced in 

97 a manner dependent on the sampler method used. See the documentation of the specific  

98 method in the SALib for more information. 

99 :keyword str,Path working_directory: 

100 The path for the current working directory. 

101 Logger and results will be stored here. 

102 :keyword boolean fail_on_error: 

103 Default is False. If True, the calibration will stop with an error if 

104 the simulation fails. See also: ``ret_val_on_error`` 

105 :keyword float,np.NAN ret_val_on_error: 

106 Default is np.NAN. If ``fail_on_error`` is false, you can specify here 

107 which value to return in the case of a failed simulation. Possible 

108 options are np.NaN, np.inf or some other high numbers. be aware that this 

109 max influence the solver. 

110 :keyword boolean save_files: 

111 Default False. If true, all simulation files for each iteration will be saved! 

112 :keyword str suffix_files: 

113 Default 'csv'. Specifies the data format to store the simulation files in. 

114 Options are 'csv' and 'parquet' to save only the goals. 

115 If you want to keep the original 'mat' file specify 'mat' here (not recommended due to high disk size usage). 

116 :keyword str parquet_engine: 

117 The engine to use for the data format parquet. 

118 Supported options can be extracted 

119 from the ebcpy.TimeSeriesData.save() function. 

120 Default is 'pyarrow'. 

121 :keyword str,Path savepath_sim: 

122 Default is working_directory. Own directory for the time series data sets of all simulations 

123 during the sensitivity analysis. The own dir can be necessary for large data sets, 

124 because they can crash IDE during indexing when they are in the project folder. 

125 

126 """ 

127 

    def __init__(self,
                 sim_api: SimulationAPI,
                 num_samples: int,
                 **kwargs):
        """Instantiate class parameters. See the class docstring for the kwargs."""
        # Setup the instance attributes
        self.sim_api = sim_api
        self.num_samples = num_samples

        # Update kwargs
        # NOTE(review): the class docstring says fail_on_error defaults to False,
        # but the code default here is True -- confirm which is intended.
        self.fail_on_error = kwargs.pop("fail_on_error", True)
        self.save_files = kwargs.pop("save_files", False)
        self.suffix_files = kwargs.pop('suffix_files', 'csv')
        self.parquet_engine = kwargs.pop('parquet_engine', 'pyarrow')
        self.ret_val_on_error = kwargs.pop("ret_val_on_error", np.NAN)
        self.working_directory = kwargs.pop("working_directory", os.getcwd())

        # Backwards compatibility: "cd" is the deprecated name for
        # working_directory and, when given, overrides the value popped above.
        if "cd" in kwargs:
            warnings.warn(
                "cd was renamed to working_directory in all classes. "
                "Use working_directory instead.",
                category=DeprecationWarning)
            self.working_directory = kwargs.pop("cd")

        # Simulation results default to the working directory unless a
        # separate directory is given (useful for large data sets).
        self.savepath_sim = kwargs.pop('savepath_sim', self.working_directory)

        # Normalize both paths to pathlib.Path; create the working directory
        # if it does not exist yet (savepath_sim is not created here).
        if isinstance(self.working_directory, str):
            self.working_directory = Path(self.working_directory)
        if not self.working_directory.exists():
            self.working_directory.mkdir(parents=True, exist_ok=True)

        if isinstance(self.savepath_sim, str):
            self.savepath_sim = Path(self.savepath_sim)

        # Setup the logger
        self.logger = setup_logger(working_directory=self.working_directory,
                                   name=self.__class__.__name__)

        # Setup default values; ``problem`` is filled by create_problem()
        # before any sampling takes place.
        self.problem: dict = None
        self.reproduction_files = []

169 

170 @property 

171 @abc.abstractmethod 

172 def analysis_variables(self) -> List[str]: 

173 """ 

174 Indicate which variables are 

175 able to be selected for analysis 

176 

177 :return: 

178 A list of strings 

179 :rtype: List[str] 

180 """ 

181 raise NotImplementedError(f'{self.__class__.__name__}.analysis_variables ' 

182 f'property is not defined yet') 

183 

184 @abc.abstractmethod 

185 def analysis_function(self, x, y): 

186 """ 

187 Use the method to analyze the simulation results. 

188 

189 :param np.array x: 

190 the `X` parameter of the method (The NumPy matrix containing the model inputs) 

191 :param np.array y: 

192 The NumPy array containing the model outputs 

193 """ 

194 raise NotImplementedError(f'{self.__class__.__name__}.analysis_function ' 

195 f'function is not defined yet') 

196 

197 @abc.abstractmethod 

198 def create_sampler_demand(self) -> dict: 

199 """ 

200 Return the sampler parameters 

201 

202 :return: 

203 dict: A dict with the sampler demand 

204 """ 

205 raise NotImplementedError(f'{self.__class__.__name__}.analysis_function ' 

206 f'function is not defined yet') 

207 

208 @abc.abstractmethod 

209 def generate_samples(self): 

210 """ 

211 Run the sampler specified by `method` and return the results. 

212 

213 :return: 

214 The list of samples generated as a NumPy array with one row per sample 

215 and each row containing one value for each variable name in `problem['names']`. 

216 :rtype: np.ndarray 

217 """ 

218 raise NotImplementedError(f'{self.__class__.__name__}.generate_samples ' 

219 f'function is not defined yet') 

220 

    def simulate_samples(self, cal_class, **kwargs):
        """
        Creates the samples for the calibration class and simulates them.

        :param cal_class:
            One class for calibration. Goals and tuner_paras have to be set
        :keyword scale:
            Default is False. If True the bounds of the tuner-parameters
            will be scaled between 0 and 1.

        :return:
            Returns two lists. First a list with the simulation results for each sample.
            If save_files the list contains the filepaths to the results
            Second a list of the samples.
        :rtype: list
        """
        scale = kwargs.pop('scale', False)
        # Set the output interval according the given Goals
        mean_freq = cal_class.goals.get_meas_frequency()
        self.logger.info("Setting output_interval of simulation according "
                         "to measurement target data frequency: %s", mean_freq)
        self.sim_api.sim_setup.output_interval = mean_freq
        initial_names = cal_class.tuner_paras.get_names()
        self.sim_api.set_sim_setup({"start_time": cal_class.start_time,
                                    "stop_time": cal_class.stop_time})
        self.sim_api.result_names = cal_class.goals.get_sim_var_names()

        # The SALib problem definition is stored for later analysis calls.
        self.problem = self.create_problem(cal_class.tuner_paras, scale=scale)
        samples = self.generate_samples()

        # Create a DataFrame of the samples with the result file names as the
        # index and persist it next to the results for reproducibility.
        result_file_names = [f"simulation_{idx}" for idx in range(len(samples))]
        samples_df = pd.DataFrame(samples, columns=initial_names, index=result_file_names)
        samples_df.to_csv(self.working_directory.joinpath(f'samples_{cal_class.name}.csv'))

        # Build one parameter dict per sample; descale first if sampling was
        # done on the scaled [0, 1] bounds.
        parameters = []
        for initial_values in samples:
            if scale:
                initial_values = cal_class.tuner_paras.descale(initial_values)
            parameters.append(dict(zip(initial_names, initial_values)))

        self.logger.info('Starting %s parameter variations on %s cores',
                         len(samples), self.sim_api.n_cpu)
        if self.save_files:
            # Save every simulation to disk and return the file paths.
            sim_dir = self.savepath_sim.joinpath(f'simulations_{cal_class.name}')
            os.makedirs(sim_dir, exist_ok=True)
            samples_df.to_csv(self.savepath_sim.joinpath(f'samples_{cal_class.name}.csv'))
            self.logger.info(f'Saving simulation files in: {sim_dir}')
            # For 'mat' keep the raw result files; otherwise convert them to
            # the requested suffix, keeping only the goal variables.
            if self.suffix_files == "mat":
                postprocess_mat_result = empty_postprocessing
                kwargs_postprocessing = {}
            else:
                postprocess_mat_result = convert_mat_to_suffix
                kwargs_postprocessing = {
                    'variable_names': self.sim_api.result_names,
                    'suffix_files': self.suffix_files,
                    'parquet_engine': self.parquet_engine
                }
            # Only the DymolaAPI supports the postprocessing hooks.
            if self.sim_api.__class__.__name__ == "DymolaAPI":
                cal_class.input_kwargs["postprocess_mat_result"] = postprocess_mat_result
                cal_class.input_kwargs["kwargs_postprocessing"] = kwargs_postprocessing
            _filepaths = self.sim_api.simulate(
                parameters=parameters,
                return_option="savepath",
                savepath=sim_dir,
                result_file_name=result_file_names,
                fail_on_error=self.fail_on_error,
                inputs=cal_class.inputs,
                **cal_class.input_kwargs
            )
            self.reproduction_files.extend(_filepaths)
            results = _filepaths
        else:
            # Keep the results in memory and return them directly.
            results = self.sim_api.simulate(
                parameters=parameters,
                inputs=cal_class.inputs,
                fail_on_error=self.fail_on_error,
                **cal_class.input_kwargs
            )
        self.logger.info('Finished %s simulations', len(samples))
        return results, samples

303 

    def _check_index(self, tsd: data_types.TimeSeriesData, sim_num=None):
        """
        Ensure ``tsd`` has an equally spaced index matching the configured
        simulation output interval.

        :param data_types.TimeSeriesData tsd:
            Simulation result to check (modified in place).
        :param sim_num:
            Name used in log messages; defaults to the file name of ``tsd``.
        :return:
            The (possibly cleaned) TimeSeriesData.
        """
        # ``frequency`` is used as (mean, standard deviation) of the index
        # spacing (see the two checks below).
        freq = tsd.frequency
        if sim_num is None:
            sim_num = tsd.filepath.name
        # Mean spacing differs from the output interval: re-space the index
        # equally at the output interval (via a datetime index round-trip).
        if freq[0] != self.sim_api.sim_setup.output_interval:
            self.logger.info(
                f'The mean value of the frequency from {sim_num} does not match output '
                'interval index will be cleaned and spaced equally')
            tsd.to_datetime_index()
            tsd.clean_and_space_equally(f'{str(self.sim_api.sim_setup.output_interval * 1000)}ms')
            tsd.to_float_index()
            freq = tsd.frequency
        # Non-zero spread: round the index to the decimal precision of the
        # output interval (reversed-string '.'-position gives the number of
        # decimal places; -1, i.e. integer rounding, if there is no '.').
        if freq[1] > 0.0:
            self.logger.info(f'The standard deviation of the frequency from {sim_num} is to high '
                             f'and will be rounded to the accuracy of the output interval')
            tsd.index = np.round(tsd.index.astype("float64"),
                                 str(self.sim_api.sim_setup.output_interval)[::-1].find('.'))
        return tsd

322 

    def _single_eval_statistical_measure(self, kwargs_eval):
        """
        Evaluates statistical measure of one result.

        :param dict kwargs_eval:
            Dict with keys 'cal_class', 'result' and optionally 'sim_num'.
        :return:
            Tuple of the total statistical measure and the verbose per-goal
            results ``{goal: (weight, value)}``.
        """
        cal_class = kwargs_eval.pop('cal_class')
        result = kwargs_eval.pop('result')
        num_sim = kwargs_eval.pop('sim_num', None)
        # A failed simulation (result is None) yields ret_val_on_error for
        # every goal instead of raising.
        if result is None:
            verbose_error = {}
            for goal, weight in zip(cal_class.goals.get_goals_list(), cal_class.goals.weightings):
                verbose_error[goal] = (weight, self.ret_val_on_error)
            return self.ret_val_on_error, verbose_error
        # Align the result index with the simulation output interval first.
        result = self._check_index(result, num_sim)
        cal_class.goals.set_sim_target_data(result)
        cal_class.goals.set_relevant_time_intervals(cal_class.relevant_intervals)
        # Evaluate the current objective
        total_res, verbose_calculation = cal_class.goals.eval_difference(verbose=True)
        return total_res, verbose_calculation

339 

340 def eval_statistical_measure(self, cal_class, results, verbose=True): 

341 """Evaluates statistical measures of results on single core""" 

342 self.logger.info('Starting evaluation of statistical measure') 

343 output = [] 

344 list_output_verbose = [] 

345 for i, result in enumerate(results): 

346 total_res, verbose_calculation = self._single_eval_statistical_measure( 

347 {'cal_class': cal_class, 'result': result, 'sim_num': f'simulation_{i}'} 

348 ) 

349 output.append(total_res) 

350 list_output_verbose.append(verbose_calculation) 

351 if verbose: 

352 # restructure output_verbose 

353 output_verbose = _restruct_verbose(list_output_verbose) 

354 return np.asarray(output), output_verbose 

355 return np.asarray(output) 

356 

357 def _single_load_eval_file(self, kwargs_load_eval): 

358 """For multiprocessing""" 

359 filepath = kwargs_load_eval.pop('filepath') 

360 _result = _load_single_file(filepath, self.parquet_engine) 

361 kwargs_load_eval.update({'result': _result}) 

362 total_res, verbose_calculation = self._single_eval_statistical_measure(kwargs_load_eval) 

363 return total_res, verbose_calculation 

364 

    def _mp_load_eval(self, _filepaths, cal_class, n_cpu):
        """
        Loading and evaluating the statistical measure of saved simulation files
        on multiple cores.

        :param _filepaths:
            Paths of the saved simulation result files.
        :param cal_class:
            Calibration class whose goals define the statistical measure.
        :param int n_cpu:
            Number of worker processes.
        :return:
            Tuple of the array of total measures and the restructured
            per-goal verbose results.
        """
        self.logger.info(f'Load files and evaluate statistical measure on {n_cpu} processes.')
        kwargs_load_eval = []
        for filepath in _filepaths:
            kwargs_load_eval.append({'filepath': filepath, 'cal_class': cal_class})
        output_array = []
        list_output_verbose = []
        # imap preserves the input order while each worker only holds one
        # loaded file in memory at a time.
        with mp.Pool(processes=n_cpu) as pool:
            for total, verbose in pool.imap(self._single_load_eval_file, kwargs_load_eval):
                output_array.append(total)
                list_output_verbose.append(verbose)
        output_array = np.asarray(output_array)
        output_verbose = _restruct_verbose(list_output_verbose)
        return output_array, output_verbose

382 

383 def _load_eval(self, _filepaths, cal_class, n_cpu): 

384 """ 

385 Loading and evaluating the statistical measure of saved simulation files. 

386 Single- or multiprocessing possible with definition of n_cpu. 

387 """ 

388 if n_cpu == 1: 

389 results = _load_files(_filepaths, self.parquet_engine) 

390 output_array, output_verbose = self.eval_statistical_measure( 

391 cal_class=cal_class, 

392 results=results 

393 ) 

394 return output_array, output_verbose 

395 output_array, output_verbose = self._mp_load_eval(_filepaths, cal_class, n_cpu) 

396 return output_array, output_verbose 

397 

    def run(self, calibration_classes, merge_multiple_classes=True, **kwargs):
        """
        Execute the sensitivity analysis for each class and
        return the result.

        :param CalibrationClass,list calibration_classes:
            Either one or multiple classes for calibration with same tuner-parameters.
        :param bool merge_multiple_classes:
            Default True. If False, the given list of calibration-classes
            is handled as-is. This means if you pass two CalibrationClass objects
            with the same name (e.g. "device on"), the calibration process will run
            for both these classes stand-alone.
            This will automatically yield an intersection of tuner-parameters, however may
            have advantages in some cases.
        :keyword bool verbose:
            Default False. If True, in addition to the combined Goals of the Classes
            (saved under index Goal: all), the sensitivity measures of the individual
            Goals will also be calculated and returned.
        :keyword scale:
            Default is False. If True the bounds of the tuner-parameters
            will be scaled between 0 and 1.
        :keyword bool use_first_sim:
            Default False. If True, the simulations of the first calibration class will be used for
            all other calibration classes with their relevant time intervals.
            The simulations must be stored on a hard-drive, so it must be used with
            either save_files or load_files.
        :keyword int n_cpu:
            Default is 1. The number of processes to use for the evaluation of the statistical
            measure. For n_cpu > 1 only one simulation file is loaded at once in a process and
            dumped directly after the evaluation of the statistical measure,
            so that only minimal memory is used.
            Use this option for large analyses.
            Only implemented for save_files=True or load_sim_files=True.
        :keyword bool load_sim_files:
            Default False. If True, no new simulations are done and old simulations are loaded.
            The simulations and corresponding samples will be loaded from self.savepath_sim like
            they were saved from self.save_files. Currently, the name of the sim folder must be
            "simulations_CAL_CLASS_NAME" and for the samples "samples_CAL_CLASS_NAME".
            The usage of the same simulations for different
            calibration classes is not supported yet.
        :keyword bool save_results:
            Default True. If True, all results are saved as a csv in working_directory.
            (samples, statistical measures and analysis variables).
        :keyword bool plot_result:
            Default True. If True, the results will be plotted.
        :return:
            Returns a tuple of the sensitivity result and the list of
            calibration classes that were analyzed.
            The result is a pandas.DataFrame with a Multiindex with the
            levels Class, Goal and Analysis variable. The Goal name of combined goals is 'all'.
            The variables are the tuner-parameters.
            For the Sobol Method and calc_second_order the result is a tuple of
            DataFrames (df_1, df_2) where df_2 contains the second order analysis
            variables and has an extra index level Interaction,
            which also contains the variables.
        :rtype: pandas.DataFrame
        """
        verbose = kwargs.pop('verbose', False)
        scale = kwargs.pop('scale', False)
        use_first_sim = kwargs.pop('use_first_sim', False)
        n_cpu = kwargs.pop('n_cpu', 1)
        save_results = kwargs.pop('save_results', True)
        plot_result = kwargs.pop('plot_result', True)
        load_sim_files = kwargs.pop('load_sim_files', False)
        # Check correct input
        calibration_classes = validate_cal_class_input(calibration_classes)
        # Merge the classes for avoiding possible intersection of tuner-parameters
        if merge_multiple_classes:
            calibration_classes = data_types.merge_calibration_classes(calibration_classes)

        # Check n_cpu
        if n_cpu > mp.cpu_count():
            raise ValueError(f"Given n_cpu '{n_cpu}' is greater "
                             "than the available number of "
                             f"cpus on your machine '{mp.cpu_count()}'")

        # Check if the usage of the simulations from the first calibration class for all is possible
        if use_first_sim:
            if not self.save_files and not load_sim_files:
                raise AttributeError('To use the simulations of the first calibration class '
                                     'for all classes the simulation files must be saved. '
                                     'Either set save_files=True or load already exiting files '
                                     'with load_sim_files=True.')
            # Every other class must lie within the time interval of the first
            # class, otherwise its simulations cannot cover them.
            start_time = 0
            stop_time = 0
            for idx, cal_class in enumerate(calibration_classes):
                if idx == 0:
                    start_time = cal_class.start_time
                    stop_time = cal_class.stop_time
                    continue
                if start_time > cal_class.start_time or stop_time < cal_class.stop_time:
                    raise ValueError(f'To use the simulations of the first calibration class '
                                     f'for all classes the start and stop times of the other '
                                     f'classes must be in the interval [{start_time}, {stop_time}] '
                                     f'of the first calibration class.')

        all_results = []
        for idx, cal_class in enumerate(calibration_classes):

            self.logger.info('Start sensitivity analysis of class: %s, '
                             'Time-Interval: %s-%s s', cal_class.name,
                             cal_class.start_time, cal_class.stop_time)

            # Generate list with metrics of every parameter variation
            results_goals = {}
            if load_sim_files:
                self.problem = self.create_problem(cal_class.tuner_paras, scale=scale)
                # With use_first_sim all classes reuse the files of the first class.
                if use_first_sim:
                    class_name = calibration_classes[0].name
                else:
                    class_name = cal_class.name
                sim_dir = self.savepath_sim.joinpath(f'simulations_{class_name}')
                samples_path = self.savepath_sim.joinpath(f'samples_{class_name}.csv')
                self.logger.info(f'Loading samples from {samples_path}')
                samples = pd.read_csv(samples_path,
                                      header=0,
                                      index_col=0)
                samples = samples.to_numpy()
                result_file_names = [f"simulation_{idx}.{self.suffix_files}" for idx in
                                     range(len(samples))]
                _filepaths = [sim_dir.joinpath(result_file_name) for result_file_name in
                              result_file_names]
                self.logger.info(f'Loading simulation files from {sim_dir}')
                output_array, output_verbose = self._load_eval(_filepaths, cal_class, n_cpu)
            else:
                results, samples = self.simulate_samples(
                    cal_class=cal_class,
                    scale=scale
                )
                if self.save_files:
                    # results are file paths here; load and evaluate them.
                    output_array, output_verbose = self._load_eval(results, cal_class, n_cpu)
                else:
                    output_array, output_verbose = self.eval_statistical_measure(
                        cal_class=cal_class,
                        results=results
                    )
                # After the first class, switch to loading its saved files.
                if use_first_sim:
                    load_sim_files = True

            # combine output_array and output_verbose
            # set key for output_array depending on one or multiple goals
            stat_mea = {'all': output_array}
            if len(output_verbose) == 1:
                stat_mea = output_verbose
            if len(output_verbose) > 1 and verbose:
                stat_mea.update(output_verbose)

            # save statistical measure and corresponding samples for each cal_class in working_directory
            if save_results:
                result_file_names = [f"simulation_{idx}" for idx in range(len(output_array))]
                stat_mea_df = pd.DataFrame(stat_mea, index=result_file_names)
                savepath_stat_mea = self.working_directory.joinpath(
                    f'{cal_class.goals.statistical_measure}_{cal_class.name}.csv')
                stat_mea_df.to_csv(savepath_stat_mea)
                self.reproduction_files.append(savepath_stat_mea)
                samples_df = pd.DataFrame(samples, columns=cal_class.tuner_paras.get_names(),
                                          index=result_file_names)
                savepath_samples = self.working_directory.joinpath(f'samples_{cal_class.name}.csv')
                samples_df.to_csv(savepath_samples)
                self.reproduction_files.append(savepath_samples)

            self.logger.info('Starting calculation of analysis variables')
            # One analysis per goal ('all' plus, if verbose, each single goal).
            for key, val in stat_mea.items():
                result_goal = self.analysis_function(
                    x=samples,
                    y=val
                )
                results_goals[key] = result_goal
            all_results.append(results_goals)
            self.logger.info('Finished sensitivity analysis of class: %s, '
                             'Time-Interval: %s-%s s', cal_class.name,
                             cal_class.start_time, cal_class.stop_time)
        result = self._conv_local_results(results=all_results,
                                          local_classes=calibration_classes)
        if save_results:
            self._save(result)
        if plot_result:
            self.plot(result)
        return result, calibration_classes

574 

575 def _save(self, result: pd.DataFrame, time_dependent: bool = False): 

576 """ 

577 Saves the result DataFrame of run and run_time_dependent. 

578 Needs to be overwritten for Sobol results. 

579 """ 

580 if time_dependent: 

581 savepath_result = self.working_directory.joinpath( 

582 f'{self.__class__.__name__}_results_time.csv') 

583 else: 

584 savepath_result = self.working_directory.joinpath( 

585 f'{self.__class__.__name__}_results.csv') 

586 result.to_csv(savepath_result) 

587 self.reproduction_files.append(savepath_result) 

588 

589 @staticmethod 

590 def create_problem(tuner_paras, scale=False) -> dict: 

591 """Create function for later access if multiple calibration-classes are used.""" 

592 num_vars = len(tuner_paras.get_names()) 

593 bounds = np.array(tuner_paras.get_bounds()) 

594 if scale: 

595 bounds = [np.zeros_like(bounds[0]), np.ones_like(bounds[1])] 

596 problem = {'num_vars': num_vars, 

597 'names': tuner_paras.get_names(), 

598 'bounds': np.transpose(bounds)} 

599 return problem 

600 

    @staticmethod
    def select_by_threshold(calibration_classes, result, analysis_variable, threshold):
        """
        Automatically select sensitive tuner parameters based on a given threshold
        of a given analysis variable from a sensitivity result.
        Uses only the combined goals.

        :param list calibration_classes:
            List of aixcalibuha.data_types.CalibrationClass objects that you want to
            automatically select sensitive tuner-parameters.
        :param pd.DataFrame result:
            Result object of sensitivity analysis run
        :param str analysis_variable:
            Analysis variable to use for the selection
        :param float threshold:
            Minimal required value of given key
        :return: list calibration_classes
        """
        for cal_class in calibration_classes:
            # The first goal in the result index (level 1, usually 'all')
            # is used for every class.
            first_goal = result.index.get_level_values(1)[0]
            class_result = result.loc[cal_class.name, first_goal, analysis_variable]
            tuner_paras = copy.deepcopy(cal_class.tuner_paras)
            # Tuner-parameters below the threshold get removed.
            select_names = class_result[class_result < threshold].index.values
            tuner_paras.remove_names(select_names)
            if not tuner_paras.get_names():
                raise ValueError(
                    'Automatic selection removed all tuner parameter '
                    f'from class {cal_class.name} after '
                    'SensitivityAnalysis was done. Please adjust the '
                    'threshold in json or manually chose tuner '
                    'parameters for the calibration.')
            # Assigning tuner_paras also resets any initial values (in place).
            cal_class.tuner_paras = tuner_paras
        return calibration_classes

635 

636 @staticmethod 

637 def select_by_threshold_verbose(calibration_class: CalibrationClass, 

638 result: pd.DataFrame, 

639 analysis_variable: str, 

640 threshold: float, 

641 calc_names_for_selection: List[str] = None): 

642 """ 

643 Select tuner-parameters of single calibration class with verbose sensitivity results. 

644 This function selects tuner-parameters if their sensitivity is equal or greater 

645 than the threshold in just one target value of one calibration class in the 

646 sensitivity result. This can be more robust because a small sensitivity in one target 

647 value and state of the system can mean that the parameter can also be calibrated in 

648 a global calibration class which calibrates multiple states and target values at 

649 the same time and has there not directly the same sensitivity as in the isolated 

650 view of a calibration class for only one state. 

651 

652 :param CalibrationClass calibration_class: 

653 The calibration class from which the tuner parameters will be selected. 

654 :param pd.DataFrame result: 

655 Sensitivity results to use for the selection. Can include multiple classes. 

656 :param str analysis_variable: 

657 The analysis variable to use for the selection. 

658 :param float threshold: 

659 Minimal required value of given analysis variable. 

660 :param List[str] calc_names_for_selection: 

661 Specifies which calibration classes in the sensitivity results will be used for 

662 the selection. Default are all classes. 

663 """ 

664 if Counter(calibration_class.tuner_paras.get_names()) != Counter(list(result.columns)): 

665 raise NameError("The tuner-parameter of the calibration class do not " 

666 "match the tuner-parameters in the sensitivity result." 

667 "They have to match.") 

668 

669 result = result.loc[:, :, analysis_variable] 

670 calc_names_results = result.index.get_level_values("Class").unique() 

671 if calc_names_for_selection: 

672 for cal_class in calc_names_for_selection: 

673 if cal_class not in calc_names_results: 

674 raise NameError(f"The calibration class name {cal_class} " 

675 f"does not match any class name " 

676 f"in the given sensitivity result.") 

677 result = result.loc[calc_names_for_selection, :, :] 

678 

679 selected_tuners = (result >= threshold).any() 

680 

681 remove_tuners = [] 

682 for tuner, selected in selected_tuners.items(): 

683 if not selected: 

684 remove_tuners.append(tuner) 

685 tuner_paras = copy.deepcopy(calibration_class.tuner_paras) 

686 tuner_paras.remove_names(remove_tuners) 

687 if not tuner_paras.get_names(): 

688 raise ValueError("Threshold to small. All tuner-parameters would be removed.") 

689 calibration_class.tuner_paras = tuner_paras 

690 return calibration_class 

691 

692 def run_time_dependent(self, cal_class: CalibrationClass, **kwargs): 

693 """ 

694 Calculate the time dependent sensitivity for all the single goals in the calibration class. 

695 

696 :param CalibrationClass cal_class: 

697 Calibration class with tuner-parameters to calculate sensitivity for. 

698 Can include dummy target date. 

699 :keyword scale: 

700 Default is False. If True the bounds of the tuner-parameters 

701 will be scaled between 0 and 1. 

702 :keyword bool load_sim_files: 

703 Default False. If True, no new simulations are done and old simulations are loaded. 

704 The simulations and corresponding samples will be loaded from self.savepath_sim like 

705 they were saved from self.save_files. Currently, the name of the sim folder must be 

706 "simulations_CAL_CLASS_NAME" and for the samples "samples_CAL_CLASS_NAME". 

707 :keyword bool save_results: 

708 Default True. If True, all results are saved as a csv in working_directory. 

709 (samples and analysis variables). 

710 :keyword int n_steps: 

711 Default is all time steps. If the problem is large, the evaluation of all time steps 

712 at once can cause a memory error. Then n_steps defines how many time_steps 

713 are evaluated at once in chunks. This increases the needed time exponentially and 

714 the simulation files must be saved. 

715 :keyword bool plot_result: 

716 Default True. If True, the results will be plotted. 

717 :return: 

718 Returns a pandas.DataFrame. 

719 :rtype: pandas.DataFrame 

720 """ 

721 scale = kwargs.pop('scale', False) 

722 save_results = kwargs.pop('save_results', True) 

723 plot_result = kwargs.pop('plot_result', True) 

724 load_sim_files = kwargs.pop('load_sim_files', False) 

725 n_steps = kwargs.pop('n_steps', 'all') 

726 

727 self.logger.info("Start time dependent sensitivity analysis.") 

728 if load_sim_files: 

729 self.problem = self.create_problem(cal_class.tuner_paras, scale=scale) 

730 sim_dir = self.savepath_sim.joinpath(f'simulations_{cal_class.name}') 

731 samples_path = self.savepath_sim.joinpath(f'samples_{cal_class.name}.csv') 

732 samples = pd.read_csv(samples_path, 

733 header=0, 

734 index_col=0) 

735 samples = samples.to_numpy() 

736 result_file_names = [f"simulation_{idx}.{self.suffix_files}" for idx in 

737 range(len(samples))] 

738 _filepaths = [sim_dir.joinpath(result_file_name) for result_file_name in 

739 result_file_names] 

740 

741 sen_time_dependent_list, time_index = self._load_analyze_tsteps(_filepaths=_filepaths, 

742 samples=samples, 

743 n_steps=n_steps, 

744 cal_class=cal_class) 

745 sen_time_dependent_df = _restruct_time_dependent(sen_time_dependent_list, time_index) 

746 else: 

747 results, samples = self.simulate_samples( 

748 cal_class=cal_class, 

749 scale=scale 

750 ) 

751 if self.save_files: 

752 sen_time_dependent_list, time_index = self._load_analyze_tsteps(_filepaths=results, 

753 samples=samples, 

754 n_steps=n_steps, 

755 cal_class=cal_class) 

756 sen_time_dependent_df = _restruct_time_dependent(sen_time_dependent_list, 

757 time_index) 

758 else: 

759 variables = results[0].get_variable_names() 

760 time_index = results[0].index.to_numpy() 

761 total_result = _concat_all_sims(results) 

762 sen_time_dependent_list = [] 

763 for time_step in time_index: 

764 result_df_tstep = self._analyze_tstep_df(time_step=time_step, 

765 tsteps_sim_results=total_result, 

766 variables=variables, 

767 samples=samples, 

768 cal_class=cal_class) 

769 sen_time_dependent_list.append(result_df_tstep) 

770 sen_time_dependent_df = _restruct_time_dependent(sen_time_dependent_list, 

771 time_index) 

772 self.logger.info("Finished time dependent sensitivity analysys.") 

773 if save_results: 

774 self._save(sen_time_dependent_df, time_dependent=True) 

775 if plot_result: 

776 if isinstance(sen_time_dependent_df, pd.DataFrame): 

777 plot_time_dependent(sen_time_dependent_df) 

778 else: 

779 plot_time_dependent(sen_time_dependent_df[0]) 

780 return sen_time_dependent_df 

781 

782 def _analyze_tstep_df(self, time_step, tsteps_sim_results, variables, samples, cal_class): 

783 """Analyze the sensitivity at a single time step.""" 

784 result_dict_tstep = {} 

785 for var in variables: 

786 result_tstep_var = tsteps_sim_results[var].loc[time_step].to_numpy() 

787 if np.all(result_tstep_var == result_tstep_var[0]): 

788 sen_tstep_var = None 

789 else: 

790 sen_tstep_var = self.analysis_function( 

791 x=samples, 

792 y=result_tstep_var 

793 ) 

794 result_dict_tstep[var] = sen_tstep_var 

795 result_df_tstep = self._conv_local_results(results=[result_dict_tstep], 

796 local_classes=[cal_class]) 

797 return result_df_tstep 

798 

799 def _load_tsteps_df(self, tsteps, _filepaths): 

800 """ 

801 Load all simulations and extract and concat the sim results of the time steps in tsteps. 

802 """ 

803 self.logger.info( 

804 f"Loading time steps from {tsteps[0]} to {tsteps[-1]} of the simulation files.") 

805 tsteps_sim_results = [] 

806 for _filepath in _filepaths: 

807 sim = _load_single_file(_filepath) 

808 tsteps_sim_results.append(sim.loc[tsteps[0]:tsteps[-1]]) 

809 tsteps_sim_results = _concat_all_sims(tsteps_sim_results) 

810 return tsteps_sim_results 

811 

812 def _load_analyze_tsteps(self, _filepaths, samples, n_steps, cal_class): 

813 """ 

814 Load and analyze all time steps in chunks with n_steps time steps. 

815 """ 

816 sim1 = _load_single_file(_filepaths[0]) 

817 time_index = sim1.index.to_numpy() 

818 variables = sim1.get_variable_names() 

819 sen_time_dependent_list = [] 

820 if n_steps == 'all': 

821 list_tsteps = [time_index] 

822 elif isinstance(n_steps, int) and not (n_steps <= 0 or n_steps > len(time_index)): 

823 list_tsteps = _divide_chunks(time_index, n_steps) 

824 else: 

825 raise ValueError( 

826 f"n_steps can only be between 1 and {len(time_index)} or the string all.") 

827 

828 for tsteps in list_tsteps: 

829 tsteps_sim_results = self._load_tsteps_df(tsteps=tsteps, _filepaths=_filepaths) 

830 self.logger.info("Analyzing these time steps.") 

831 for tstep in tsteps: 

832 result_df_tstep = self._analyze_tstep_df(time_step=tstep, 

833 tsteps_sim_results=tsteps_sim_results, 

834 variables=variables, 

835 samples=samples, 

836 cal_class=cal_class) 

837 sen_time_dependent_list.append(result_df_tstep) 

838 return sen_time_dependent_list, time_index 

839 

840 def _conv_global_result(self, result: dict, cal_class: CalibrationClass, 

841 analysis_variable: str): 

842 glo_res_dict = self._get_res_dict(result=result, cal_class=cal_class, 

843 analysis_variable=analysis_variable) 

844 return pd.DataFrame(glo_res_dict, index=['global']) 

845 

846 def _conv_local_results(self, results: list, local_classes: list): 

847 """ 

848 Convert the result dictionaries form SALib of each class and goal into one DataFrame. 

849 Overwritten for Sobol. 

850 """ 

851 _conv_results = [] 

852 tuples = [] 

853 for class_results, local_class in zip(results, local_classes): 

854 for goal, goal_results in class_results.items(): 

855 for analysis_var in self.analysis_variables: 

856 _conv_results.append(self._get_res_dict(result=goal_results, 

857 cal_class=local_class, 

858 analysis_variable=analysis_var)) 

859 tuples.append((local_class.name, goal, analysis_var)) 

860 index = pd.MultiIndex.from_tuples(tuples=tuples, 

861 names=['Class', 'Goal', 'Analysis variable']) 

862 df = pd.DataFrame(_conv_results, index=index) 

863 return df 

864 

865 @abc.abstractmethod 

866 def _get_res_dict(self, result: dict, cal_class: CalibrationClass, analysis_variable: str): 

867 """ 

868 Convert the result object to a dict with the key 

869 being the variable name and the value being the result 

870 associated to analysis_variable. 

871 """ 

872 raise NotImplementedError 

873 

874 def plot(self, result): 

875 """ 

876 Plot the results of the sensitivity analysis method from run(). 

877 

878 :param pd.DataFrame result: 

879 Dataframe of the results like from the run() function. 

880 :return tuple of matplotlib objects (fig, ax): 

881 """ 

882 plot_single(result=result) 

883 

884 @staticmethod 

885 def load_from_csv(path): 

886 """ 

887 Load sensitivity results which were saved with the run() or run_time_dependent() function. 

888 

889 For second order results use the load_second_order_from_csv() function of the SobolAnalyzer. 

890 """ 

891 result = pd.read_csv(path, index_col=[0, 1, 2]) 

892 return result 

893 

894 def save_for_reproduction(self, 

895 title: str, 

896 path: Path = None, 

897 files: list = None, 

898 exclude_sim_files: bool = False, 

899 remove_saved_files: bool = False, 

900 **kwargs): 

901 """ 

902 Save the settings of the SenAnalyzer and SimApi in order to 

903 reproduce the simulations and sensitivity analysis method. 

904 All saved results will be also saved in the reproduction 

905 archive. The simulations can be excluded from saving. 

906 

907 :param str title: 

908 Title of the study 

909 :param Path path: 

910 Where to store the .zip file. If not given, self.working_directory is used. 

911 :param list files: 

912 List of files to save along the standard ones. 

913 Examples would be plots, tables etc. 

914 :param bool exclude_sim_files: 

915 Default False. If True, the simulation files will not be saved in 

916 the reproduction archive. 

917 :param bool remove_saved_files: 

918 Default False. If True, the result and simulation files will be moved 

919 instead of just copied. 

920 :param dict kwargs: 

921 All keyword arguments except title, files, and path of the function 

922 `save_reproduction_archive`. Most importantly, `log_message` may be 

923 specified to avoid input during execution. 

924 """ 

925 if files is None: 

926 files = [] 

927 

928 for file_path in self.reproduction_files: 

929 if exclude_sim_files: 

930 if 'simulation' in str(file_path): 

931 continue 

932 filename = "SenAnalyzer" + \ 

933 str(file_path).rsplit(self.working_directory.name, maxsplit=1)[-1] 

934 files.append(CopyFile( 

935 sourcepath=file_path, 

936 filename=filename, 

937 remove=remove_saved_files 

938 )) 

939 

940 return self.sim_api.save_for_reproduction( 

941 title=title, 

942 path=path, 

943 files=files, 

944 **kwargs 

945 )