Coverage for aixcalibuha/sensitivity_analysis/sensitivity_analyzer.py: 24%
422 statements
« prev ^ index » next coverage.py v7.4.4, created at 2026-04-20 14:06 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2026-04-20 14:06 +0000
1"""Package containing modules for sensitivity analysis.
2The module contains the relevant base-classes."""
3import abc
4import copy
5import os
6from pathlib import Path
7import multiprocessing as mp
8import warnings
9from typing import List
10from collections import Counter
11import numpy as np
12import pandas as pd
13from ebcpy.utils import setup_logger
14from ebcpy.utils.reproduction import CopyFile
15from ebcpy.simulationapi import SimulationAPI
16from ebcpy.data_types import TimeSeriesData
17from aixcalibuha import CalibrationClass, data_types
18from aixcalibuha.utils import validate_cal_class_input, convert_mat_to_suffix, empty_postprocessing
19from aixcalibuha.sensitivity_analysis.plotting import plot_single, plot_time_dependent
22def _load_single_file(_filepath, parquet_engine='pyarrow'):
23 """Helper function"""
24 if _filepath is None:
25 return None
26 return data_types.TimeSeriesData(_filepath, default_tag='sim', key='simulation',
27 engine=parquet_engine)
def _load_files(_filepaths, parquet_engine='pyarrow'):
    """Load multiple simulation result files.

    :param _filepaths:
        Iterable of file paths; entries may be ``None`` for failed simulations.
    :param str parquet_engine:
        Engine used to read parquet files.
    :return:
        List of loaded TimeSeriesData objects (``None`` entries preserved),
        in the same order as ``_filepaths``.
    :rtype: list
    """
    # Comprehension instead of the former append loop (same result, idiomatic).
    return [_load_single_file(_filepath, parquet_engine=parquet_engine)
            for _filepath in _filepaths]
38def _restruct_verbose(list_output_verbose):
39 """Helper function"""
40 output_verbose = {}
41 for key, val in list_output_verbose[0].items():
42 output_verbose[key] = np.array([])
43 for i in list_output_verbose:
44 for key, val in i.items():
45 output_verbose[key] = np.append(output_verbose[key], np.array([val[1]]))
46 return output_verbose
49def _concat_all_sims(sim_results_list):
50 """Helper function that concat all results in a list to one DataFrame."""
51 sim_results_list = [r.to_df() for r in sim_results_list]
52 sim_results_list = pd.concat(sim_results_list, keys=range(len(sim_results_list)),
53 axis='columns')
54 sim_results_list = sim_results_list.swaplevel(axis=1).sort_index(axis=1)
55 return sim_results_list
def _restruct_time_dependent(sen_time_dependent_list, time_index):
    """Helper function that restructures the time dependent sensitivity results.

    ``sen_time_dependent_list`` holds one sensitivity DataFrame per time step
    (or per-tuple of first/second-order DataFrames for Sobol with
    calc_second_order). ``time_index`` holds the matching time stamps.
    Returns one DataFrame (or a tuple of two) whose row MultiIndex is
    (Goal, Analysis variable[, Interaction], time).
    """

    def _restruct_single(sen_time_dependent_list_s, second_order=False):
        # Stack the per-time-step frames on top of each other, keyed by time.
        sen_time_dependent_df = pd.concat(sen_time_dependent_list_s, keys=time_index, axis=0)
        # The per-step frames carry a single-class 'Class' level; drop it and
        # move the time key behind Goal and Analysis variable.
        sen_time_dependent_df = sen_time_dependent_df.droplevel('Class', axis='index')
        sen_time_dependent_df = sen_time_dependent_df.swaplevel(0, 1)
        sen_time_dependent_df = sen_time_dependent_df.swaplevel(1, 2).sort_index(axis=0)
        if second_order:
            # Second-order results carry an extra 'Interaction' level that
            # must also end up before the time level.
            sen_time_dependent_df = sen_time_dependent_df.swaplevel(2, 3).sort_index(axis=0)
            sen_time_dependent_df.index.set_names(
                ['Goal', 'Analysis variable', 'Interaction', 'time'], inplace=True)
        else:
            sen_time_dependent_df.index.set_names(['Goal', 'Analysis variable', 'time'],
                                                  inplace=True)
        return sen_time_dependent_df

    # Tuples signal (first_order, second_order) result pairs per time step.
    if isinstance(sen_time_dependent_list[0], tuple):
        sen_time_dependent_list1, sen_time_dependent_list2 = zip(*sen_time_dependent_list)
        return _restruct_single(sen_time_dependent_list1), _restruct_single(
            sen_time_dependent_list2, True)
    return _restruct_single(sen_time_dependent_list)
82def _divide_chunks(long_list, chunk_length):
83 """Helper function that divides all list into multiple list with a specific chunk length."""
84 for i in range(0, len(long_list), chunk_length):
85 yield long_list[i:i + chunk_length]
88class SenAnalyzer(abc.ABC):
89 """
90 Class to perform a Sensitivity Analysis.
92 :param SimulationAPI sim_api:
93 Simulation-API used to simulate the samples
94 :param int num_samples:
95 The parameter `N` to the sampler methods of sobol and morris. NOTE: This is not the
96 number of samples produced, but relates to the total number of samples produced in
97 a manner dependent on the sampler method used. See the documentation of the specific
98 method in the SALib for more information.
99 :keyword str,Path working_directory:
100 The path for the current working directory.
101 Logger and results will be stored here.
    :keyword boolean fail_on_error:
        Default is True. If True, the sensitivity analysis will stop with an error
        if the simulation fails. See also: ``ret_val_on_error``
    :keyword float,np.NAN ret_val_on_error:
        Default is np.NAN. If ``fail_on_error`` is false, you can specify here
        which value to return in the case of a failed simulation. Possible
        options are np.NaN, np.inf or some other high number. Be aware that this
        may influence the solver.
110 :keyword boolean save_files:
111 Default False. If true, all simulation files for each iteration will be saved!
112 :keyword str suffix_files:
113 Default 'csv'. Specifies the data format to store the simulation files in.
114 Options are 'csv' and 'parquet' to save only the goals.
115 If you want to keep the original 'mat' file specify 'mat' here (not recommended due to high disk size usage).
116 :keyword str parquet_engine:
117 The engine to use for the data format parquet.
118 Supported options can be extracted
119 from the ebcpy.TimeSeriesData.save() function.
120 Default is 'pyarrow'.
121 :keyword str,Path savepath_sim:
122 Default is working_directory. Own directory for the time series data sets of all simulations
123 during the sensitivity analysis. The own dir can be necessary for large data sets,
124 because they can crash IDE during indexing when they are in the project folder.
126 """
    def __init__(self,
                 sim_api: SimulationAPI,
                 num_samples: int,
                 **kwargs):
        """Instantiate class parameters"""
        # Setup the instance attributes
        self.sim_api = sim_api
        self.num_samples = num_samples

        # Update kwargs
        self.fail_on_error = kwargs.pop("fail_on_error", True)
        self.save_files = kwargs.pop("save_files", False)
        self.suffix_files = kwargs.pop('suffix_files', 'csv')
        self.parquet_engine = kwargs.pop('parquet_engine', 'pyarrow')
        self.ret_val_on_error = kwargs.pop("ret_val_on_error", np.NAN)
        self.working_directory = kwargs.pop("working_directory", os.getcwd())

        # Deprecated alias: "cd" overrides working_directory when given.
        if "cd" in kwargs:
            warnings.warn(
                "cd was renamed to working_directory in all classes. "
                "Use working_directory instead.",
                category=DeprecationWarning)
            self.working_directory = kwargs.pop("cd")
        # Resolved only after the possible "cd" override so the default
        # savepath follows the final working_directory.
        self.savepath_sim = kwargs.pop('savepath_sim', self.working_directory)

        if isinstance(self.working_directory, str):
            self.working_directory = Path(self.working_directory)
        if not self.working_directory.exists():
            self.working_directory.mkdir(parents=True, exist_ok=True)
        if isinstance(self.savepath_sim, str):
            self.savepath_sim = Path(self.savepath_sim)

        # Setup the logger
        self.logger = setup_logger(working_directory=self.working_directory,
                                   name=self.__class__.__name__)

        # Setup default values
        # SALib problem definition; filled in by simulate_samples()/run().
        self.problem: dict = None
        # Files collected for later reproduction archives.
        self.reproduction_files = []
170 @property
171 @abc.abstractmethod
172 def analysis_variables(self) -> List[str]:
173 """
174 Indicate which variables are
175 able to be selected for analysis
177 :return:
178 A list of strings
179 :rtype: List[str]
180 """
181 raise NotImplementedError(f'{self.__class__.__name__}.analysis_variables '
182 f'property is not defined yet')
184 @abc.abstractmethod
185 def analysis_function(self, x, y):
186 """
187 Use the method to analyze the simulation results.
189 :param np.array x:
190 the `X` parameter of the method (The NumPy matrix containing the model inputs)
191 :param np.array y:
192 The NumPy array containing the model outputs
193 """
194 raise NotImplementedError(f'{self.__class__.__name__}.analysis_function '
195 f'function is not defined yet')
197 @abc.abstractmethod
198 def create_sampler_demand(self) -> dict:
199 """
200 Return the sampler parameters
202 :return:
203 dict: A dict with the sampler demand
204 """
205 raise NotImplementedError(f'{self.__class__.__name__}.analysis_function '
206 f'function is not defined yet')
208 @abc.abstractmethod
209 def generate_samples(self):
210 """
211 Run the sampler specified by `method` and return the results.
213 :return:
214 The list of samples generated as a NumPy array with one row per sample
215 and each row containing one value for each variable name in `problem['names']`.
216 :rtype: np.ndarray
217 """
218 raise NotImplementedError(f'{self.__class__.__name__}.generate_samples '
219 f'function is not defined yet')
    def simulate_samples(self, cal_class, **kwargs):
        """
        Creates the samples for the calibration class and simulates them.

        :param cal_class:
            One class for calibration. Goals and tuner_paras have to be set
        :keyword scale:
            Default is False. If True the bounds of the tuner-parameters
            will be scaled between 0 and 1.

        :return:
            Returns two lists. First a list with the simulation results for each sample.
            If save_files the list contains the filepaths to the results
            Second a list of the samples.
        :rtype: list
        """
        scale = kwargs.pop('scale', False)
        # Set the output interval according the given Goals
        mean_freq = cal_class.goals.get_meas_frequency()
        self.logger.info("Setting output_interval of simulation according "
                         "to measurement target data frequency: %s", mean_freq)
        self.sim_api.sim_setup.output_interval = mean_freq
        initial_names = cal_class.tuner_paras.get_names()
        self.sim_api.set_sim_setup({"start_time": cal_class.start_time,
                                    "stop_time": cal_class.stop_time})
        self.sim_api.result_names = cal_class.goals.get_sim_var_names()

        self.problem = self.create_problem(cal_class.tuner_paras, scale=scale)
        samples = self.generate_samples()

        # create df of samples with the result_file_names as the index
        result_file_names = [f"simulation_{idx}" for idx in range(len(samples))]
        samples_df = pd.DataFrame(samples, columns=initial_names, index=result_file_names)
        samples_df.to_csv(self.working_directory.joinpath(f'samples_{cal_class.name}.csv'))

        # Simulate the current values
        parameters = []
        for initial_values in samples:
            if scale:
                # Samples were drawn in [0, 1]; map them back to the real bounds.
                initial_values = cal_class.tuner_paras.descale(initial_values)
            parameters.append(dict(zip(initial_names, initial_values)))

        self.logger.info('Starting %s parameter variations on %s cores',
                         len(samples), self.sim_api.n_cpu)
        if self.save_files:
            sim_dir = self.savepath_sim.joinpath(f'simulations_{cal_class.name}')
            os.makedirs(sim_dir, exist_ok=True)
            samples_df.to_csv(self.savepath_sim.joinpath(f'samples_{cal_class.name}.csv'))
            self.logger.info(f'Saving simulation files in: {sim_dir}')
            # Raw mat files need no postprocessing; otherwise convert each
            # result to the configured suffix directly after simulation.
            if self.suffix_files == "mat":
                postprocess_mat_result = empty_postprocessing
                kwargs_postprocessing = {}
            else:
                postprocess_mat_result = convert_mat_to_suffix
                kwargs_postprocessing = {
                    'variable_names': self.sim_api.result_names,
                    'suffix_files': self.suffix_files,
                    'parquet_engine': self.parquet_engine
                }
            # Postprocessing hooks are only passed through for DymolaAPI.
            if self.sim_api.__class__.__name__ == "DymolaAPI":
                cal_class.input_kwargs["postprocess_mat_result"] = postprocess_mat_result
                cal_class.input_kwargs["kwargs_postprocessing"] = kwargs_postprocessing
            _filepaths = self.sim_api.simulate(
                parameters=parameters,
                return_option="savepath",
                savepath=sim_dir,
                result_file_name=result_file_names,
                fail_on_error=self.fail_on_error,
                inputs=cal_class.inputs,
                **cal_class.input_kwargs
            )
            self.reproduction_files.extend(_filepaths)
            results = _filepaths
        else:
            results = self.sim_api.simulate(
                parameters=parameters,
                inputs=cal_class.inputs,
                fail_on_error=self.fail_on_error,
                **cal_class.input_kwargs
            )
        self.logger.info('Finished %s simulations', len(samples))
        return results, samples
    def _check_index(self, tsd: data_types.TimeSeriesData, sim_num=None):
        """Clean a simulation result index so it matches the output interval.

        :param data_types.TimeSeriesData tsd: Result to check (modified in place).
        :param sim_num: Name used in log messages; defaults to the file name.
        :return: The checked (possibly cleaned) TimeSeriesData.
        """
        # frequency is a (mean, std) tuple of the index spacing.
        freq = tsd.frequency
        if sim_num is None:
            sim_num = tsd.filepath.name
        if freq[0] != self.sim_api.sim_setup.output_interval:
            self.logger.info(
                f'The mean value of the frequency from {sim_num} does not match output '
                'interval index will be cleaned and spaced equally')
            # clean_and_space_equally needs a datetime index; convert back afterwards.
            tsd.to_datetime_index()
            tsd.clean_and_space_equally(f'{str(self.sim_api.sim_setup.output_interval * 1000)}ms')
            tsd.to_float_index()
            freq = tsd.frequency
        if freq[1] > 0.0:
            self.logger.info(f'The standard deviation of the frequency from {sim_num} is to high '
                             f'and will be rounded to the accuracy of the output interval')
            # The reversed-string .find('.') counts the decimal places of the
            # output interval; round the index to that precision.
            tsd.index = np.round(tsd.index.astype("float64"),
                                 str(self.sim_api.sim_setup.output_interval)[::-1].find('.'))
        return tsd
    def _single_eval_statistical_measure(self, kwargs_eval):
        """Evaluates statistical measure of one result.

        ``kwargs_eval`` must contain 'cal_class' and 'result'; 'sim_num' is
        optional and only used in log messages. Returns a tuple of the total
        statistical measure and the verbose per-goal results.
        """
        cal_class = kwargs_eval.pop('cal_class')
        result = kwargs_eval.pop('result')
        num_sim = kwargs_eval.pop('sim_num', None)
        if result is None:
            # Failed simulation: report the configured error value for the
            # total measure and for every single goal.
            verbose_error = {}
            for goal, weight in zip(cal_class.goals.get_goals_list(), cal_class.goals.weightings):
                verbose_error[goal] = (weight, self.ret_val_on_error)
            return self.ret_val_on_error, verbose_error
        result = self._check_index(result, num_sim)
        cal_class.goals.set_sim_target_data(result)
        cal_class.goals.set_relevant_time_intervals(cal_class.relevant_intervals)
        # Evaluate the current objective
        total_res, verbose_calculation = cal_class.goals.eval_difference(verbose=True)
        return total_res, verbose_calculation
    def eval_statistical_measure(self, cal_class, results, verbose=True):
        """Evaluates statistical measures of results on single core.

        :param cal_class: Calibration class providing the goals.
        :param list results: Simulation results (entries may be None for failures).
        :param bool verbose:
            Default True. If True, additionally return the per-goal results
            restructured into a dict of numpy arrays.
        """
        self.logger.info('Starting evaluation of statistical measure')
        output = []
        list_output_verbose = []
        for i, result in enumerate(results):
            total_res, verbose_calculation = self._single_eval_statistical_measure(
                {'cal_class': cal_class, 'result': result, 'sim_num': f'simulation_{i}'}
            )
            output.append(total_res)
            list_output_verbose.append(verbose_calculation)
        if verbose:
            # restructure output_verbose
            output_verbose = _restruct_verbose(list_output_verbose)
            return np.asarray(output), output_verbose
        return np.asarray(output)
    def _single_load_eval_file(self, kwargs_load_eval):
        """For multiprocessing: load one simulation file, then evaluate
        its statistical measure. Expects 'filepath' plus the keys needed by
        ``_single_eval_statistical_measure``."""
        filepath = kwargs_load_eval.pop('filepath')
        _result = _load_single_file(filepath, self.parquet_engine)
        kwargs_load_eval.update({'result': _result})
        total_res, verbose_calculation = self._single_eval_statistical_measure(kwargs_load_eval)
        return total_res, verbose_calculation
    def _mp_load_eval(self, _filepaths, cal_class, n_cpu):
        """
        Loading and evaluating the statistical measure of saved simulation files
        on multiple cores. Each worker loads one file at a time and discards it
        after evaluation, keeping the memory footprint small.
        """
        self.logger.info(f'Load files and evaluate statistical measure on {n_cpu} processes.')
        kwargs_load_eval = []
        for filepath in _filepaths:
            kwargs_load_eval.append({'filepath': filepath, 'cal_class': cal_class})
        output_array = []
        list_output_verbose = []
        # imap preserves input order, so results stay aligned with the samples.
        with mp.Pool(processes=n_cpu) as pool:
            for total, verbose in pool.imap(self._single_load_eval_file, kwargs_load_eval):
                output_array.append(total)
                list_output_verbose.append(verbose)
        output_array = np.asarray(output_array)
        output_verbose = _restruct_verbose(list_output_verbose)
        return output_array, output_verbose
383 def _load_eval(self, _filepaths, cal_class, n_cpu):
384 """
385 Loading and evaluating the statistical measure of saved simulation files.
386 Single- or multiprocessing possible with definition of n_cpu.
387 """
388 if n_cpu == 1:
389 results = _load_files(_filepaths, self.parquet_engine)
390 output_array, output_verbose = self.eval_statistical_measure(
391 cal_class=cal_class,
392 results=results
393 )
394 return output_array, output_verbose
395 output_array, output_verbose = self._mp_load_eval(_filepaths, cal_class, n_cpu)
396 return output_array, output_verbose
    def run(self, calibration_classes, merge_multiple_classes=True, **kwargs):
        """
        Execute the sensitivity analysis for each class and
        return the result.

        :param CalibrationClass,list calibration_classes:
            Either one or multiple classes for calibration with same tuner-parameters.
        :param bool merge_multiple_classes:
            Default True. If False, the given list of calibration-classes
            is handled as-is. This means if you pass two CalibrationClass objects
            with the same name (e.g. "device on"), the calibration process will run
            for both these classes stand-alone.
            This will automatically yield an intersection of tuner-parameters, however may
            have advantages in some cases.
        :keyword bool verbose:
            Default False. If True, in addition to the combined Goals of the Classes
            (saved under index Goal: all), the sensitivity measures of the individual
            Goals will also be calculated and returned.
        :keyword scale:
            Default is False. If True the bounds of the tuner-parameters
            will be scaled between 0 and 1.
        :keyword bool use_first_sim:
            Default False. If True, the simulations of the first calibration class will be used for
            all other calibration classes with their relevant time intervals.
            The simulations must be stored on a hard-drive, so it must be used with
            either save_files or load_files.
        :keyword int n_cpu:
            Default is 1. The number of processes to use for the evaluation of the statistical
            measure. For n_cpu > 1 only one simulation file is loaded at once in a process and
            dumped directly after the evaluation of the statistical measure,
            so that only minimal memory is used.
            Use this option for large analyses.
            Only implemented for save_files=True or load_sim_files=True.
        :keyword bool load_sim_files:
            Default False. If True, no new simulations are done and old simulations are loaded.
            The simulations and corresponding samples will be loaded from self.savepath_sim like
            they were saved from self.save_files. Currently, the name of the sim folder must be
            "simulations_CAL_CLASS_NAME" and for the samples "samples_CAL_CLASS_NAME".
            The usage of the same simulations for different
            calibration classes is not supported yet.
        :keyword bool save_results:
            Default True. If True, all results are saved as a csv in working_directory.
            (samples, statistical measures and analysis variables).
        :keyword bool plot_result:
            Default True. If True, the results will be plotted.
        :return:
            Returns a pandas.DataFrame. The DataFrame has a Multiindex with the
            levels Class, Goal and Analysis variable. The Goal name of combined goals is 'all'.
            The variables are the tuner-parameters.
            For the Sobol Method and calc_second_order returns a tuple of DataFrames (df_1, df_2)
            where df_2 contains the second oder analysis variables and has an extra index level
            Interaction, which also contains the variables.
        :rtype: pandas.DataFrame
        """
        verbose = kwargs.pop('verbose', False)
        scale = kwargs.pop('scale', False)
        use_first_sim = kwargs.pop('use_first_sim', False)
        n_cpu = kwargs.pop('n_cpu', 1)
        save_results = kwargs.pop('save_results', True)
        plot_result = kwargs.pop('plot_result', True)
        load_sim_files = kwargs.pop('load_sim_files', False)
        # Check correct input
        calibration_classes = validate_cal_class_input(calibration_classes)
        # Merge the classes for avoiding possible intersection of tuner-parameters
        if merge_multiple_classes:
            calibration_classes = data_types.merge_calibration_classes(calibration_classes)

        # Check n_cpu
        if n_cpu > mp.cpu_count():
            raise ValueError(f"Given n_cpu '{n_cpu}' is greater "
                             "than the available number of "
                             f"cpus on your machine '{mp.cpu_count()}'")

        # Check if the usage of the simulations from the first calibration class
        # for all other classes is possible: their intervals must lie inside
        # the first class' simulated time span.
        if use_first_sim:
            if not self.save_files and not load_sim_files:
                raise AttributeError('To use the simulations of the first calibration class '
                                     'for all classes the simulation files must be saved. '
                                     'Either set save_files=True or load already exiting files '
                                     'with load_sim_files=True.')
            start_time = 0
            stop_time = 0
            for idx, cal_class in enumerate(calibration_classes):
                if idx == 0:
                    start_time = cal_class.start_time
                    stop_time = cal_class.stop_time
                    continue
                if start_time > cal_class.start_time or stop_time < cal_class.stop_time:
                    raise ValueError(f'To use the simulations of the first calibration class '
                                     f'for all classes the start and stop times of the other '
                                     f'classes must be in the interval [{start_time}, {stop_time}] '
                                     f'of the first calibration class.')

        all_results = []
        for idx, cal_class in enumerate(calibration_classes):

            self.logger.info('Start sensitivity analysis of class: %s, '
                             'Time-Interval: %s-%s s', cal_class.name,
                             cal_class.start_time, cal_class.stop_time)

            # Generate list with metrics of every parameter variation
            results_goals = {}
            if load_sim_files:
                self.problem = self.create_problem(cal_class.tuner_paras, scale=scale)
                # With use_first_sim all classes reuse the first class' files.
                if use_first_sim:
                    class_name = calibration_classes[0].name
                else:
                    class_name = cal_class.name
                sim_dir = self.savepath_sim.joinpath(f'simulations_{class_name}')
                samples_path = self.savepath_sim.joinpath(f'samples_{class_name}.csv')
                self.logger.info(f'Loading samples from {samples_path}')
                samples = pd.read_csv(samples_path,
                                      header=0,
                                      index_col=0)
                samples = samples.to_numpy()
                result_file_names = [f"simulation_{idx}.{self.suffix_files}" for idx in
                                     range(len(samples))]
                _filepaths = [sim_dir.joinpath(result_file_name) for result_file_name in
                              result_file_names]
                self.logger.info(f'Loading simulation files from {sim_dir}')
                output_array, output_verbose = self._load_eval(_filepaths, cal_class, n_cpu)
            else:
                results, samples = self.simulate_samples(
                    cal_class=cal_class,
                    scale=scale
                )
                if self.save_files:
                    output_array, output_verbose = self._load_eval(results, cal_class, n_cpu)
                else:
                    output_array, output_verbose = self.eval_statistical_measure(
                        cal_class=cal_class,
                        results=results
                    )
                # After the first simulated class, subsequent classes load the
                # already saved files instead of re-simulating.
                if use_first_sim:
                    load_sim_files = True

            # combine output_array and output_verbose
            # set key for output_array depending on one or multiple goals
            stat_mea = {'all': output_array}
            if len(output_verbose) == 1:
                stat_mea = output_verbose
            if len(output_verbose) > 1 and verbose:
                stat_mea.update(output_verbose)

            # save statistical measure and corresponding samples for each cal_class in working_directory
            if save_results:
                result_file_names = [f"simulation_{idx}" for idx in range(len(output_array))]
                stat_mea_df = pd.DataFrame(stat_mea, index=result_file_names)
                savepath_stat_mea = self.working_directory.joinpath(
                    f'{cal_class.goals.statistical_measure}_{cal_class.name}.csv')
                stat_mea_df.to_csv(savepath_stat_mea)
                self.reproduction_files.append(savepath_stat_mea)
                samples_df = pd.DataFrame(samples, columns=cal_class.tuner_paras.get_names(),
                                          index=result_file_names)
                savepath_samples = self.working_directory.joinpath(f'samples_{cal_class.name}.csv')
                samples_df.to_csv(savepath_samples)
                self.reproduction_files.append(savepath_samples)

            self.logger.info('Starting calculation of analysis variables')
            for key, val in stat_mea.items():
                result_goal = self.analysis_function(
                    x=samples,
                    y=val
                )
                results_goals[key] = result_goal
            all_results.append(results_goals)
            self.logger.info('Finished sensitivity analysis of class: %s, '
                             'Time-Interval: %s-%s s', cal_class.name,
                             cal_class.start_time, cal_class.stop_time)
        result = self._conv_local_results(results=all_results,
                                          local_classes=calibration_classes)
        if save_results:
            self._save(result)
        if plot_result:
            self.plot(result)
        return result, calibration_classes
575 def _save(self, result: pd.DataFrame, time_dependent: bool = False):
576 """
577 Saves the result DataFrame of run and run_time_dependent.
578 Needs to be overwritten for Sobol results.
579 """
580 if time_dependent:
581 savepath_result = self.working_directory.joinpath(
582 f'{self.__class__.__name__}_results_time.csv')
583 else:
584 savepath_result = self.working_directory.joinpath(
585 f'{self.__class__.__name__}_results.csv')
586 result.to_csv(savepath_result)
587 self.reproduction_files.append(savepath_result)
589 @staticmethod
590 def create_problem(tuner_paras, scale=False) -> dict:
591 """Create function for later access if multiple calibration-classes are used."""
592 num_vars = len(tuner_paras.get_names())
593 bounds = np.array(tuner_paras.get_bounds())
594 if scale:
595 bounds = [np.zeros_like(bounds[0]), np.ones_like(bounds[1])]
596 problem = {'num_vars': num_vars,
597 'names': tuner_paras.get_names(),
598 'bounds': np.transpose(bounds)}
599 return problem
601 @staticmethod
602 def select_by_threshold(calibration_classes, result, analysis_variable, threshold):
603 """
604 Automatically select sensitive tuner parameters based on a given threshold
605 of a given analysis variable from a sensitivity result.
606 Uses only the combined goals.
608 :param list calibration_classes:
609 List of aixcalibuha.data_types.CalibrationClass objects that you want to
610 automatically select sensitive tuner-parameters.
611 :param pd.DataFrame result:
612 Result object of sensitivity analysis run
613 :param str analysis_variable:
614 Analysis variable to use for the selection
615 :param float threshold:
616 Minimal required value of given key
617 :return: list calibration_classes
618 """
619 for cal_class in calibration_classes:
620 first_goal = result.index.get_level_values(1)[0]
621 class_result = result.loc[cal_class.name, first_goal, analysis_variable]
622 tuner_paras = copy.deepcopy(cal_class.tuner_paras)
623 select_names = class_result[class_result < threshold].index.values
624 tuner_paras.remove_names(select_names)
625 if not tuner_paras.get_names():
626 raise ValueError(
627 'Automatic selection removed all tuner parameter '
628 f'from class {cal_class.name} after '
629 'SensitivityAnalysis was done. Please adjust the '
630 'threshold in json or manually chose tuner '
631 'parameters for the calibration.')
632 # cal_class.set_tuner_paras(tuner_paras)
633 cal_class.tuner_paras = tuner_paras
634 return calibration_classes
636 @staticmethod
637 def select_by_threshold_verbose(calibration_class: CalibrationClass,
638 result: pd.DataFrame,
639 analysis_variable: str,
640 threshold: float,
641 calc_names_for_selection: List[str] = None):
642 """
643 Select tuner-parameters of single calibration class with verbose sensitivity results.
644 This function selects tuner-parameters if their sensitivity is equal or greater
645 than the threshold in just one target value of one calibration class in the
646 sensitivity result. This can be more robust because a small sensitivity in one target
647 value and state of the system can mean that the parameter can also be calibrated in
648 a global calibration class which calibrates multiple states and target values at
649 the same time and has there not directly the same sensitivity as in the isolated
650 view of a calibration class for only one state.
652 :param CalibrationClass calibration_class:
653 The calibration class from which the tuner parameters will be selected.
654 :param pd.DataFrame result:
655 Sensitivity results to use for the selection. Can include multiple classes.
656 :param str analysis_variable:
657 The analysis variable to use for the selection.
658 :param float threshold:
659 Minimal required value of given analysis variable.
660 :param List[str] calc_names_for_selection:
661 Specifies which calibration classes in the sensitivity results will be used for
662 the selection. Default are all classes.
663 """
664 if Counter(calibration_class.tuner_paras.get_names()) != Counter(list(result.columns)):
665 raise NameError("The tuner-parameter of the calibration class do not "
666 "match the tuner-parameters in the sensitivity result."
667 "They have to match.")
669 result = result.loc[:, :, analysis_variable]
670 calc_names_results = result.index.get_level_values("Class").unique()
671 if calc_names_for_selection:
672 for cal_class in calc_names_for_selection:
673 if cal_class not in calc_names_results:
674 raise NameError(f"The calibration class name {cal_class} "
675 f"does not match any class name "
676 f"in the given sensitivity result.")
677 result = result.loc[calc_names_for_selection, :, :]
679 selected_tuners = (result >= threshold).any()
681 remove_tuners = []
682 for tuner, selected in selected_tuners.items():
683 if not selected:
684 remove_tuners.append(tuner)
685 tuner_paras = copy.deepcopy(calibration_class.tuner_paras)
686 tuner_paras.remove_names(remove_tuners)
687 if not tuner_paras.get_names():
688 raise ValueError("Threshold to small. All tuner-parameters would be removed.")
689 calibration_class.tuner_paras = tuner_paras
690 return calibration_class
    def run_time_dependent(self, cal_class: CalibrationClass, **kwargs):
        """
        Calculate the time dependent sensitivity for all the single goals in the calibration class.

        :param CalibrationClass cal_class:
            Calibration class with tuner-parameters to calculate sensitivity for.
            Can include dummy target date.
        :keyword scale:
            Default is False. If True the bounds of the tuner-parameters
            will be scaled between 0 and 1.
        :keyword bool load_sim_files:
            Default False. If True, no new simulations are done and old simulations are loaded.
            The simulations and corresponding samples will be loaded from self.savepath_sim like
            they were saved from self.save_files. Currently, the name of the sim folder must be
            "simulations_CAL_CLASS_NAME" and for the samples "samples_CAL_CLASS_NAME".
        :keyword bool save_results:
            Default True. If True, all results are saved as a csv in working_directory.
            (samples and analysis variables).
        :keyword int n_steps:
            Default is all time steps. If the problem is large, the evaluation of all time steps
            at once can cause a memory error. Then n_steps defines how many time_steps
            are evaluated at once in chunks. This increases the needed time exponentially and
            the simulation files must be saved.
        :keyword bool plot_result:
            Default True. If True, the results will be plotted.
        :return:
            Returns a pandas.DataFrame.
        :rtype: pandas.DataFrame
        """
        scale = kwargs.pop('scale', False)
        save_results = kwargs.pop('save_results', True)
        plot_result = kwargs.pop('plot_result', True)
        load_sim_files = kwargs.pop('load_sim_files', False)
        n_steps = kwargs.pop('n_steps', 'all')

        self.logger.info("Start time dependent sensitivity analysis.")
        if load_sim_files:
            # Reuse previously saved simulations and samples from savepath_sim.
            self.problem = self.create_problem(cal_class.tuner_paras, scale=scale)
            sim_dir = self.savepath_sim.joinpath(f'simulations_{cal_class.name}')
            samples_path = self.savepath_sim.joinpath(f'samples_{cal_class.name}.csv')
            samples = pd.read_csv(samples_path,
                                  header=0,
                                  index_col=0)
            samples = samples.to_numpy()
            result_file_names = [f"simulation_{idx}.{self.suffix_files}" for idx in
                                 range(len(samples))]
            _filepaths = [sim_dir.joinpath(result_file_name) for result_file_name in
                          result_file_names]

            sen_time_dependent_list, time_index = self._load_analyze_tsteps(_filepaths=_filepaths,
                                                                            samples=samples,
                                                                            n_steps=n_steps,
                                                                            cal_class=cal_class)
            sen_time_dependent_df = _restruct_time_dependent(sen_time_dependent_list, time_index)
        else:
            results, samples = self.simulate_samples(
                cal_class=cal_class,
                scale=scale
            )
            if self.save_files:
                # Results are filepaths here; analyze them chunk-wise from disk.
                sen_time_dependent_list, time_index = self._load_analyze_tsteps(_filepaths=results,
                                                                                samples=samples,
                                                                                n_steps=n_steps,
                                                                                cal_class=cal_class)
                sen_time_dependent_df = _restruct_time_dependent(sen_time_dependent_list,
                                                                 time_index)
            else:
                # All results in memory: analyze each time step directly.
                variables = results[0].get_variable_names()
                time_index = results[0].index.to_numpy()
                total_result = _concat_all_sims(results)
                sen_time_dependent_list = []
                for time_step in time_index:
                    result_df_tstep = self._analyze_tstep_df(time_step=time_step,
                                                             tsteps_sim_results=total_result,
                                                             variables=variables,
                                                             samples=samples,
                                                             cal_class=cal_class)
                    sen_time_dependent_list.append(result_df_tstep)
                sen_time_dependent_df = _restruct_time_dependent(sen_time_dependent_list,
                                                                 time_index)
        self.logger.info("Finished time dependent sensitivity analysys.")
        if save_results:
            self._save(sen_time_dependent_df, time_dependent=True)
        if plot_result:
            # Sobol with second order returns a tuple; plot only first order.
            if isinstance(sen_time_dependent_df, pd.DataFrame):
                plot_time_dependent(sen_time_dependent_df)
            else:
                plot_time_dependent(sen_time_dependent_df[0])
        return sen_time_dependent_df
782 def _analyze_tstep_df(self, time_step, tsteps_sim_results, variables, samples, cal_class):
783 """Analyze the sensitivity at a single time step."""
784 result_dict_tstep = {}
785 for var in variables:
786 result_tstep_var = tsteps_sim_results[var].loc[time_step].to_numpy()
787 if np.all(result_tstep_var == result_tstep_var[0]):
788 sen_tstep_var = None
789 else:
790 sen_tstep_var = self.analysis_function(
791 x=samples,
792 y=result_tstep_var
793 )
794 result_dict_tstep[var] = sen_tstep_var
795 result_df_tstep = self._conv_local_results(results=[result_dict_tstep],
796 local_classes=[cal_class])
797 return result_df_tstep
799 def _load_tsteps_df(self, tsteps, _filepaths):
800 """
801 Load all simulations and extract and concat the sim results of the time steps in tsteps.
802 """
803 self.logger.info(
804 f"Loading time steps from {tsteps[0]} to {tsteps[-1]} of the simulation files.")
805 tsteps_sim_results = []
806 for _filepath in _filepaths:
807 sim = _load_single_file(_filepath)
808 tsteps_sim_results.append(sim.loc[tsteps[0]:tsteps[-1]])
809 tsteps_sim_results = _concat_all_sims(tsteps_sim_results)
810 return tsteps_sim_results
812 def _load_analyze_tsteps(self, _filepaths, samples, n_steps, cal_class):
813 """
814 Load and analyze all time steps in chunks with n_steps time steps.
815 """
816 sim1 = _load_single_file(_filepaths[0])
817 time_index = sim1.index.to_numpy()
818 variables = sim1.get_variable_names()
819 sen_time_dependent_list = []
820 if n_steps == 'all':
821 list_tsteps = [time_index]
822 elif isinstance(n_steps, int) and not (n_steps <= 0 or n_steps > len(time_index)):
823 list_tsteps = _divide_chunks(time_index, n_steps)
824 else:
825 raise ValueError(
826 f"n_steps can only be between 1 and {len(time_index)} or the string all.")
828 for tsteps in list_tsteps:
829 tsteps_sim_results = self._load_tsteps_df(tsteps=tsteps, _filepaths=_filepaths)
830 self.logger.info("Analyzing these time steps.")
831 for tstep in tsteps:
832 result_df_tstep = self._analyze_tstep_df(time_step=tstep,
833 tsteps_sim_results=tsteps_sim_results,
834 variables=variables,
835 samples=samples,
836 cal_class=cal_class)
837 sen_time_dependent_list.append(result_df_tstep)
838 return sen_time_dependent_list, time_index
840 def _conv_global_result(self, result: dict, cal_class: CalibrationClass,
841 analysis_variable: str):
842 glo_res_dict = self._get_res_dict(result=result, cal_class=cal_class,
843 analysis_variable=analysis_variable)
844 return pd.DataFrame(glo_res_dict, index=['global'])
846 def _conv_local_results(self, results: list, local_classes: list):
847 """
848 Convert the result dictionaries form SALib of each class and goal into one DataFrame.
849 Overwritten for Sobol.
850 """
851 _conv_results = []
852 tuples = []
853 for class_results, local_class in zip(results, local_classes):
854 for goal, goal_results in class_results.items():
855 for analysis_var in self.analysis_variables:
856 _conv_results.append(self._get_res_dict(result=goal_results,
857 cal_class=local_class,
858 analysis_variable=analysis_var))
859 tuples.append((local_class.name, goal, analysis_var))
860 index = pd.MultiIndex.from_tuples(tuples=tuples,
861 names=['Class', 'Goal', 'Analysis variable'])
862 df = pd.DataFrame(_conv_results, index=index)
863 return df
    @abc.abstractmethod
    def _get_res_dict(self, result: dict, cal_class: CalibrationClass, analysis_variable: str):
        """
        Convert the result object to a dict with the key
        being the variable name and the value being the result
        associated to analysis_variable.

        Abstract: every concrete analyzer must override this.

        :param dict result:
            Result object of one analysis run (e.g. as produced by the
            analysis function of the concrete analyzer).
        :param CalibrationClass cal_class:
            Calibration class the result belongs to.
        :param str analysis_variable:
            Name of the analysis variable whose values shall be extracted.
        :return: dict mapping variable name to the extracted result value.
        :raises NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError
874 def plot(self, result):
875 """
876 Plot the results of the sensitivity analysis method from run().
878 :param pd.DataFrame result:
879 Dataframe of the results like from the run() function.
880 :return tuple of matplotlib objects (fig, ax):
881 """
882 plot_single(result=result)
884 @staticmethod
885 def load_from_csv(path):
886 """
887 Load sensitivity results which were saved with the run() or run_time_dependent() function.
889 For second order results use the load_second_order_from_csv() function of the SobolAnalyzer.
890 """
891 result = pd.read_csv(path, index_col=[0, 1, 2])
892 return result
894 def save_for_reproduction(self,
895 title: str,
896 path: Path = None,
897 files: list = None,
898 exclude_sim_files: bool = False,
899 remove_saved_files: bool = False,
900 **kwargs):
901 """
902 Save the settings of the SenAnalyzer and SimApi in order to
903 reproduce the simulations and sensitivity analysis method.
904 All saved results will be also saved in the reproduction
905 archive. The simulations can be excluded from saving.
907 :param str title:
908 Title of the study
909 :param Path path:
910 Where to store the .zip file. If not given, self.working_directory is used.
911 :param list files:
912 List of files to save along the standard ones.
913 Examples would be plots, tables etc.
914 :param bool exclude_sim_files:
915 Default False. If True, the simulation files will not be saved in
916 the reproduction archive.
917 :param bool remove_saved_files:
918 Default False. If True, the result and simulation files will be moved
919 instead of just copied.
920 :param dict kwargs:
921 All keyword arguments except title, files, and path of the function
922 `save_reproduction_archive`. Most importantly, `log_message` may be
923 specified to avoid input during execution.
924 """
925 if files is None:
926 files = []
928 for file_path in self.reproduction_files:
929 if exclude_sim_files:
930 if 'simulation' in str(file_path):
931 continue
932 filename = "SenAnalyzer" + \
933 str(file_path).rsplit(self.working_directory.name, maxsplit=1)[-1]
934 files.append(CopyFile(
935 sourcepath=file_path,
936 filename=filename,
937 remove=remove_saved_files
938 ))
940 return self.sim_api.save_for_reproduction(
941 title=title,
942 path=path,
943 files=files,
944 **kwargs
945 )